Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reset for workflows without completed tasks #665

Merged
merged 8 commits into from
Aug 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions service/history/workflowResetor.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func (w *workflowResetorImpl) validateResetWorkflowAfterReplay(newMutableState m
if retError := newMutableState.CheckResettable(); retError != nil {
return retError
}
if !newMutableState.HasInFlightWorkflowTask() {
if !newMutableState.HasPendingWorkflowTask() {
return serviceerror.NewInternal(fmt.Sprintf("can't find the last started workflow task"))
}
if newMutableState.HasBufferedEvents() {
Expand Down Expand Up @@ -290,8 +290,7 @@ func (w *workflowResetorImpl) buildNewMutableStateForReset(

// failed the in-flight workflow task(started).
// Note that we need to ensure WorkflowTaskFailed event is appended right after WorkflowTaskStarted event
workflowTask, _ := newMutableState.GetInFlightWorkflowTask()

workflowTask, _ := newMutableState.GetPendingWorkflowTask()
_, err := newMutableState.AddWorkflowTaskFailedEvent(workflowTask.ScheduleID, workflowTask.StartedID, enumspb.WORKFLOW_TASK_FAILED_CAUSE_RESET_WORKFLOW, nil,
identityHistoryService, resetReason, baseRunID, newRunID, forkEventVersion)
if err != nil {
Expand Down Expand Up @@ -751,8 +750,10 @@ func validateLastBatchOfReset(lastBatch []*historypb.HistoryEvent, workflowTaskF
return serviceerror.NewInvalidArgument(fmt.Sprintf("wrong WorkflowTaskFinishEventId, it must be WorkflowTaskStarted + 1: %v", lastEvent.GetEventId()))
}

if lastEvent.GetEventType() != enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED {
return serviceerror.NewInvalidArgument(fmt.Sprintf("wrong WorkflowTaskFinishEventId, previous batch doesn't include WorkflowTaskStarted, lastFirstEventId: %v", firstEvent.GetEventId()))
if lastEvent.GetEventType() != enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED &&
lastEvent.GetEventType() != enumspb.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED {
return serviceerror.NewInvalidArgument(fmt.Sprintf("unable to use provided event id %v as a reset point as previous batch [%v-%v] should end with WorkflowTaskStarted or WorkflowTaskScheduled event",
workflowTaskFinishEventID, firstEvent.GetEventId(), lastEvent.GetEventId()))
}

return nil
Expand Down
176 changes: 169 additions & 7 deletions service/history/workflowResetor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_NoReplication() {
for _, e := range be.Events {
eid++
if e.GetEventId() != eid {
s.Fail(fmt.Sprintf("inconintous eventID: %v, %v", eid, e.GetEventId()))
s.Fail(fmt.Sprintf("non-continuous eventID: %v, %v", eid, e.GetEventId()))
}
e.EventTime = &eventTime
}
Expand Down Expand Up @@ -1403,7 +1403,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_NoReplication_WithRequestCance
for _, e := range be.Events {
eid++
if e.GetEventId() != eid {
s.Fail(fmt.Sprintf("inconintous eventID: %v, %v", eid, e.GetEventId()))
s.Fail(fmt.Sprintf("non-continuous eventID: %v, %v", eid, e.GetEventId()))
}
e.EventTime = &eventTime
}
Expand Down Expand Up @@ -1998,7 +1998,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_WithTerminatingCur
for _, e := range be.Events {
eid++
if e.GetEventId() != eid {
s.Fail(fmt.Sprintf("inconintous eventID: %v, %v", eid, e.GetEventId()))
s.Fail(fmt.Sprintf("non-continuous eventID: %v, %v", eid, e.GetEventId()))
}
e.EventTime = &eventTime
}
Expand Down Expand Up @@ -2703,7 +2703,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NotActive() {
for _, e := range be.Events {
eid++
if e.GetEventId() != eid {
s.Fail(fmt.Sprintf("inconintous eventID: %v, %v", eid, e.GetEventId()))
s.Fail(fmt.Sprintf("non-continuous eventID: %v, %v", eid, e.GetEventId()))
}
e.EventTime = &eventTime
}
Expand Down Expand Up @@ -3304,7 +3304,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NoTerminatingCurre
for _, e := range be.Events {
eid++
if e.GetEventId() != eid {
s.Fail(fmt.Sprintf("inconintous eventID: %v, %v", eid, e.GetEventId()))
s.Fail(fmt.Sprintf("non-continuous eventID: %v, %v", eid, e.GetEventId()))
}
e.EventTime = &eventTime
}
Expand Down Expand Up @@ -3923,7 +3923,7 @@ func (s *resetorSuite) TestApplyReset() {
for _, e := range be.Events {
eid++
if e.GetEventId() != eid {
s.Fail(fmt.Sprintf("inconintous eventID: %v, %v", eid, e.GetEventId()))
s.Fail(fmt.Sprintf("non-continuous eventID: %v, %v", eid, e.GetEventId()))
}
e.EventTime = &eventTime
}
Expand Down Expand Up @@ -4319,7 +4319,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_WithoutRunID() {
for _, e := range be.Events {
eid++
if e.GetEventId() != eid {
s.Fail(fmt.Sprintf("inconintous eventID: %v, %v", eid, e.GetEventId()))
s.Fail(fmt.Sprintf("non-continuous eventID: %v, %v", eid, e.GetEventId()))
}
e.EventTime = &eventTime
}
Expand Down Expand Up @@ -4371,3 +4371,165 @@ func (s *resetorSuite) TestResetWorkflowExecution_WithoutRunID() {
s.Nil(err)
s.NotNil(response.RunId)
}

func (s *resetorSuite) TestResetWorkflowExecution_NoCompletedTasks() {
testNamespaceEntry := cache.NewLocalNamespaceCacheEntryForTest(
&persistenceblobs.NamespaceInfo{Id: testNamespaceID}, &persistenceblobs.NamespaceConfig{Retention: timestamp.DurationFromDays(1)}, "", nil,
)
s.mockNamespaceCache.EXPECT().GetNamespaceByID(gomock.Any()).Return(testNamespaceEntry, nil).AnyTimes()
s.mockNamespaceCache.EXPECT().GetNamespace(gomock.Any()).Return(testNamespaceEntry, nil).AnyTimes()

namespaceID := testNamespaceID

request := &historyservice.ResetWorkflowExecutionRequest{NamespaceId: namespaceID, ResetRequest: &workflowservice.ResetWorkflowExecutionRequest{}}

wid := "wId"
wfType := "wfType"
taskQueueName := "taskQueue"
forkRunID := uuid.New().String()
currRunID := uuid.New().String()

we := commonpb.WorkflowExecution{
WorkflowId: wid,
RunId: forkRunID,
}

request.ResetRequest = &workflowservice.ResetWorkflowExecutionRequest{
Namespace: "testNamespace",
WorkflowExecution: &we,
Reason: "test reset",
WorkflowTaskFinishEventId: 3,
RequestId: uuid.New().String(),
}

forkBranchToken := []byte("forkBranchToken")
taskQueue := &taskqueuepb.TaskQueue{
Name: taskQueueName,
}
// Prepare history event sequence.
readHistoryResponse := &persistence.ReadHistoryBranchByBatchResponse{
NextPageToken: nil,
Size: 1000,
LastFirstEventID: int64(3),
History: []*historypb.History{
{
Events: []*historypb.HistoryEvent{
{
EventId: 1,
Version: common.EmptyVersion,
EventType: enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED,
Attributes: &historypb.HistoryEvent_WorkflowExecutionStartedEventAttributes{WorkflowExecutionStartedEventAttributes: &historypb.WorkflowExecutionStartedEventAttributes{
WorkflowType: &commonpb.WorkflowType{
Name: wfType,
},
TaskQueue: taskQueue,
Input: payloads.EncodeString("testInput"),
WorkflowExecutionTimeout: timestamp.DurationPtr(100 * time.Second),
WorkflowRunTimeout: timestamp.DurationPtr(50 * time.Second),
WorkflowTaskTimeout: timestamp.DurationPtr(200 * time.Second),
}},
},
{
EventId: 2,
Version: common.EmptyVersion,
EventType: enumspb.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED,
Attributes: &historypb.HistoryEvent_WorkflowTaskScheduledEventAttributes{WorkflowTaskScheduledEventAttributes: &historypb.WorkflowTaskScheduledEventAttributes{
TaskQueue: taskQueue,
StartToCloseTimeout: timestamp.DurationPtr(100 * time.Second),
}},
},
},
},
{
Events: []*historypb.HistoryEvent{
{
EventId: 3,
Version: common.EmptyVersion,
EventType: enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT,
Attributes: &historypb.HistoryEvent_WorkflowTaskTimedOutEventAttributes{WorkflowTaskTimedOutEventAttributes: &historypb.WorkflowTaskTimedOutEventAttributes{}},
},
},
},
},
}

eid := int64(0)
eventTime := time.Unix(0, 1000).UTC()
for _, be := range readHistoryResponse.History {
for _, e := range be.Events {
eid++
if e.GetEventId() != eid {
s.Fail(fmt.Sprintf("non-continuous eventID: %v, %v", eid, e.GetEventId()))
}
e.EventTime = &eventTime
}
}

// Mock calls.
s.mockExecutionMgr.On("GetWorkflowExecution", &persistence.GetWorkflowExecutionRequest{
NamespaceID: namespaceID,
Execution: commonpb.WorkflowExecution{
WorkflowId: wid,
RunId: forkRunID,
},
}).Return(&persistence.GetWorkflowExecutionResponse{State: &persistence.WorkflowMutableState{
ExecutionInfo: &persistence.WorkflowExecutionInfo{
NamespaceID: namespaceID,
WorkflowID: wid,
WorkflowTypeName: wfType,
TaskQueue: taskQueueName,
RunID: forkRunID,
BranchToken: forkBranchToken,
NextEventID: 4,
WorkflowTaskVersion: common.EmptyVersion,
WorkflowTaskScheduleID: common.EmptyEventID,
WorkflowTaskStartedID: common.EmptyEventID,
State: enumsspb.WORKFLOW_EXECUTION_STATE_CREATED,
},
ExecutionStats: &persistenceblobs.ExecutionStats{},
}}, nil).Once()
s.mockExecutionMgr.On("GetCurrentExecution", mock.Anything).Return(&persistence.GetCurrentExecutionResponse{
RunID: currRunID,
}, nil).Once()
s.mockExecutionMgr.On("GetWorkflowExecution", &persistence.GetWorkflowExecutionRequest{
NamespaceID: namespaceID,
Execution: commonpb.WorkflowExecution{
WorkflowId: wid,
RunId: currRunID,
},
}).Return(&persistence.GetWorkflowExecutionResponse{State: &persistence.WorkflowMutableState{
ExecutionInfo: &persistence.WorkflowExecutionInfo{
NamespaceID: namespaceID,
WorkflowID: wid,
WorkflowTypeName: wfType,
TaskQueue: taskQueueName,
RunID: currRunID,
NextEventID: common.FirstEventID,
WorkflowTaskVersion: common.EmptyVersion,
WorkflowTaskScheduleID: common.EmptyEventID,
WorkflowTaskStartedID: common.EmptyEventID,
State: enumsspb.WORKFLOW_EXECUTION_STATE_CREATED,
},
ExecutionStats: &persistenceblobs.ExecutionStats{},
}}, nil).Once()
s.mockHistoryV2Mgr.On("ReadHistoryBranchByBatch", &persistence.ReadHistoryBranchRequest{
BranchToken: forkBranchToken,
MinEventID: common.FirstEventID,
MaxEventID: int64(4),
PageSize: defaultHistoryPageSize,
NextPageToken: nil,
ShardID: &s.shardID,
}).Return(readHistoryResponse, nil).Once()
s.mockHistoryV2Mgr.On("ForkHistoryBranch", mock.Anything).Return(&persistence.ForkHistoryBranchResponse{
NewBranchToken: []byte("newBranch"),
}, nil).Once()
s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything).Return(&persistence.AppendHistoryNodesResponse{
Size: 200,
}, nil).Times(2)
s.mockExecutionMgr.On("ResetWorkflowExecution", mock.Anything).Return(nil).Once()

// Perform a reset and make sure there is no error.
response, err := s.historyEngine.ResetWorkflowExecution(context.Background(), request)
s.Nil(err)
s.NotNil(response.RunId)
}
4 changes: 2 additions & 2 deletions tools/cli/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ var envKeysForUserName = []string{
}

var resetTypesMap = map[string]string{
"FirstWorkflowTaskCompleted": "",
"LastWorkflowTaskCompleted": "",
"FirstWorkflowTask": "",
"LastWorkflowTask": "",
"LastContinuedAsNew": "",
"BadBinary": FlagResetBadBinaryChecksum,
}
Expand Down
36 changes: 22 additions & 14 deletions tools/cli/workflowCommands.go
Original file line number Diff line number Diff line change
Expand Up @@ -1725,8 +1725,8 @@ func isLastEventWorkflowTaskFailedWithNonDeterminism(ctx context.Context, namesp
func getResetEventIDByType(ctx context.Context, c *cli.Context, resetType, namespace, wid, rid string, frontendClient workflowservice.WorkflowServiceClient) (resetBaseRunID string, workflowTaskFinishID int64, err error) {
fmt.Println("resetType:", resetType)
switch resetType {
case "LastWorkflowTaskCompleted":
resetBaseRunID, workflowTaskFinishID, err = getLastWorkflowTaskCompletedID(ctx, namespace, wid, rid, frontendClient)
case "LastWorkflowTask":
resetBaseRunID, workflowTaskFinishID, err = getLastWorkflowTaskEventID(ctx, namespace, wid, rid, frontendClient)
if err != nil {
return
}
Expand All @@ -1735,8 +1735,8 @@ func getResetEventIDByType(ctx context.Context, c *cli.Context, resetType, names
if err != nil {
return
}
case "FirstWorkflowTaskCompleted":
resetBaseRunID, workflowTaskFinishID, err = getFirstWorkflowTaskCompletedID(ctx, namespace, wid, rid, frontendClient)
case "FirstWorkflowTask":
resetBaseRunID, workflowTaskFinishID, err = getFirstWorkflowTaskEventID(ctx, namespace, wid, rid, frontendClient)
if err != nil {
return
}
Expand All @@ -1752,7 +1752,8 @@ func getResetEventIDByType(ctx context.Context, c *cli.Context, resetType, names
return
}

func getLastWorkflowTaskCompletedID(ctx context.Context, namespace, wid, rid string, frontendClient workflowservice.WorkflowServiceClient) (resetBaseRunID string, workflowTaskCompletedID int64, err error) {
// Returns event id of the last completed task or id of the next event after scheduled task.
func getLastWorkflowTaskEventID(ctx context.Context, namespace, wid, rid string, frontendClient workflowservice.WorkflowServiceClient) (resetBaseRunID string, workflowTaskEventID int64, err error) {
resetBaseRunID = rid
req := &workflowservice.GetWorkflowExecutionHistoryRequest{
Namespace: namespace,
Expand All @@ -1771,7 +1772,9 @@ func getLastWorkflowTaskCompletedID(ctx context.Context, namespace, wid, rid str
}
for _, e := range resp.GetHistory().GetEvents() {
if e.GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED {
workflowTaskCompletedID = e.GetEventId()
workflowTaskEventID = e.GetEventId()
} else if e.GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED {
workflowTaskEventID = e.GetEventId() + 1
}
}
if len(resp.NextPageToken) != 0 {
Expand All @@ -1780,8 +1783,8 @@ func getLastWorkflowTaskCompletedID(ctx context.Context, namespace, wid, rid str
break
}
}
if workflowTaskCompletedID == 0 {
return "", 0, printErrorAndReturn("Get LastWorkflowTaskCompletedID failed", fmt.Errorf("no WorkflowTaskCompletedID"))
if workflowTaskEventID == 0 {
return "", 0, printErrorAndReturn("Get LastWorkflowTaskID failed", fmt.Errorf("unable to find any scheduled or completed task"))
}
return
}
Expand Down Expand Up @@ -1814,7 +1817,8 @@ func getBadWorkflowTaskCompletedID(ctx context.Context, namespace, wid, rid, bin
return
}

func getFirstWorkflowTaskCompletedID(ctx context.Context, namespace, wid, rid string, frontendClient workflowservice.WorkflowServiceClient) (resetBaseRunID string, workflowTaskCompletedID int64, err error) {
// Returns id of the first workflow task completed event or if it doesn't exist then id of the event after task scheduled event.
func getFirstWorkflowTaskEventID(ctx context.Context, namespace, wid, rid string, frontendClient workflowservice.WorkflowServiceClient) (resetBaseRunID string, workflowTaskEventID int64, err error) {
resetBaseRunID = rid
req := &workflowservice.GetWorkflowExecutionHistoryRequest{
Namespace: namespace,
Expand All @@ -1825,16 +1829,20 @@ func getFirstWorkflowTaskCompletedID(ctx context.Context, namespace, wid, rid st
MaximumPageSize: 1000,
NextPageToken: nil,
}

for {
resp, err := frontendClient.GetWorkflowExecutionHistory(ctx, req)
if err != nil {
return "", 0, printErrorAndReturn("GetWorkflowExecutionHistory failed", err)
}
for _, e := range resp.GetHistory().GetEvents() {
if e.GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED {
workflowTaskCompletedID = e.GetEventId()
return resetBaseRunID, workflowTaskCompletedID, nil
workflowTaskEventID = e.GetEventId()
return resetBaseRunID, workflowTaskEventID, nil
}
if e.GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED {
if workflowTaskEventID == 0 {
workflowTaskEventID = e.GetEventId() + 1
}
}
}
if len(resp.NextPageToken) != 0 {
Expand All @@ -1843,8 +1851,8 @@ func getFirstWorkflowTaskCompletedID(ctx context.Context, namespace, wid, rid st
break
}
}
if workflowTaskCompletedID == 0 {
return "", 0, printErrorAndReturn("Get FirstWorkflowTaskCompletedID failed", fmt.Errorf("no WorkflowTaskCompletedID"))
if workflowTaskEventID == 0 {
return "", 0, printErrorAndReturn("Get FirstWorkflowTaskID failed", fmt.Errorf("unable to find any scheduled or completed task"))
}
return
}
Expand Down