From 28c51ba4368ffdef8c6e4aa926029bcbcdfbb603 Mon Sep 17 00:00:00 2001 From: steebchen Date: Thu, 27 Jun 2024 16:36:21 +0100 Subject: [PATCH 1/6] fix(go-sdk): prefix action names with workflow name --- pkg/client/types/action.go | 29 +++++++++++++---------------- pkg/worker/service.go | 4 ++-- pkg/worker/webhook_handler.go | 8 ++++++-- pkg/worker/worker.go | 12 ++++++++---- pkg/worker/workflow.go | 18 +++++++++++------- 5 files changed, 40 insertions(+), 31 deletions(-) diff --git a/pkg/client/types/action.go b/pkg/client/types/action.go index d96d9b6121..d7f101543c 100644 --- a/pkg/client/types/action.go +++ b/pkg/client/types/action.go @@ -14,19 +14,15 @@ type Action struct { // Required. The verb to perform. Verb string - // Optional. A way to unique identify the step. - Subresource string + // The workflow name. Optional for compatibility reasons. + Workflow string } func (o Action) String() string { - if o.Subresource != "" { - return fmt.Sprintf("%s:%s:%s", o.Service, o.Verb, o.Subresource) + if o.Workflow != "" { + return fmt.Sprintf("%s:%s:%s", o.Workflow, o.Service, o.Verb) } - return o.IntegrationVerbString() -} - -func (o Action) IntegrationVerbString() string { return fmt.Sprintf("%s:%s", o.Service, o.Verb) } @@ -39,18 +35,19 @@ func ParseActionID(actionID string) (Action, error) { return Action{}, fmt.Errorf("invalid action id %s, must have at least 2 strings separated : (colon)", actionID) } - Service := firstToLower(parts[0]) - verb := strings.ToLower(parts[1]) - - var subresource string + var workflow string if numParts == 3 { - subresource = firstToLower(parts[2]) + workflow = firstToLower(parts[0]) + parts = parts[1:] } + service := firstToLower(parts[0]) + verb := strings.ToLower(parts[1]) + return Action{ - Service: Service, - Verb: verb, - Subresource: subresource, + Service: service, + Verb: verb, + Workflow: workflow, }, nil } diff --git a/pkg/worker/service.go b/pkg/worker/service.go index 5bf71c5f5a..7346d004bc 100644 --- a/pkg/worker/service.go +++ b/pkg/worker/service.go @@ -56,7 +56,7 @@ func (s *Service) On(t triggerConverter, workflow workflowConverter) error { } } - err = s.worker.registerAction(parsedAction.Service, parsedAction.Verb, fn) + err = s.worker.registerAction(apiWorkflow.Name, parsedAction.Service, parsedAction.Verb, fn) if err != nil { return err @@ -89,7 +89,7 @@ func (s *Service) RegisterAction(fn any, opts ...RegisterActionOpt) error { fnOpts.name = getFnName(fn) } - return s.worker.registerAction(s.Name, fnOpts.name, fn) + return s.worker.registerAction("none", s.Name, fnOpts.name, fn) } func (s *Service) Call(verb string) *WorkflowStep { diff --git a/pkg/worker/webhook_handler.go b/pkg/worker/webhook_handler.go index dc2e135f91..650bbce5d3 100644 --- a/pkg/worker/webhook_handler.go +++ b/pkg/worker/webhook_handler.go @@ -154,8 +154,12 @@ func (w *Worker) WebhookHttpHandler(opts WebhookHandlerOptions, workflows ...wor func (w *Worker) webhookProcess(ctx HatchetContext) (interface{}, error) { var do Action for _, action := range w.actions { - split := strings.Split(action.Name(), ":") // service:action - if split[1] == ctx.StepName() { + split := strings.Split(action.Name(), ":") // service:action or workflow:service:action + if len(split) == 3 && split[3-1] == ctx.StepName() { + do = action + break + } + if len(split) == 2 && split[2-1] == ctx.StepName() { // compatibility with old actions do = action break } diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index 6361d5f09a..dbc5183405 100644 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -5,6 +5,7 @@ import ( "fmt" "os" "reflect" + "strings" "sync" "time" @@ -197,7 +198,7 @@ func NewWorker(fs ...WorkerOpt) (*Worker, error) { for _, integrationAction := range actions { action := fmt.Sprintf("%s:%s", integrationId, integrationAction) - err := w.registerAction(integrationId, action, integration.ActionHandler(integrationAction)) + err := w.registerAction("integration", integrationId, action, integration.ActionHandler(integrationAction)) if err != nil { return nil, fmt.Errorf("could not register integration action %s: %w", action, err) @@ -266,11 +267,14 @@ func (w *Worker) RegisterAction(actionId string, method any) error { return fmt.Errorf("could not parse action id: %w", err) } - return w.registerAction(action.Service, action.Verb, method) + return w.registerAction("none", action.Service, action.Verb, method) } -func (w *Worker) registerAction(service, verb string, method any) error { - actionId := fmt.Sprintf("%s:%s", service, verb) +func (w *Worker) registerAction(wf, service, verb string, method any) error { + wf = strings.ToLower(wf) // TODO + wf = strings.ReplaceAll(wf, " ", "-") // TODO + + actionId := fmt.Sprintf("%s:%s:%s", wf, service, verb) // if the service is "concurrency", then this is a special action if service == "concurrency" { diff --git a/pkg/worker/workflow.go b/pkg/worker/workflow.go index 1fe03c082b..f308214b4d 100644 --- a/pkg/worker/workflow.go +++ b/pkg/worker/workflow.go @@ -225,7 +225,7 @@ func (j *WorkflowJob) ToWorkflowJob(svcName string, namespace string) (*types.Wo for i := range j.Steps { - newStep, err := j.Steps[i].ToWorkflowStep(svcName, i, namespace) + newStep, err := j.Steps[i].ToWorkflowStep(j.Name, svcName, i, namespace) if err != nil { return nil, err @@ -245,7 +245,7 @@ func (j *WorkflowJob) ToActionMap(svcName string) map[string]any { res := map[string]any{} for i, step := range j.Steps { - actionId := step.GetActionId(svcName, i) + actionId := step.GetActionId(j.Name, svcName, i) res[actionId] = step.Function } @@ -348,7 +348,7 @@ func (w *WorkflowStep) ToActionMap(svcName string) map[string]any { step := *w return map[string]any{ - step.GetActionId(svcName, 0): w.Function, + step.GetActionId(w.Name, svcName, 0): w.Function, } } @@ -364,7 +364,7 @@ type Step struct { APIStep types.WorkflowStep } -func (w *WorkflowStep) ToWorkflowStep(svcName string, index int, namespace string) (*Step, error) { +func (w *WorkflowStep) ToWorkflowStep(wfName, svcName string, index int, namespace string) (*Step, error) { fnType := reflect.TypeOf(w.Function) res := &Step{} @@ -375,7 +375,7 @@ func (w *WorkflowStep) ToWorkflowStep(svcName string, index int, namespace strin Name: res.Id, ID: w.GetStepId(index), Timeout: w.Timeout, - ActionID: w.GetActionId(svcName, index), + ActionID: w.GetActionId(wfName, svcName, index), Parents: []string{}, Retries: w.Retries, } @@ -443,10 +443,14 @@ func (w *WorkflowStep) GetStepId(index int) string { return stepId } -func (w *WorkflowStep) GetActionId(svcName string, index int) string { +func (w *WorkflowStep) GetActionId(wfName, svcName string, index int) string { stepId := w.GetStepId(index) - return fmt.Sprintf("%s:%s", svcName, stepId) + wf := wfName + wf = strings.ToLower(wf) + wf = strings.ReplaceAll(wf, " ", "-") + + return fmt.Sprintf("%s:%s:%s", wf, svcName, stepId) } func getFnName(fn any) string { From ed2d6a4fd8f490740be17a31660ef7aed12a4943 Mon Sep 17 00:00:00 2001 From: steebchen Date: Thu, 27 Jun 2024 21:27:33 +0100 Subject: [PATCH 2/6] adapt cancellation action ids --- pkg/worker/workflow.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/worker/workflow.go b/pkg/worker/workflow.go index f308214b4d..d2355ab638 100644 --- a/pkg/worker/workflow.go +++ b/pkg/worker/workflow.go @@ -202,7 +202,7 @@ func (j *WorkflowJob) ToWorkflow(svcName string, namespace string) types.Workflo if j.Concurrency != nil { w.Concurrency = &types.WorkflowConcurrency{ - ActionID: "concurrency:" + getFnName(j.Concurrency.fn), // TODO this should also be namespaced + ActionID: w.Name + ":concurrency:" + getFnName(j.Concurrency.fn), } if j.Concurrency.maxRuns != nil { @@ -251,7 +251,7 @@ func (j *WorkflowJob) ToActionMap(svcName string) map[string]any { } if j.Concurrency != nil { - res["concurrency:"+getFnName(j.Concurrency.fn)] = j.Concurrency.fn + res[j.Name+":concurrency:"+getFnName(j.Concurrency.fn)] = j.Concurrency.fn } if j.OnFailure != nil { From b5d53f086ce03e814e6aef64e073d504ef46d407 Mon Sep 17 00:00:00 2001 From: steebchen Date: Thu, 27 Jun 2024 21:29:57 +0100 Subject: [PATCH 3/6] adapt webhook test key --- examples/webhook/main_e2e_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/webhook/main_e2e_test.go b/examples/webhook/main_e2e_test.go index 04acafe2f2..3dd6612c6f 100644 --- a/examples/webhook/main_e2e_test.go +++ b/examples/webhook/main_e2e_test.go @@ -156,7 +156,7 @@ func TestWebhook(t *testing.T) { handler := func(w http.ResponseWriter, r *http.Request) { if r.Method == http.MethodPut { w.WriteHeader(http.StatusOK) - _, _ = w.Write([]byte(fmt.Sprintf(`{"actions": ["default:%s"]}`, "webhook-failure-step-one"))) + _, _ = w.Write([]byte(fmt.Sprintf(`{"actions": ["%s:default:%s"]}`, workflow, "webhook-failure-step-one"))) return } w.WriteHeader(http.StatusInternalServerError) // simulate a failure From e8c54ee51ef31abdf1fca31bac65057a799586c5 Mon Sep 17 00:00:00 2001 From: steebchen Date: Thu, 27 Jun 2024 21:43:45 +0100 Subject: [PATCH 4/6] use none in svc call --- pkg/worker/service.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/worker/service.go b/pkg/worker/service.go index 7346d004bc..ed2a035d39 100644 --- a/pkg/worker/service.go +++ b/pkg/worker/service.go @@ -93,7 +93,7 @@ func (s *Service) RegisterAction(fn any, opts ...RegisterActionOpt) error { } func (s *Service) Call(verb string) *WorkflowStep { - actionId := fmt.Sprintf("%s:%s", s.Name, verb) + actionId := fmt.Sprintf("none:%s:%s", s.Name, verb) registeredAction, exists := s.worker.actions[actionId] From 79204e04415c2ce84e0cddd5ca05c2e999e0caf4 Mon Sep 17 00:00:00 2001 From: steebchen Date: Thu, 27 Jun 2024 21:47:56 +0100 Subject: [PATCH 5/6] use empty string for non-workflow actions --- pkg/worker/service.go | 4 ++-- pkg/worker/worker.go | 9 +++++++-- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/pkg/worker/service.go b/pkg/worker/service.go index ed2a035d39..38076993e1 100644 --- a/pkg/worker/service.go +++ b/pkg/worker/service.go @@ -89,11 +89,11 @@ func (s *Service) RegisterAction(fn any, opts ...RegisterActionOpt) error { fnOpts.name = getFnName(fn) } - return s.worker.registerAction("none", s.Name, fnOpts.name, fn) + return s.worker.registerAction("", s.Name, fnOpts.name, fn) } func (s *Service) Call(verb string) *WorkflowStep { - actionId := fmt.Sprintf("none:%s:%s", s.Name, verb) + actionId := fmt.Sprintf("%s:%s", s.Name, verb) registeredAction, exists := s.worker.actions[actionId] diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index dbc5183405..c66397e6ee 100644 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -267,14 +267,19 @@ func (w *Worker) RegisterAction(actionId string, method any) error { return fmt.Errorf("could not parse action id: %w", err) } - return w.registerAction("none", action.Service, action.Verb, method) + return w.registerAction("", action.Service, action.Verb, method) } func (w *Worker) registerAction(wf, service, verb string, method any) error { wf = strings.ToLower(wf) // TODO wf = strings.ReplaceAll(wf, " ", "-") // TODO - actionId := fmt.Sprintf("%s:%s:%s", wf, service, verb) + var actionId string + if wf != "" { + actionId = fmt.Sprintf("%s:%s:%s", wf, service, verb) + } else { + actionId = fmt.Sprintf("%s:%s", service, verb) + } // if the service is "concurrency", then this is a special action if service == "concurrency" { From f060f5bf8401bfbba035e8b5a2cbcb2705c524ea Mon Sep 17 00:00:00 2001 From: steebchen Date: Thu, 27 Jun 2024 22:17:30 +0100 Subject: [PATCH 6/6] refactor workflow name conversion --- pkg/worker/worker.go | 4 +--- pkg/worker/workflow.go | 8 +++++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index c66397e6ee..7e1348b1bb 100644 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -5,7 +5,6 @@ import ( "fmt" "os" "reflect" - "strings" "sync" "time" @@ -271,8 +270,7 @@ func (w *Worker) RegisterAction(actionId string, method any) error { } func (w *Worker) registerAction(wf, service, verb string, method any) error { - wf = strings.ToLower(wf) // TODO - wf = strings.ReplaceAll(wf, " ", "-") // TODO + wf = convertWorkflowNameToAction(wf) var actionId string if wf != "" { diff --git a/pkg/worker/workflow.go b/pkg/worker/workflow.go index d2355ab638..d7473d3456 100644 --- a/pkg/worker/workflow.go +++ b/pkg/worker/workflow.go @@ -446,9 +446,7 @@ func (w *WorkflowStep) GetStepId(index int) string { func (w *WorkflowStep) GetActionId(wfName, svcName string, index int) string { stepId := w.GetStepId(index) - wf := wfName - wf = strings.ToLower(wf) - wf = strings.ReplaceAll(wf, " ", "-") + wf := convertWorkflowNameToAction(wfName) return fmt.Sprintf("%s:%s:%s", wf, svcName, stepId) } @@ -469,3 +467,7 @@ func getFnName(fn any) string { return strings.ReplaceAll(fnName, ".", "-") } + +func convertWorkflowNameToAction(workflowName string) string { + return strings.ReplaceAll(strings.ToLower(workflowName), " ", "-") +}