From 02be6eaebd7a7939705695ad45f0f4568d3946cc Mon Sep 17 00:00:00 2001 From: Yichao Yang Date: Tue, 15 Mar 2022 15:36:19 -0700 Subject: [PATCH] Fix shard context error state check --- service/history/shard/context_impl.go | 74 ++++++++++++--------------- 1 file changed, 33 insertions(+), 41 deletions(-) diff --git a/service/history/shard/context_impl.go b/service/history/shard/context_impl.go index 448fc7aa863..75af1fef029 100644 --- a/service/history/shard/context_impl.go +++ b/service/history/shard/context_impl.go @@ -549,13 +549,8 @@ func (s *ContextImpl) UpdateHandoverNamespaces(namespaces []*namespace.Namespace func (s *ContextImpl) AddTasks( request *persistence.AddHistoryTasksRequest, ) error { - if err := s.errorByState(); err != nil { - return err - } - - namespaceID := namespace.ID(request.NamespaceID) - // do not try to get namespace cache within shard lock + namespaceID := namespace.ID(request.NamespaceID) namespaceEntry, err := s.GetNamespaceRegistry().GetNamespaceByID(namespaceID) if err != nil { return err @@ -564,20 +559,19 @@ func (s *ContextImpl) AddTasks( s.wLock() defer s.wUnlock() + if err := s.errorByStateLocked(); err != nil { + return err + } + return s.addTasksLocked(request, namespaceEntry) } func (s *ContextImpl) CreateWorkflowExecution( request *persistence.CreateWorkflowExecutionRequest, ) (*persistence.CreateWorkflowExecutionResponse, error) { - if err := s.errorByState(); err != nil { - return nil, err - } - + // do not try to get namespace cache within shard lock namespaceID := namespace.ID(request.NewWorkflowSnapshot.ExecutionInfo.NamespaceId) workflowID := request.NewWorkflowSnapshot.ExecutionInfo.WorkflowId - - // do not try to get namespace cache within shard lock namespaceEntry, err := s.GetNamespaceRegistry().GetNamespaceByID(namespaceID) if err != nil { return nil, err @@ -586,6 +580,10 @@ func (s *ContextImpl) CreateWorkflowExecution( s.wLock() defer s.wUnlock() + if err := s.errorByStateLocked(); err != nil { + return nil, err + } + transferMaxReadLevel := int64(0) if err := s.allocateTaskIDsLocked( namespaceEntry, @@ -608,14 +606,9 @@ func (s *ContextImpl) CreateWorkflowExecution( func (s *ContextImpl) UpdateWorkflowExecution( request *persistence.UpdateWorkflowExecutionRequest, ) (*persistence.UpdateWorkflowExecutionResponse, error) { - if err := s.errorByState(); err != nil { - return nil, err - } - + // do not try to get namespace cache within shard lock namespaceID := namespace.ID(request.UpdateWorkflowMutation.ExecutionInfo.NamespaceId) workflowID := request.UpdateWorkflowMutation.ExecutionInfo.WorkflowId - - // do not try to get namespace cache within shard lock namespaceEntry, err := s.GetNamespaceRegistry().GetNamespaceByID(namespaceID) if err != nil { return nil, err @@ -624,6 +617,10 @@ func (s *ContextImpl) UpdateWorkflowExecution( s.wLock() defer s.wUnlock() + if err := s.errorByStateLocked(); err != nil { + return nil, err + } + transferMaxReadLevel := int64(0) if err := s.allocateTaskIDsLocked( namespaceEntry, @@ -656,14 +653,9 @@ func (s *ContextImpl) UpdateWorkflowExecution( func (s *ContextImpl) ConflictResolveWorkflowExecution( request *persistence.ConflictResolveWorkflowExecutionRequest, ) (*persistence.ConflictResolveWorkflowExecutionResponse, error) { - if err := s.errorByState(); err != nil { - return nil, err - } - + // do not try to get namespace cache within shard lock namespaceID := namespace.ID(request.ResetWorkflowSnapshot.ExecutionInfo.NamespaceId) workflowID := request.ResetWorkflowSnapshot.ExecutionInfo.WorkflowId - - // do not try to get namespace cache within shard lock namespaceEntry, err := s.GetNamespaceRegistry().GetNamespaceByID(namespaceID) if err != nil { return nil, err @@ -672,6 +664,10 @@ func (s *ContextImpl) ConflictResolveWorkflowExecution( s.wLock() defer s.wUnlock() + if err := s.errorByStateLocked(); err != nil { + return nil, err + } + transferMaxReadLevel := int64(0) if request.CurrentWorkflowMutation != nil { if err := s.allocateTaskIDsLocked( @@ -714,14 +710,9 @@ func (s *ContextImpl) ConflictResolveWorkflowExecution( func (s *ContextImpl) SetWorkflowExecution( request *persistence.SetWorkflowExecutionRequest, ) (*persistence.SetWorkflowExecutionResponse, error) { - if err := s.errorByState(); err != nil { - return nil, err - } - + // do not try to get namespace cache within shard lock namespaceID := namespace.ID(request.SetWorkflowSnapshot.ExecutionInfo.NamespaceId) workflowID := request.SetWorkflowSnapshot.ExecutionInfo.WorkflowId - - // do not try to get namespace cache within shard lock namespaceEntry, err := s.GetNamespaceRegistry().GetNamespaceByID(namespaceID) if err != nil { return nil, err @@ -730,6 +721,10 @@ func (s *ContextImpl) SetWorkflowExecution( s.wLock() defer s.wUnlock() + if err := s.errorByStateLocked(); err != nil { + return nil, err + } + transferMaxReadLevel := int64(0) if err := s.allocateTaskIDsLocked( namespaceEntry, @@ -777,9 +772,12 @@ func (s *ContextImpl) AppendHistoryEvents( namespaceID namespace.ID, execution commonpb.WorkflowExecution, ) (int, error) { - if err := s.errorByState(); err != nil { + s.rLock() + if err := s.errorByStateLocked(); err != nil { + s.rUnlock() return 0, err } + s.rUnlock() request.ShardID = s.shardID @@ -830,10 +828,6 @@ func (s *ContextImpl) DeleteWorkflowExecution( // The history branch won't be accessible (because mutable state is deleted) and special garbage collection workflow will delete it eventually. // Step 4 shouldn't be done earlier because if this func fails after it, workflow execution will be accessible but won't have history (inconsistent state). - if err := s.errorByState(); err != nil { - return err - } - // Do not get namespace cache within shard lock. namespaceEntry, err := s.GetNamespaceRegistry().GetNamespaceByID(namespace.ID(key.NamespaceID)) deleteVisibilityRecord := true @@ -850,6 +844,10 @@ func (s *ContextImpl) DeleteWorkflowExecution( s.wLock() defer s.wUnlock() + if err := s.errorByStateLocked(); err != nil { + return err + } + // Step 1. Delete visibility. if deleteVisibilityRecord { addTasksRequest := &persistence.AddHistoryTasksRequest{ @@ -948,12 +946,6 @@ func (s *ContextImpl) getRangeIDLocked() int64 { return s.shardInfo.GetRangeId() } -func (s *ContextImpl) errorByState() error { - s.rLock() - defer s.rUnlock() - return s.errorByStateLocked() -} - func (s *ContextImpl) errorByStateLocked() error { switch s.state { case contextStateInitialized, contextStateAcquiring: