From c31a3e3356bcd84dfda0d82540132c194bf4fc36 Mon Sep 17 00:00:00 2001 From: Alex Shtin Date: Mon, 28 Mar 2022 18:19:56 -0700 Subject: [PATCH 1/4] Improve deletenamespace workflow logging --- common/metrics/defs.go | 4 ++ .../deleteexecutions/activities.go | 9 +++- .../reclaimresources/activities.go | 47 +++++++++++-------- 3 files changed, 39 insertions(+), 21 deletions(-) diff --git a/common/metrics/defs.go b/common/metrics/defs.go index b76adae8a00..7da346760d7 100644 --- a/common/metrics/defs.go +++ b/common/metrics/defs.go @@ -2214,7 +2214,9 @@ const ( ReadNamespaceFailuresCount ListExecutionsFailuresCount TerminateExecutionFailuresCount + TerminateExecutionNotFoundCount DeleteExecutionFailuresCount + DeleteExecutionNotFoundCount RateLimiterFailuresCount NumWorkerMetrics @@ -2683,7 +2685,9 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{ ReadNamespaceFailuresCount: NewCounterDef("read_namespace_failures"), ListExecutionsFailuresCount: NewCounterDef("list_executions_failures"), TerminateExecutionFailuresCount: NewCounterDef("terminate_executions_failures"), + TerminateExecutionNotFoundCount: NewCounterDef("terminate_executions_not_found"), DeleteExecutionFailuresCount: NewCounterDef("delete_execution_failures"), + DeleteExecutionNotFoundCount: NewCounterDef("delete_execution_not_found"), RateLimiterFailuresCount: NewCounterDef("rate_limiter_failures"), }, Server: { diff --git a/service/worker/deletenamespace/deleteexecutions/activities.go b/service/worker/deletenamespace/deleteexecutions/activities.go index 44b74ff7188..550093b9eb6 100644 --- a/service/worker/deletenamespace/deleteexecutions/activities.go +++ b/service/worker/deletenamespace/deleteexecutions/activities.go @@ -123,7 +123,7 @@ func (a *Activities) DeleteExecutionsActivity(ctx context.Context, params Delete err = rateLimiter.Wait(ctx) if err != nil { a.metricsClient.IncCounter(metrics.DeleteExecutionsWorkflowScope, metrics.RateLimiterFailuresCount) - a.logger.Error("Workflow execution 
deletion rate limiter error.", tag.WorkflowNamespace(params.Namespace.String()), tag.Error(err)) + a.logger.Error("Workflow executions delete rate limiter error.", tag.WorkflowNamespace(params.Namespace.String()), tag.Error(err)) return result, err } if execution.Status == enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING { @@ -136,7 +136,10 @@ func (a *Activities) DeleteExecutionsActivity(ctx context.Context, params Delete }, }) switch err.(type) { - case nil, *serviceerror.NotFound: // Workflow execution has already completed or doesn't exist. + case nil: + case *serviceerror.NotFound: // Workflow execution has already completed or doesn't exist. + a.metricsClient.IncCounter(metrics.DeleteExecutionsWorkflowScope, metrics.TerminateExecutionNotFoundCount) + a.logger.Warn("Workflow execution is not found or not running.", tag.WorkflowNamespace(params.Namespace.String()), tag.WorkflowID(execution.Execution.GetWorkflowId()), tag.WorkflowRunID(execution.Execution.GetRunId())) default: a.metricsClient.IncCounter(metrics.DeleteExecutionsWorkflowScope, metrics.TerminateExecutionFailuresCount) a.logger.Error("Unable to terminate workflow execution.", tag.WorkflowNamespace(params.Namespace.String()), tag.WorkflowID(execution.Execution.GetWorkflowId()), tag.WorkflowRunID(execution.Execution.GetRunId()), tag.Error(err)) @@ -153,6 +156,8 @@ func (a *Activities) DeleteExecutionsActivity(ctx context.Context, params Delete result.SuccessCount++ a.metricsClient.IncCounter(metrics.DeleteExecutionsWorkflowScope, metrics.DeleteExecutionsSuccessCount) case *serviceerror.NotFound: // Workflow execution doesn't exist. Do nothing. 
+ a.metricsClient.IncCounter(metrics.DeleteExecutionsWorkflowScope, metrics.DeleteExecutionNotFoundCount) + a.logger.Warn("Workflow execution is not found.", tag.WorkflowNamespace(params.Namespace.String()), tag.WorkflowID(execution.Execution.GetWorkflowId()), tag.WorkflowRunID(execution.Execution.GetRunId())) default: result.ErrorCount++ a.metricsClient.IncCounter(metrics.DeleteExecutionsWorkflowScope, metrics.DeleteExecutionFailuresCount) diff --git a/service/worker/deletenamespace/reclaimresources/activities.go b/service/worker/deletenamespace/reclaimresources/activities.go index 4c34848d721..dfc11715309 100644 --- a/service/worker/deletenamespace/reclaimresources/activities.go +++ b/service/worker/deletenamespace/reclaimresources/activities.go @@ -27,6 +27,8 @@ package reclaimresources import ( "context" + "go.temporal.io/sdk/activity" + "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/metrics" @@ -61,6 +63,7 @@ func NewActivities( func (a *Activities) EnsureNoExecutionsActivity(ctx context.Context, nsID namespace.ID, nsName namespace.Name) error { // TODO: remove this check after CountWorkflowExecutions is implemented in standard visibility. 
+ count := int64(0) if a.visibilityManager.GetName() == "elasticsearch" { req := &manager.CountWorkflowExecutionsRequest{ NamespaceID: nsID, @@ -69,30 +72,36 @@ func (a *Activities) EnsureNoExecutionsActivity(ctx context.Context, nsID namesp resp, err := a.visibilityManager.CountWorkflowExecutions(ctx, req) if err != nil { a.metricsClient.IncCounter(metrics.ReclaimResourcesWorkflowScope, metrics.ListExecutionsFailuresCount) - a.logger.Error("Unable to count workflows.", tag.WorkflowNamespace(nsName.String()), tag.Error(err)) + a.logger.Error("Unable to count workflow executions.", tag.WorkflowNamespace(nsName.String()), tag.Error(err)) return err } - if resp.Count > 0 { - return errors.ErrExecutionsStillExist + count = resp.Count + } else { + req := &manager.ListWorkflowExecutionsRequestV2{ + NamespaceID: nsID, + Namespace: nsName, + PageSize: 1, } - return nil - } - - req := &manager.ListWorkflowExecutionsRequestV2{ - NamespaceID: nsID, - Namespace: nsName, - PageSize: 1, - } - resp, err := a.visibilityManager.ListWorkflowExecutions(ctx, req) - if err != nil { - a.metricsClient.IncCounter(metrics.ReclaimResourcesWorkflowScope, metrics.ListExecutionsFailuresCount) - a.logger.Error("Unable to count workflows using list.", tag.WorkflowNamespace(nsName.String()), tag.Error(err)) - return err + resp, err := a.visibilityManager.ListWorkflowExecutions(ctx, req) + if err != nil { + a.metricsClient.IncCounter(metrics.ReclaimResourcesWorkflowScope, metrics.ListExecutionsFailuresCount) + a.logger.Error("Unable to count workflow executions using list.", tag.WorkflowNamespace(nsName.String()), tag.Error(err)) + return err + } + // If not 0, it will always be 1 due to PageSize set to 1. 
+ count = int64(len(resp.Executions)) } - if len(resp.Executions) > 0 { - return errors.ErrExecutionsStillExist + if count > 0 { + a.logger.Warn("Some workflow executions still exist.", tag.WorkflowNamespace(nsName.String()), tag.Counter(int(count))) + activity.RecordHeartbeat(ctx, count) + select { + case <-ctx.Done(): + return ctx.Err() + default: + return errors.ErrExecutionsStillExist + } } return nil } @@ -105,7 +114,7 @@ func (a *Activities) DeleteNamespaceActivity(ctx context.Context, nsID namespace err := a.metadataManager.DeleteNamespaceByName(ctx, deleteNamespaceRequest) if err != nil { a.metricsClient.IncCounter(metrics.ReclaimResourcesWorkflowScope, metrics.DeleteNamespaceFailuresCount) - a.logger.Error("Unable delete namespace from persistence.", tag.WorkflowNamespace(nsName.String()), tag.Error(err)) + a.logger.Error("Unable to delete namespace from persistence.", tag.WorkflowNamespace(nsName.String()), tag.Error(err)) return err } From 47555e387955cdbb85243b6c6ae47124da4bf532 Mon Sep 17 00:00:00 2001 From: Alex Shtin Date: Mon, 28 Mar 2022 18:32:03 -0700 Subject: [PATCH 2/4] Remove cancellation support --- .../worker/deletenamespace/reclaimresources/activities.go | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/service/worker/deletenamespace/reclaimresources/activities.go b/service/worker/deletenamespace/reclaimresources/activities.go index dfc11715309..eca0b248304 100644 --- a/service/worker/deletenamespace/reclaimresources/activities.go +++ b/service/worker/deletenamespace/reclaimresources/activities.go @@ -96,12 +96,7 @@ func (a *Activities) EnsureNoExecutionsActivity(ctx context.Context, nsID namesp if count > 0 { a.logger.Warn("Some workflow executions still exist.",
tag.WorkflowNamespace(nsName.String()), tag.Counter(int(count))) activity.RecordHeartbeat(ctx, count) - select { - case <-ctx.Done(): - return ctx.Err() - default: - return errors.ErrExecutionsStillExist - } + return errors.ErrExecutionsStillExist } return nil } From 7469ba2c3a2f20c639f4e85bd37e098e0150afa1 Mon Sep 17 00:00:00 2001 From: Alex Shtin Date: Mon, 28 Mar 2022 20:39:48 -0700 Subject: [PATCH 3/4] Change Warn to Info logging level --- service/worker/deletenamespace/deleteexecutions/activities.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/service/worker/deletenamespace/deleteexecutions/activities.go b/service/worker/deletenamespace/deleteexecutions/activities.go index 550093b9eb6..6b4a46e56ea 100644 --- a/service/worker/deletenamespace/deleteexecutions/activities.go +++ b/service/worker/deletenamespace/deleteexecutions/activities.go @@ -139,7 +139,7 @@ func (a *Activities) DeleteExecutionsActivity(ctx context.Context, params Delete case nil: case *serviceerror.NotFound: // Workflow execution has already completed or doesn't exist.
a.metricsClient.IncCounter(metrics.DeleteExecutionsWorkflowScope, metrics.TerminateExecutionNotFoundCount) - a.logger.Warn("Workflow execution is not found or not running.", tag.WorkflowNamespace(params.Namespace.String()), tag.WorkflowID(execution.Execution.GetWorkflowId()), tag.WorkflowRunID(execution.Execution.GetRunId())) + a.logger.Info("Workflow execution is not found or not running.", tag.WorkflowNamespace(params.Namespace.String()), tag.WorkflowID(execution.Execution.GetWorkflowId()), tag.WorkflowRunID(execution.Execution.GetRunId())) default: a.metricsClient.IncCounter(metrics.DeleteExecutionsWorkflowScope, metrics.TerminateExecutionFailuresCount) a.logger.Error("Unable to terminate workflow execution.", tag.WorkflowNamespace(params.Namespace.String()), tag.WorkflowID(execution.Execution.GetWorkflowId()), tag.WorkflowRunID(execution.Execution.GetRunId()), tag.Error(err)) @@ -157,7 +157,7 @@ func (a *Activities) DeleteExecutionsActivity(ctx context.Context, params Delete a.metricsClient.IncCounter(metrics.DeleteExecutionsWorkflowScope, metrics.DeleteExecutionsSuccessCount) case *serviceerror.NotFound: // Workflow execution doesn't exist. Do nothing. 
a.metricsClient.IncCounter(metrics.DeleteExecutionsWorkflowScope, metrics.DeleteExecutionNotFoundCount) - a.logger.Warn("Workflow execution is not found.", tag.WorkflowNamespace(params.Namespace.String()), tag.WorkflowID(execution.Execution.GetWorkflowId()), tag.WorkflowRunID(execution.Execution.GetRunId())) + a.logger.Info("Workflow execution is not found.", tag.WorkflowNamespace(params.Namespace.String()), tag.WorkflowID(execution.Execution.GetWorkflowId()), tag.WorkflowRunID(execution.Execution.GetRunId())) default: result.ErrorCount++ a.metricsClient.IncCounter(metrics.DeleteExecutionsWorkflowScope, metrics.DeleteExecutionFailuresCount) From 17c686a3ff015c896344973a1a735eb9560c2cdb Mon Sep 17 00:00:00 2001 From: Alex Shtin Date: Tue, 29 Mar 2022 09:10:53 -0700 Subject: [PATCH 4/4] Use "workflow executions" in list failure log message --- service/worker/deletenamespace/deleteexecutions/activities.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/service/worker/deletenamespace/deleteexecutions/activities.go b/service/worker/deletenamespace/deleteexecutions/activities.go index 6b4a46e56ea..5fe981fa0a8 100644 --- a/service/worker/deletenamespace/deleteexecutions/activities.go +++ b/service/worker/deletenamespace/deleteexecutions/activities.go @@ -116,7 +116,7 @@ func (a *Activities) DeleteExecutionsActivity(ctx context.Context, params Delete resp, err := a.visibilityManager.ListWorkflowExecutions(ctx, req) if err != nil { a.metricsClient.IncCounter(metrics.DeleteExecutionsWorkflowScope, metrics.ListExecutionsFailuresCount) - a.logger.Error("Unable to list all workflows.", tag.WorkflowNamespace(params.Namespace.String()), tag.Error(err)) + a.logger.Error("Unable to list all workflow executions.", tag.WorkflowNamespace(params.Namespace.String()), tag.Error(err)) return result, err } for _, execution := range resp.Executions {