Fix DeleteWorkflowExecution API when delete non current execution by alexshtin · Pull Request #2484 · temporalio/temporal · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Fix DeleteWorkflowExecution API when delete non current execution #2484

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 16 additions & 23 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ func NewEngineWithShardContext(
historyCache,
config,
archivalClient,
shard.GetTimeSource(),
)

historyEngImpl := &historyEngineImpl{
Expand Down Expand Up @@ -2267,31 +2268,23 @@ func (e *historyEngineImpl) TerminateWorkflowExecution(

func (e *historyEngineImpl) DeleteWorkflowExecution(
ctx context.Context,
deleteRequest *historyservice.DeleteWorkflowExecutionRequest,
) error {

return e.updateWorkflow(
ctx,
namespace.ID(deleteRequest.NamespaceId),
*deleteRequest.GetWorkflowExecution(),
func(weCtx workflow.Context, mutableState workflow.MutableState) (*updateWorkflowAction, error) {
if mutableState.IsWorkflowExecutionRunning() {
return nil, consts.ErrWorkflowNotCompleted // workflow is running, cannot be deleted
}

taskGenerator := workflow.NewTaskGenerator(
e.shard.GetNamespaceRegistry(),
e.logger,
mutableState,
)
request *historyservice.DeleteWorkflowExecutionRequest,
) (retError error) {
nsID := namespace.ID(request.GetNamespaceId())

err := taskGenerator.GenerateDeleteExecutionTask(e.timeSource.Now())
if err != nil {
return nil, err
}
wfCtx, err := e.loadWorkflow(ctx, nsID, request.GetWorkflowExecution().GetWorkflowId(), request.GetWorkflowExecution().GetRunId())
if err != nil {
return err
}
defer func() { wfCtx.getReleaseFn()(retError) }()

return updateWorkflowWithoutWorkflowTask, nil
})
return e.workflowDeleteManager.AddDeleteWorkflowExecutionTask(
nsID,
commonpb.WorkflowExecution{
WorkflowId: request.GetWorkflowExecution().GetWorkflowId(),
RunId: request.GetWorkflowExecution().GetRunId(),
},
wfCtx.getMutableState())
}

// RecordChildExecutionCompleted records the completion of child execution into parent execution history
Expand Down
2 changes: 1 addition & 1 deletion service/history/nDCHistoryReplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func newNDCHistoryReplicator(
logger,
state,
func(mutableState workflow.MutableState) workflow.TaskGenerator {
return workflow.NewTaskGenerator(shard.GetNamespaceRegistry(), logger, mutableState)
return workflow.NewTaskGenerator(shard.GetNamespaceRegistry(), mutableState)
},
)
},
Expand Down
2 changes: 1 addition & 1 deletion service/history/nDCStateRebuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (r *nDCStateRebuilderImpl) initializeBuilders(
r.logger,
resetMutableStateBuilder,
func(mutableState workflow.MutableState) workflow.TaskGenerator {
return workflow.NewTaskGenerator(r.shard.GetNamespaceRegistry(), r.logger, mutableState)
return workflow.NewTaskGenerator(r.shard.GetNamespaceRegistry(), mutableState)
},
)
return resetMutableStateBuilder, stateBuilder
Expand Down
56 changes: 50 additions & 6 deletions service/history/workflow/delete_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,24 @@ import (
enumspb "go.temporal.io/api/enums/v1"

"go.temporal.io/server/common"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/searchattribute"
"go.temporal.io/server/service/history/configs"
"go.temporal.io/server/service/history/consts"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/tasks"
"go.temporal.io/server/service/worker/archiver"
)

type (
DeleteManager interface {
DeleteWorkflowExecution(namespaceID namespace.ID, we commonpb.WorkflowExecution, weCtx Context, ms MutableState, sourceTaskVersion int64) error
DeleteWorkflowExecutionByRetention(namespaceID namespace.ID, we commonpb.WorkflowExecution, weCtx Context, ms MutableState, sourceTaskVersion int64) error
AddDeleteWorkflowExecutionTask(nsID namespace.ID, we commonpb.WorkflowExecution, ms MutableState) error
DeleteWorkflowExecution(nsID namespace.ID, we commonpb.WorkflowExecution, weCtx Context, ms MutableState, sourceTaskVersion int64) error
DeleteWorkflowExecutionByRetention(nsID namespace.ID, we commonpb.WorkflowExecution, weCtx Context, ms MutableState, sourceTaskVersion int64) error
}

DeleteManagerImpl struct {
Expand All @@ -55,6 +59,7 @@ type (
config *configs.Config
metricsClient metrics.Client
archivalClient archiver.Client
timeSource clock.TimeSource
}
)

Expand All @@ -65,20 +70,59 @@ func NewDeleteManager(
cache Cache,
config *configs.Config,
archiverClient archiver.Client,
timeSource clock.TimeSource,
) *DeleteManagerImpl {
deleteManager := &DeleteManagerImpl{
shard: shard,
historyCache: cache,
metricsClient: shard.GetMetricsClient(),
config: config,
archivalClient: archiverClient,
timeSource: timeSource,
}

return deleteManager
}
// AddDeleteWorkflowExecutionTask generates a delete-execution task for the
// given workflow execution and adds it to the shard under the transfer task
// category.
//
// It returns consts.ErrWorkflowNotCompleted when ms reports that the workflow
// is still running. Errors from task generation or from the shard's AddTasks
// call are returned to the caller unchanged.
func (m *DeleteManagerImpl) AddDeleteWorkflowExecutionTask(
	nsID namespace.ID,
	we commonpb.WorkflowExecution,
	ms MutableState,
) error {
	// Running workflow cannot be deleted. Close or terminate it first.
	if ms.IsWorkflowExecutionRunning() {
		return consts.ErrWorkflowNotCompleted
	}

	generator := NewTaskGenerator(m.shard.GetNamespaceRegistry(), ms)
	task, err := generator.GenerateDeleteExecutionTask(m.timeSource.Now())
	if err != nil {
		return err
	}

	// RangeID is intentionally left out of the request: it is set by shard.
	request := &persistence.AddTasksRequest{
		ShardID:     m.shard.GetShardID(),
		NamespaceID: nsID.String(),
		WorkflowID:  we.GetWorkflowId(),
		RunID:       we.GetRunId(),
		Tasks: map[tasks.Category][]tasks.Task{
			tasks.CategoryTransfer: []tasks.Task{task},
		},
	}
	return m.shard.AddTasks(request)
}

func (m *DeleteManagerImpl) DeleteWorkflowExecution(
namespaceID namespace.ID,
nsID namespace.ID,
we commonpb.WorkflowExecution,
weCtx Context,
ms MutableState,
Expand All @@ -94,7 +138,7 @@ func (m *DeleteManagerImpl) DeleteWorkflowExecution(
}

err := m.deleteWorkflowExecutionInternal(
namespaceID,
nsID,
we,
weCtx,
ms,
Expand All @@ -107,7 +151,7 @@ func (m *DeleteManagerImpl) DeleteWorkflowExecution(
}

func (m *DeleteManagerImpl) DeleteWorkflowExecutionByRetention(
namespaceID namespace.ID,
nsID namespace.ID,
we commonpb.WorkflowExecution,
weCtx Context,
ms MutableState,
Expand All @@ -122,7 +166,7 @@ func (m *DeleteManagerImpl) DeleteWorkflowExecutionByRetention(
}

err := m.deleteWorkflowExecutionInternal(
namespaceID,
nsID,
we,
weCtx,
ms,
Expand Down
30 changes: 22 additions & 8 deletions service/history/workflow/delete_manager_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions service/history/workflow/delete_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common"
carchiver "go.temporal.io/server/common/archiver"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
Expand All @@ -59,6 +60,7 @@ type (
mockCache *MockCache
mockArchivalClient *archiver.MockClient
mockShardContext *shard.MockContext
mockClock *clock.EventTimeSource

deleteManager DeleteManager
}
Expand All @@ -83,6 +85,7 @@ func (s *deleteManagerWorkflowSuite) SetupTest() {
s.controller = gomock.NewController(s.T())
s.mockCache = NewMockCache(s.controller)
s.mockArchivalClient = archiver.NewMockClient(s.controller)
s.mockClock = clock.NewEventTimeSource()

config := tests.NewDynamicConfig()
s.mockShardContext = shard.NewMockContext(s.controller)
Expand All @@ -93,6 +96,7 @@ func (s *deleteManagerWorkflowSuite) SetupTest() {
s.mockCache,
config,
s.mockArchivalClient,
s.mockClock,
)
}

Expand Down
2 changes: 1 addition & 1 deletion service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func NewMutableState(
common.FirstEventID,
s.bufferEventsInDB,
)
s.taskGenerator = NewTaskGenerator(shard.GetNamespaceRegistry(), s.logger, s)
s.taskGenerator = NewTaskGenerator(shard.GetNamespaceRegistry(), s)
s.workflowTaskManager = newWorkflowTaskStateMachine(s)

return s
Expand Down
20 changes: 6 additions & 14 deletions service/history/workflow/task_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (

enumsspb "go.temporal.io/server/api/enums/v1"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence/versionhistory"
"go.temporal.io/server/common/primitives/timestamp"
Expand All @@ -54,7 +53,7 @@ type (
) error
GenerateDeleteExecutionTask(
now time.Time,
) error
) (*tasks.DeleteExecutionTask, error)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return tasks.Task or []tasks.Task please.

Once the queue refactor change is done, I am going to update all the generator methods to return the generated task instead of adding it to ms directly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Depend on interface, return concrete type", right? Unless there is a good reason for an exception.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we control the caller code and we can prevent them from being closely coupled, I don't really have a strong opinion here.

GenerateRecordWorkflowStartedTasks(
now time.Time,
startEvent *historypb.HistoryEvent,
Expand Down Expand Up @@ -120,9 +119,7 @@ type (

TaskGeneratorImpl struct {
namespaceRegistry namespace.Registry
logger log.Logger

mutableState MutableState
mutableState MutableState
}
)

Expand All @@ -132,15 +129,12 @@ var _ TaskGenerator = (*TaskGeneratorImpl)(nil)

func NewTaskGenerator(
namespaceRegistry namespace.Registry,
logger log.Logger,
mutableState MutableState,
) *TaskGeneratorImpl {

mstg := &TaskGeneratorImpl{
namespaceRegistry: namespaceRegistry,
logger: logger,

mutableState: mutableState,
mutableState: mutableState,
}

return mstg
Expand Down Expand Up @@ -212,18 +206,16 @@ func (r *TaskGeneratorImpl) GenerateWorkflowCloseTasks(

func (r *TaskGeneratorImpl) GenerateDeleteExecutionTask(
now time.Time,
) error {
) (*tasks.DeleteExecutionTask, error) {

currentVersion := r.mutableState.GetCurrentVersion()

r.mutableState.AddTasks(&tasks.DeleteExecutionTask{
return &tasks.DeleteExecutionTask{
// TaskID is set by shard
WorkflowKey: r.mutableState.GetWorkflowKey(),
VisibilityTimestamp: now,
Version: currentVersion,
})

return nil
}, nil
}

func (r *TaskGeneratorImpl) GenerateDelayedWorkflowTasks(
Expand Down
7 changes: 4 additions & 3 deletions service/history/workflow/task_generator_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion service/history/workflow/task_refresher.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ func (r *TaskRefresherImpl) RefreshTasks(

taskGenerator := NewTaskGenerator(
r.namespaceRegistry,
r.logger,
mutableState,
)

Expand Down
0