Skip to content

Commit

Permalink
Schedule a new workflow task after force flush buffer events (#4490)
Browse files Browse the repository at this point in the history
* Schedule a new workflow task after force flush buffer events
  • Loading branch information
wxing1292 committed Jun 14, 2023
1 parent 31fa19d commit 7ceddc7
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 1 deletion.
1 change: 1 addition & 0 deletions service/history/ndc/branch_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ func (r *BranchMgrImpl) flushBufferedEvents(
if err := targetWorkflow.FlushBufferedEvents(); err != nil {
return nil, 0, err
}

// the workflow must be updated as active, to send out replication tasks
if err := targetWorkflow.context.UpdateWorkflowExecutionAsActive(
ctx,
Expand Down
5 changes: 5 additions & 0 deletions service/history/ndc/branch_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
enumspb "go.temporal.io/api/enums/v1"
historypb "go.temporal.io/api/history/v1"

enumsspb "go.temporal.io/server/api/enums/v1"
historyspb "go.temporal.io/server/api/history/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/cluster"
Expand Down Expand Up @@ -254,6 +255,10 @@ func (s *branchMgrSuite) TestFlushBufferedEvents() {
"",
int64(0),
).Return(&historypb.HistoryEvent{}, nil)
s.mockMutableState.EXPECT().AddWorkflowTaskScheduledEvent(
false,
enumsspb.WORKFLOW_TASK_TYPE_NORMAL,
).Return(&workflow.WorkflowTaskInfo{}, nil)
s.mockMutableState.EXPECT().FlushBufferedEvents()
s.mockClusterMetadata.EXPECT().ClusterNameForFailoverVersion(true, lastWriteVersion).Return(cluster.TestCurrentClusterName).AnyTimes()
s.mockClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes()
Expand Down
11 changes: 10 additions & 1 deletion service/history/ndc/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,16 @@ func (r *WorkflowImpl) FlushBufferedEvents() error {
return serviceerror.NewInternal("Workflow encountered workflow with buffered events but last write not from current cluster")
}

return r.failWorkflowTask(lastWriteVersion)
if err = r.failWorkflowTask(lastWriteVersion); err != nil {
return err
}
if _, err := r.mutableState.AddWorkflowTaskScheduledEvent(
false,
enumsspb.WORKFLOW_TASK_TYPE_NORMAL,
); err != nil {
return err
}
return nil
}

func (r *WorkflowImpl) failWorkflowTask(
Expand Down
Empty file.

0 comments on commit 7ceddc7

Please sign in to comment.