Emit workflow task attempt stats by yycptt · Pull Request #2487 · temporalio/temporal

Merged · 3 commits · Feb 11, 2022
2 changes: 2 additions & 0 deletions common/dynamicconfig/constants.go
@@ -455,6 +455,8 @@ const (
StickyTTL = "history.stickyTTL"
// WorkflowTaskHeartbeatTimeout for workflow task heartbeat
WorkflowTaskHeartbeatTimeout = "history.workflowTaskHeartbeatTimeout"
// WorkflowTaskCriticalAttempts is the number of attempts at which a workflow task is considered critical
WorkflowTaskCriticalAttempts = "history.workflowTaskCriticalAttempt"
// DefaultWorkflowTaskTimeout for a workflow task
DefaultWorkflowTaskTimeout = "history.defaultWorkflowTaskTimeout"
// SkipReapplicationByNamespaceID is whether to skip event re-application for a namespace
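The new constant only names a dynamic config key; its default (10) is wired in config.go further down. As a minimal sketch of the dynamic config pattern this relies on, the snippet below uses hypothetical stand-in types (IntPropertyFn, getIntProperty, an in-memory override map) rather than the real go.temporal.io/server dynamicconfig package: the returned property function re-reads its source on every call, so history.workflowTaskCriticalAttempt can change without a restart.

```go
package main

import "fmt"

// IntPropertyFn mirrors the shape of dynamicconfig.IntPropertyFn: a function
// returning the current value of an integer dynamic config property.
// (Simplified, hypothetical stand-in; not the real go.temporal.io/server type.)
type IntPropertyFn func() int

// getIntProperty mimics dc.GetIntProperty(key, defaultValue): the returned
// function re-reads the override source on every call, so changes take effect
// without restarting the service.
func getIntProperty(overrides map[string]int, key string, defaultValue int) IntPropertyFn {
	return func() int {
		if v, ok := overrides[key]; ok {
			return v
		}
		return defaultValue
	}
}

func main() {
	overrides := map[string]int{} // no override yet: falls back to the default of 10
	criticalAttempts := getIntProperty(overrides, "history.workflowTaskCriticalAttempt", 10)
	fmt.Println(criticalAttempts()) // 10

	overrides["history.workflowTaskCriticalAttempt"] = 25
	fmt.Println(criticalAttempts()) // 25, picked up on the next read
}
```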
2 changes: 2 additions & 0 deletions common/metrics/defs.go
@@ -1923,6 +1923,7 @@ const (
EmptyCompletionCommandsCounter
MultipleCompletionCommandsCounter
FailedWorkflowTasksCounter
WorkflowTaskAttempt
StaleMutableStateCounter
AutoResetPointsLimitExceededCounter
AutoResetPointCorruptionCounter
@@ -2384,6 +2385,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
EmptyCompletionCommandsCounter: NewCounterDef("empty_completion_commands"),
MultipleCompletionCommandsCounter: NewCounterDef("multiple_completion_commands"),
FailedWorkflowTasksCounter: NewCounterDef("failed_workflow_tasks"),
WorkflowTaskAttempt: NewDimensionlessHistogramDef("workflow_task_attempt"),
StaleMutableStateCounter: NewCounterDef("stale_mutable_state"),
AutoResetPointsLimitExceededCounter: NewCounterDef("auto_reset_points_exceed_limit"),
AutoResetPointCorruptionCounter: NewCounterDef("auto_reset_point_corruption"),
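A dimensionless histogram definition means each observed attempt count is recorded as a sample in a distribution rather than accumulated like a counter, so operators can see how many workflow task starts needed one attempt versus many. The sketch below is a rough, hypothetical illustration of that idea; the bucket boundaries are made up, and the real emission goes through the metrics client scope shown in workflow_task_state_machine.go below.

```go
package main

import "fmt"

// recordDistribution sketches what a dimensionless histogram does with the new
// workflow_task_attempt metric: every observed attempt count lands in a bucket,
// so the shape of the distribution (mostly 1 attempt vs. many retries) is visible.
// (Hypothetical, simplified bucketing for illustration only.)
func recordDistribution(buckets map[string]int, attempt int) {
	switch {
	case attempt <= 1:
		buckets["1"]++
	case attempt <= 4:
		buckets["2-4"]++
	case attempt <= 9:
		buckets["5-9"]++
	default:
		buckets["10+"]++
	}
}

func main() {
	buckets := map[string]int{}
	for _, attempt := range []int{1, 1, 2, 7, 12} {
		recordDistribution(buckets, attempt)
	}
	fmt.Println(buckets) // map[1:2 10+:1 2-4:1 5-9:1]
}
```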
2 changes: 2 additions & 0 deletions service/history/configs/config.go
@@ -190,6 +190,7 @@ type Config struct {
// WorkflowTaskHeartbeatTimeout controls the timeout behavior of RespondWorkflowTaskCompleted with ForceCreateNewWorkflowTask == true and no new workflow tasks,
// so that the workflow task will be scheduled to another worker (by clearing stickiness)
WorkflowTaskHeartbeatTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter
WorkflowTaskCriticalAttempts dynamicconfig.IntPropertyFn

// The following is used by the new RPC replication stack
ReplicationTaskFetcherParallelism dynamicconfig.IntPropertyFn
@@ -377,6 +378,7 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis
DefaultWorkflowRetryPolicy: dc.GetMapPropertyFnWithNamespaceFilter(dynamicconfig.DefaultWorkflowRetryPolicy, common.GetDefaultRetryPolicyConfigOptions()),
StickyTTL: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.StickyTTL, time.Hour*24*365),
WorkflowTaskHeartbeatTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.WorkflowTaskHeartbeatTimeout, time.Minute*30),
WorkflowTaskCriticalAttempts: dc.GetIntProperty(dynamicconfig.WorkflowTaskCriticalAttempts, 10),

ReplicationTaskFetcherParallelism: dc.GetIntProperty(dynamicconfig.ReplicationTaskFetcherParallelism, 4),
ReplicationTaskFetcherAggregationInterval: dc.GetDurationProperty(dynamicconfig.ReplicationTaskFetcherAggregationInterval, 2*time.Second),
32 changes: 31 additions & 1 deletion service/history/workflow/workflow_task_state_machine.go
@@ -41,6 +41,7 @@ import (
enumsspb "go.temporal.io/server/api/enums/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/primitives/timestamp"
)

@@ -58,7 +59,15 @@ func newWorkflowTaskStateMachine(
}
}

func (m *workflowTaskStateMachine) ReplicateWorkflowTaskScheduledEvent(version int64, scheduleID int64, taskQueue *taskqueuepb.TaskQueue, startToCloseTimeoutSeconds int32, attempt int32, scheduleTimestamp *time.Time, originalScheduledTimestamp *time.Time) (*WorkflowTaskInfo, error) {
func (m *workflowTaskStateMachine) ReplicateWorkflowTaskScheduledEvent(
version int64,
scheduleID int64,
taskQueue *taskqueuepb.TaskQueue,
startToCloseTimeoutSeconds int32,
attempt int32,
scheduleTimestamp *time.Time,
originalScheduledTimestamp *time.Time,
) (*WorkflowTaskInfo, error) {

// Set workflow state to running, since a workflow task is scheduled.
// NOTE: for a zombie workflow, the state should not be changed.
@@ -394,6 +403,9 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskStartedEvent(
}

workflowTask, err := m.ReplicateWorkflowTaskStartedEvent(workflowTask, m.ms.GetCurrentVersion(), scheduleID, startedID, requestID, startTime)

m.emitWorkflowTaskAttemptStats(workflowTask.Attempt)

// TODO merge active & passive task generation
if err := m.ms.taskGenerator.GenerateStartWorkflowTaskTasks(
startTime, // start time is now
@@ -739,3 +751,21 @@ func (m *workflowTaskStateMachine) afterAddWorkflowTaskCompletedEvent(
m.ms.executionInfo.LastWorkflowTaskStartId = event.GetWorkflowTaskCompletedEventAttributes().GetStartedEventId()
return m.ms.addBinaryCheckSumIfNotExists(event, maxResetPoints)
}

func (m *workflowTaskStateMachine) emitWorkflowTaskAttemptStats(
attempt int32,
) {
namespaceName := m.ms.GetNamespaceEntry().Name().String()
m.ms.metricsClient.Scope(
metrics.WorkflowContextScope,
metrics.NamespaceTag(namespaceName),
).RecordDistribution(metrics.WorkflowTaskAttempt, int(attempt))
if attempt >= int32(m.ms.shard.GetConfig().WorkflowTaskCriticalAttempts()) {
m.ms.shard.GetThrottledLogger().Warn("Critical attempts processing workflow task",
tag.WorkflowNamespace(namespaceName),
tag.WorkflowID(m.ms.GetExecutionInfo().WorkflowId),
tag.WorkflowRunID(m.ms.GetExecutionState().RunId),
tag.Attempt(attempt),
)
}
}
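Putting the pieces together: every workflow task start records an attempt sample, and only attempts at or above the critical threshold additionally produce a throttled warning log. The following is a minimal, runnable sketch of that behavior under stated assumptions, with hypothetical stand-ins (fakeMetrics, a plain warn func) in place of the real metrics client scope and the shard's throttled logger.

```go
package main

import "fmt"

// fakeMetrics is a hypothetical stand-in for the metrics client scope used in
// emitWorkflowTaskAttemptStats; it just remembers the recorded samples.
type fakeMetrics struct{ observed []int }

func (f *fakeMetrics) RecordDistribution(name string, value int) {
	f.observed = append(f.observed, value)
}

func emitWorkflowTaskAttemptStats(m *fakeMetrics, warn func(string), attempt, criticalAttempts int32) {
	// Every workflow task start contributes a sample to the attempt histogram.
	m.RecordDistribution("workflow_task_attempt", int(attempt))
	// Reaching this many attempts usually means the workflow task keeps failing
	// or timing out, so surface it in the logs as well.
	if attempt >= criticalAttempts {
		warn(fmt.Sprintf("Critical attempts processing workflow task: attempt=%d", attempt))
	}
}

func main() {
	m := &fakeMetrics{}
	warn := func(msg string) { fmt.Println("WARN:", msg) }
	emitWorkflowTaskAttemptStats(m, warn, 3, 10)  // sample recorded, no warning
	emitWorkflowTaskAttemptStats(m, warn, 12, 10) // sample recorded and warning logged
	fmt.Println(m.observed) // [3 12]
}
```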