Address false positive non-determinism scenarios during replay #1281
base: master
Changes from 9 commits
ec3ecd5
f85fe93
8647c6c
b5ec4d9
223fb5d
bff285f
b26326f
ff4f4d1
51dbd59
576e118
45560c4
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -262,6 +262,28 @@ func isDecisionEvent(eventType s.EventType) bool { | |
} | ||
} | ||
|
||
// isDecisionEventForReplay differs from isDecisionEvent: during replays we intentionally | ||
// ignore workflow complete/fail/cancel/continueasnew events so that the decisions produced | ||
// by replay match the respond events recorded while processing the workflow. | ||
func isDecisionEventForReplay(eventType s.EventType) bool { | ||
|
||
switch eventType { | ||
case | ||
s.EventTypeActivityTaskScheduled, | ||
s.EventTypeActivityTaskCancelRequested, | ||
s.EventTypeTimerStarted, | ||
s.EventTypeTimerCanceled, | ||
s.EventTypeCancelTimerFailed, | ||
s.EventTypeMarkerRecorded, | ||
s.EventTypeStartChildWorkflowExecutionInitiated, | ||
s.EventTypeRequestCancelExternalWorkflowExecutionInitiated, | ||
s.EventTypeSignalExternalWorkflowExecutionInitiated, | ||
s.EventTypeUpsertWorkflowSearchAttributes: | ||
return true | ||
default: | ||
return false | ||
} | ||
} | ||
Comment on lines +268 to +285

To stick a comment in here as a random representative location, from chats a while back, for visibility:

... but I'm not (yet) confident that it's correct, nor that it doesn't worsen other scenarios. It's a pretty substantial change despite how small it is. From vague memory of my last attempt to verify, I ended up with more concerns than I started with, but no real evidence in either direction :\

Some of ^ this is "I'm not entirely sure what the end result is during replay" and some is "it's extremely hard to verify that this does not impact normal replays / restores original behavior when disabled" (because the code here is extremely convoluted and stateful). E.g. it could be written in a more obviously-safe way than it is, which would make the latter concern go away, but perhaps this is a cleaner end result than that would be.

The former is... hard though. It may be fine even if it does introduce new flaws, it's definitely not perfect now, but we need to have an informed decision.

The good news is that we now have a LOT more internal replay-shadow-tests (compared to when this was first made) that we can run to discover what has changed - if it finds nothing but improvements against our fairly wide range of behavior in those tests, that will probably be good enough.

So tl;dr: it just has to be sufficiently-validated or fully understood/described.

All sounds good to me. At the time I looked into these false positives I focused on a subset of the potential scenarios. I'm not super confident this is a completely safe change even though it looks like it is. Finding more scenarios and adding them as test cases should be the next step, as discussed offline.
Due to other priorities I'm handing this over to you and @agautam478. |
||
|
||
// NextDecisionEvents returns events that are processed as new by the next decision. | ||
// TODO(maxim): Refactor to return a struct instead of multiple parameters | ||
func (eh *history) NextDecisionEvents() (result []*s.HistoryEvent, markers []*s.HistoryEvent, binaryChecksum *string, err error) { | ||
|
@@ -840,6 +862,19 @@ process_Workflow_Loop: | |
return response, err | ||
} | ||
|
||
// ProcessWorkflowTask processes the given workflow which includes | ||
// - fetching, reordering and replaying historical decision events. (Decision events in this context is an umbrella term for workflow relevant events) | ||
// - state machine is incrementally built with every decision. | ||
// - state machine makes sure that when a workflow restarts for some reason same activities (or timers etc.) are not called again and previous result state is loaded into memory | ||
// | ||
// Note about Replay tests mode: | ||
|
||
// | ||
// This mode works by replaying the historical decision events responses (as defined in isDecisionEventForReplay()) | ||
// and comparing these with the replays gotten from state machine | ||
// | ||
// Compared to isDecisionEvent(), isDecisionEventForReplay() omits the following events even though they are workflow relevant respond events: | ||
// complete/failed/cancel/continueasnew | ||
// The reason is that state machine doesn't have a corresponding decision for these so they cause false positive non-determinism errors in Replay tests. | ||
func (w *workflowExecutionContextImpl) ProcessWorkflowTask(workflowTask *workflowTask) (interface{}, error) { | ||
task := workflowTask.task | ||
historyIterator := workflowTask.historyIterator | ||
|
@@ -899,8 +934,16 @@ ProcessEvents: | |
for i, event := range reorderedEvents { | ||
isInReplay := reorderedHistory.IsReplayEvent(event) | ||
isLast := !isInReplay && i == len(reorderedEvents)-1 | ||
if !skipReplayCheck && isDecisionEvent(event.GetEventType()) { | ||
respondEvents = append(respondEvents, event) | ||
if !skipReplayCheck { | ||
isDecisionEventFn := isDecisionEvent | ||
// when strict nondeterminism is enabled we use a different function to check for decision events during replay | ||
if !w.wth.disableStrictNonDeterminism && isInReplay { | ||
isDecisionEventFn = isDecisionEventForReplay | ||
} | ||
|
||
if isDecisionEventFn(event.GetEventType()) { | ||
respondEvents = append(respondEvents, event) | ||
} | ||
} | ||
|
||
if isPreloadMarkerEvent(event) { | ||
|
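The predicate selection in the hunk above can be sketched in isolation. This is a minimal, self-contained illustration and not the client's code: the `event` struct, the string event types, and `collectRespondEvents` are hypothetical stand-ins, only a subset of the real event list is included, and `isDecisionEvent` is assumed to equal the replay set plus the four close events, as the doc comment on `ProcessWorkflowTask` describes.

```go
package main

import "fmt"

// event is a hypothetical, simplified stand-in for *s.HistoryEvent.
type event struct {
	Type     string
	InReplay bool
}

// isCloseEvent reports whether the event type ends the workflow run.
func isCloseEvent(t string) bool {
	switch t {
	case "WorkflowExecutionCompleted", "WorkflowExecutionFailed",
		"WorkflowExecutionCanceled", "WorkflowExecutionContinuedAsNew":
		return true
	default:
		return false
	}
}

// isDecisionEventForReplay covers only a subset of the real list, for brevity.
func isDecisionEventForReplay(t string) bool {
	switch t {
	case "ActivityTaskScheduled", "TimerStarted", "MarkerRecorded":
		return true
	default:
		return false
	}
}

// isDecisionEvent is assumed here to be the replay set plus the close events.
func isDecisionEvent(t string) bool {
	return isDecisionEventForReplay(t) || isCloseEvent(t)
}

// collectRespondEvents chooses the predicate per event: the replay variant
// is used only when strict checks are enabled and the event is a replay event.
func collectRespondEvents(events []event, disableStrictNonDeterminism bool) []string {
	var respond []string
	for _, e := range events {
		pred := isDecisionEvent
		if !disableStrictNonDeterminism && e.InReplay {
			pred = isDecisionEventForReplay
		}
		if pred(e.Type) {
			respond = append(respond, e.Type)
		}
	}
	return respond
}

func main() {
	history := []event{
		{Type: "ActivityTaskScheduled", InReplay: true},
		{Type: "WorkflowExecutionCompleted", InReplay: true},
	}
	fmt.Println("strict:  ", collectRespondEvents(history, false))
	fmt.Println("disabled:", collectRespondEvents(history, true))
}
```

With strict checks enabled, the replayed close event is dropped from the respond events; with them disabled, the original `isDecisionEvent` behavior is kept.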
@@ -918,7 +961,16 @@ ProcessEvents: | |
if err != nil { | ||
return nil, err | ||
} | ||
if w.isWorkflowCompleted { | ||
|
||
// Break the event processing loop if either | ||
// - Workflow is completed AND strict nondeterminism checks disabled. | ||
// - Workflow is completed AND strict nondeterminism checks enabled AND NOT in replay mode. | ||
// With strict nondeterminism checks enabled, breaking the loop early causes missing events | ||
// in respondEvents which then causes false positives or false negatives. | ||
stopProcessing := (w.isWorkflowCompleted && w.wth.disableStrictNonDeterminism) || | ||
(w.isWorkflowCompleted && !w.wth.disableStrictNonDeterminism && !isInReplay) | ||
|
||
if stopProcessing { | ||
break ProcessEvents | ||
} | ||
} | ||
|
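The `stopProcessing` expression in the hunk above has two branches that share the `isWorkflowCompleted` term. As a sketch, the following compares it against a shorter form over all eight input combinations; `stopSimplified` is a hypothetical rewrite, not something proposed in the PR.

```go
package main

import "fmt"

// stopOriginal reproduces the boolean expression from the diff.
func stopOriginal(completed, disableStrict, inReplay bool) bool {
	return (completed && disableStrict) ||
		(completed && !disableStrict && !inReplay)
}

// stopSimplified is a hypothetical equivalent form:
// (c && d) || (c && !d && !r)  ==  c && (d || !r).
func stopSimplified(completed, disableStrict, inReplay bool) bool {
	return completed && (disableStrict || !inReplay)
}

func main() {
	bools := []bool{false, true}
	for _, c := range bools {
		for _, d := range bools {
			for _, r := range bools {
				fmt.Printf("completed=%-5t disableStrict=%-5t inReplay=%-5t stop=%-5t same=%t\n",
					c, d, r, stopOriginal(c, d, r),
					stopOriginal(c, d, r) == stopSimplified(c, d, r))
			}
		}
	}
}
```

The only case the change affects is a completed workflow with strict checks enabled while still in replay: there the loop keeps going instead of breaking.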
@@ -936,6 +988,9 @@ ProcessEvents: | |
} | ||
} | ||
isReplay := len(reorderedEvents) > 0 && reorderedHistory.IsReplayEvent(reorderedEvents[len(reorderedEvents)-1]) | ||
// incomplete decisions (e.g. start without a complete) at the end of history will still have decisions in decisionsHelper | ||
// but there won't be corresponding respond events. This breaks the non-determinism check therefore we ignore such final partial decisions. | ||
// Example scenario is covered by TestReplayWorkflowHistory_Partial_NoDecisionEvents | ||
lastDecisionEventsForReplayTest := isReplayTest && !reorderedHistory.HasNextDecisionEvents() | ||
if isReplay && !lastDecisionEventsForReplayTest { | ||
eventDecisions := eventHandler.decisionsHelper.getDecisions(true) | ||
|
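The guard at the end of the hunk above decides whether the non-determinism comparison runs at all. A minimal sketch of that condition, with the hypothetical helper name `shouldCompareDecisions` and plain booleans in place of the real state:

```go
package main

import "fmt"

// shouldCompareDecisions is a hypothetical helper mirroring the condition
// `isReplay && !lastDecisionEventsForReplayTest` from the diff.
func shouldCompareDecisions(isReplay, isReplayTest, hasNextDecisionEvents bool) bool {
	// In a replay test, the final batch may hold decisions with no matching
	// respond events (a partial decision), so it is skipped.
	lastDecisionEventsForReplayTest := isReplayTest && !hasNextDecisionEvents
	return isReplay && !lastDecisionEventsForReplayTest
}

func main() {
	fmt.Println(shouldCompareDecisions(true, true, true))   // mid-history batch in a replay test
	fmt.Println(shouldCompareDecisions(true, true, false))  // final batch in a replay test
	fmt.Println(shouldCompareDecisions(true, false, false)) // final batch outside replay tests
}
```

Only the final batch of a replay test is exempt; every other replayed batch is still compared.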
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -176,7 +176,34 @@ | |
}, | ||
{ | ||
"eventId": 12, | ||
"timestamp": 1679427717321911295, | ||
"timestamp": 1679427717321780254, | ||
"eventType": "ActivityTaskStarted", | ||
"version": 0, | ||
"taskId": 5243011, | ||
"activityTaskStartedEventAttributes": { | ||
"scheduledEventId": 11, | ||
"identity": "82203@agautam-NV709R969P@choiceGroup@41d230ae-253a-4d01-9079-322ef05c09fb", | ||
"requestId": "ae2aad96-6588-4359-807b-a39a16f0896a", | ||
"attempt": 0, | ||
"lastFailureReason": "" | ||
} | ||
}, | ||
{ | ||
"eventId": 13, | ||
"timestamp": 1679427717321780255, | ||
"eventType": "ActivityTaskCompleted", | ||
why add the completed event?

though reading more of this file: I don't think this is a realistic history... "completed" implies "ended normally, i.e. by returning" but we don't create activity-task-scheduled events for activities executed during the final decision (since their result cannot be observed)...

it wasn't a realistic history because it's missing the final decision task scheduled->started->completed events. I'm not quite getting why adding the activity completion looks invalid if that's what you meant

well, there are only 2 ways workflows can end:

and everything that affects ^ this occurs in transactions (no interleaving between these):

in this history we originally had:

^ this is possible because, unlike child workflows, we don't "prune" activity-scheduled-s that are created in the final decision task. (we should consider doing that tbh, or keeping both for easier troubleshooting. I forget exactly how child workflows do this, but I suspect the workflow-end cancels the context -> cancels the state machine ->

So in this final decision task, we executed one activity, did not wait on it, and returned immediately. Code like this would result in that:

    func work(ctx workflow.Context) error {
        // ...
        workflow.ExecuteActivity(ctx, "ignored") // non-blocking
        return nil
    }

As of the current PR, we have this sequence:

which is not possible. it would mean we blocked after scheduling (this is necessary to submit it to the server, which is a necessary prerequisite to the activity starting)... which is fine because

however. after that, asynchronously, the activity was started (this can be an isolated event because it's not observable, that's fine), and then completed (this requires both complete and decision-task-scheduled because it is observable, so already it's wrong), and then also the workflow function returned on its own without a decision task to learn about that result.

The only possible outcomes that include an activity that was started are things like either:

because "activity completed -> workflow completed" would be splitting the transaction that "activity completed" is involved in, at the very least.

so it's clearly hand-manipulated rather than recorded, because the hand-manipulation produces an impossible sequence. that would be fine for a regression-like "we had a bug that produced these histories, and we want to ensure it behaves like X" tests... but it doesn't help prove much of anything about how non-corrupted workflows behave, and that's pretty much the only purpose of this folder + PR.

tbh I'm kinda surprised it doesn't panic with an illegal transition or something. But it would be an easy check to miss in the state machine too (particularly because it's more "a collection of many sub-state-machines" rather than enforcing rules in the spaces between them) |
||
"version": 0, | ||
"taskId": 5243000, | ||
"activityTaskCompletedEventAttributes": { | ||
"result": "ImJhbmFuYSIK", | ||
"scheduledEventId": 11, | ||
"startedEventId": 12, | ||
"identity": "82203@agautam-NV709R969P@choiceGroup@41d230ae-253a-4d01-9079-322ef05c09fb" | ||
} | ||
}, | ||
{ | ||
"eventId": 14, | ||
"timestamp": 1679427717321780256, | ||
"eventType": "WorkflowExecutionCompleted", | ||
"version": 0, | ||
"taskId": 5243011, | ||
|
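The reviewer's objection to an "activity completed -> workflow completed" sequence can be sketched as a small history check. This is a deliberately simplified rule under stated assumptions, not the server's or the client's validation: it only considers workflows that end by the workflow function returning, `closesInsideDecisionBatch` and `decisionResults` are hypothetical names, and the two sequences in `main` are illustrative rather than the lists elided from the review comment.

```go
package main

import "fmt"

// decisionResults lists event types written as the direct result of a
// completed decision task (a subset, for illustration only).
var decisionResults = map[string]bool{
	"ActivityTaskScheduled": true,
	"TimerStarted":          true,
	"MarkerRecorded":        true,
}

// closesInsideDecisionBatch reports whether the trailing
// WorkflowExecutionCompleted is reached from a DecisionTaskCompleted
// through decision-result events only, i.e. without an observable event
// such as ActivityTaskCompleted splitting that batch.
func closesInsideDecisionBatch(types []string) bool {
	n := len(types)
	if n == 0 || types[n-1] != "WorkflowExecutionCompleted" {
		return false
	}
	i := n - 2
	for i >= 0 && decisionResults[types[i]] {
		i--
	}
	return i >= 0 && types[i] == "DecisionTaskCompleted"
}

func main() {
	fireAndForget := []string{
		"DecisionTaskCompleted", "ActivityTaskScheduled", "WorkflowExecutionCompleted",
	}
	handEdited := []string{
		"DecisionTaskCompleted", "ActivityTaskScheduled", "ActivityTaskStarted",
		"ActivityTaskCompleted", "WorkflowExecutionCompleted",
	}
	fmt.Println(closesInsideDecisionBatch(fireAndForget))
	fmt.Println(closesInsideDecisionBatch(handEdited))
}
```

Under this rule the fire-and-forget sequence passes and the sequence with an activity completion directly before the workflow completion is rejected, which matches the reviewer's description of the edited history as hand-manipulated.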
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,90 @@ | ||
[ | ||
{ | ||
"eventId": 1, | ||
"timestamp": 1699856700704442400, | ||
"eventType": "WorkflowExecutionStarted", | ||
"version": 4, | ||
"taskId": 882931375, | ||
"workflowExecutionStartedEventAttributes": { | ||
"workflowType": { | ||
"name": "fx.SimpleSignalWorkflow" | ||
}, | ||
"taskList": { | ||
"name": "fx-worker" | ||
}, | ||
"executionStartToCloseTimeoutSeconds": 600, | ||
"taskStartToCloseTimeoutSeconds": 10, | ||
"continuedExecutionRunId": "a664f402-bfe9-4739-945c-9cbc637548f1", | ||
"initiator": "CronSchedule", | ||
"continuedFailureReason": "cadenceInternal:Timeout START_TO_CLOSE", | ||
"originalExecutionRunId": "d0baf930-6a83-4740-b773-71aaa696eed1", | ||
"firstExecutionRunId": "e85fa1b9-8899-40ce-8af9-7e0f93ed7ae5", | ||
"firstScheduleTimeNano": "2023-05-22T15:45:26.535595761-07:00", | ||
"cronSchedule": "* * * * *", | ||
"firstDecisionTaskBackoffSeconds": 60, | ||
"PartitionConfig": { | ||
"isolation-group": "dca11" | ||
} | ||
} | ||
}, | ||
{ | ||
"eventId": 2, | ||
"timestamp": 1699856760713586608, | ||
"eventType": "DecisionTaskScheduled", | ||
"version": 4, | ||
"taskId": 882931383, | ||
"decisionTaskScheduledEventAttributes": { | ||
"taskList": { | ||
"name": "fx-worker" | ||
}, | ||
"startToCloseTimeoutSeconds": 10 | ||
} | ||
}, | ||
{ | ||
"eventId": 3, | ||
"timestamp": 1699856760741837021, | ||
"eventType": "DecisionTaskStarted", | ||
"version": 4, | ||
"taskId": 882931387, | ||
"decisionTaskStartedEventAttributes": { | ||
"scheduledEventId": 2, | ||
"identity": "202@dca50-7q@fx-worker@db443597-5124-483a-b1a5-4b1ff35a0ed4", | ||
"requestId": "bb0ee926-13d1-4af4-9f9c-51433333ad04" | ||
} | ||
}, | ||
{ | ||
"eventId": 4, | ||
"timestamp": 1699856760773459755, | ||
"eventType": "DecisionTaskCompleted", | ||
"version": 4, | ||
"taskId": 882931391, | ||
"decisionTaskCompletedEventAttributes": { | ||
"scheduledEventId": 2, | ||
"startedEventId": 3, | ||
"identity": "202@dca50-7q@fx-worker@db443597-5124-483a-b1a5-4b1ff35a0ed4", | ||
"binaryChecksum": "uDeploy:dc3e318b30a49e8bb88f462a50fe3a01dd210a3a" | ||
} | ||
}, | ||
{ | ||
"eventId": 5, | ||
"timestamp": 1699857360713649962, | ||
"eventType": "WorkflowExecutionContinuedAsNew", | ||
"version": 4, | ||
"taskId": 882931394, | ||
"workflowExecutionContinuedAsNewEventAttributes": { | ||
"newExecutionRunId": "06c2468c-2d2d-44f7-ac7a-ff3c383f6e90", | ||
"workflowType": { | ||
"name": "fx.SimpleSignalWorkflow" | ||
}, | ||
"taskList": { | ||
"name": "fx-worker" | ||
}, | ||
"executionStartToCloseTimeoutSeconds": 600, | ||
"taskStartToCloseTimeoutSeconds": 10, | ||
"decisionTaskCompletedEventId": -23, | ||
"backoffStartIntervalInSeconds": 60, | ||
"initiator": "CronSchedule", | ||
"failureReason": "cadenceInternal:Timeout START_TO_CLOSE" | ||
} | ||
} | ||
] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
package replaytests | ||
|
||
import ( | ||
"go.uber.org/cadence/workflow" | ||
"go.uber.org/zap" | ||
) | ||
|
||
// ContinueAsNewWorkflow is a sample Cadence workflow that can receive a signal | ||
func ContinueAsNewWorkflow(ctx workflow.Context) error { | ||
selector := workflow.NewSelector(ctx) | ||
var signalResult string | ||
signalName := "helloWorldSignal" | ||
for { | ||
signalChan := workflow.GetSignalChannel(ctx, signalName) | ||
selector.AddReceive(signalChan, func(c workflow.Channel, more bool) { | ||
c.Receive(ctx, &signalResult) | ||
workflow.GetLogger(ctx).Info("Received age signalResult from signal!", zap.String("signal", signalName), zap.String("value", signalResult)) | ||
}) | ||
workflow.GetLogger(ctx).Info("Waiting for signal on channel.. " + signalName) | ||
// Wait for signal | ||
selector.Select(ctx) | ||
if signalResult == "kill" { | ||
return nil | ||
} | ||
} | ||
} | ||
Comment on lines +9 to +26

since the only history this is running on is the

    func ContinueAsNewWorkflow(ctx workflow.Context) error {
        workflow.GetSignalChannel(ctx, "unused").Receive(ctx, nil)
        return nil
    }

and it would be exactly equivalent: it decides to do nothing, waiting on an outside event to occur. not sure what the added complexity is buying us, unless it triggers some specific edge case?

afaik, continue as new cases fail for only those scenarios where a workflow was marked continue as new and then the history has nothing after continue as new decision. I would say let's keep the original behavior here to preserve the original scenario where we first observed this bug.

there's no signal in the history though, so it can't be a "received one, did not receive a second" kind of test. anything that waits forever without recording an event should be equivalent (signal chan, regular chan, waitgroup, etc all would work) |
2 is incorrect, a string-executed activity that doesn't return data still calls deSerializeFnResultFromFnType and returns at line 430, doing nothing. It doesn't reach this line.

Also huh. On closer look, this whole chain of code is pretty strange:
- deSerializeFunctionResult is only called if result is non-nil
- deSerializeFnResultFromFnType is only called by this func, and it does nothing if result is nil
- decodeArg here, which is what deSerializeFnResultFromFnType does anyway

... I think deSerializeFunctionResult might be more-correctly replaced with just a call to decodeArg instead.

(this doesn't really belong in this PR IMO, it's just an odd discovery)

TBH I need to re-debug this to remember how I ended up finding those 2 cases. Maybe I meant to add this somewhere else. From the code in this function it seems 2 is not possible.