Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prefix some errors with rule identifiers #1281

Merged
merged 5 commits into from
Feb 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions internal/internal_command_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ func panicIllegalState(message string) {

func (d *commandStateMachineBase) failStateTransition(event string) {
// this is when we detect illegal state transition, likely due to ill history sequence or nondeterministic workflow code
panicIllegalState(fmt.Sprintf("invalid state transition: attempt to %v, %v", event, d))
panicIllegalState(fmt.Sprintf("[TMPRL1100] invalid state transition: attempt to %v, %v", event, d))
}

func (d *commandStateMachineBase) handleCommandSent() {
Expand Down Expand Up @@ -918,7 +918,7 @@ func (h *commandsHelper) incrementNextCommandEventIDIfVersionMarker() {
func (h *commandsHelper) getCommand(id commandID) commandStateMachine {
command, ok := h.commands[id]
if !ok {
panicMsg := fmt.Sprintf("unknown command %v, possible causes are nondeterministic workflow definition code"+
panicMsg := fmt.Sprintf("[TMPRL1100] unknown command %v, possible causes are nondeterministic workflow definition code"+
" or incompatible change in the workflow definition", id)
panicIllegalState(panicMsg)
}
Expand All @@ -927,7 +927,7 @@ func (h *commandsHelper) getCommand(id commandID) commandStateMachine {

func (h *commandsHelper) addCommand(command commandStateMachine) {
if _, ok := h.commands[command.getID()]; ok {
panicMsg := fmt.Sprintf("adding duplicate command %v", command)
panicMsg := fmt.Sprintf("[TMPRL1100] adding duplicate command %v", command)
panicIllegalState(panicMsg)
}
element := h.orderedCommands.PushBack(command)
Expand Down Expand Up @@ -957,7 +957,7 @@ func (h *commandsHelper) removeCancelOfResolvedCommand(commandID commandID) {
func (h *commandsHelper) moveCommandToBack(command commandStateMachine) {
elem := h.commands[command.getID()]
if elem == nil {
panicIllegalState(fmt.Sprintf("moving command not present %v", command))
panicIllegalState(fmt.Sprintf("[TMPRL1100] moving command not present %v", command))
}
h.orderedCommands.Remove(elem)
h.commands[command.getID()] = h.orderedCommands.PushBack(command)
Expand Down Expand Up @@ -993,7 +993,7 @@ func (h *commandsHelper) handleActivityTaskClosed(activityID string, scheduledEv

func (h *commandsHelper) handleActivityTaskScheduled(activityID string, scheduledEventID int64) {
if _, ok := h.scheduledEventIDToActivityID[scheduledEventID]; !ok {
panicMsg := fmt.Sprintf("lookup failed for scheduledEventID to activityID: scheduleEventID: %v, activityID: %v",
panicMsg := fmt.Sprintf("[TMPRL1100] lookup failed for scheduledEventID to activityID: scheduleEventID: %v, activityID: %v",
scheduledEventID, activityID)
panicIllegalState(panicMsg)
}
Expand All @@ -1005,7 +1005,7 @@ func (h *commandsHelper) handleActivityTaskScheduled(activityID string, schedule
func (h *commandsHelper) handleActivityTaskCancelRequested(scheduledEventID int64) {
activityID, ok := h.scheduledEventIDToActivityID[scheduledEventID]
if !ok {
panicIllegalState(fmt.Sprintf("unable to find activityID for the scheduledEventID: %v", scheduledEventID))
panicIllegalState(fmt.Sprintf("[TMPRL1100] unable to find activityID for the scheduledEventID: %v", scheduledEventID))
}
command := h.getCommand(makeCommandID(commandTypeActivity, activityID))
command.handleCancelInitiatedEvent()
Expand All @@ -1030,12 +1030,12 @@ func (h *commandsHelper) getActivityAndScheduledEventIDs(event *historypb.Histor
case enumspb.EVENT_TYPE_ACTIVITY_TASK_TIMED_OUT:
scheduledEventID = event.GetActivityTaskTimedOutEventAttributes().GetScheduledEventId()
default:
panicIllegalState(fmt.Sprintf("unexpected event type: %v", event.GetEventType()))
panicIllegalState(fmt.Sprintf("[TMPRL1100] unexpected event type: %v", event.GetEventType()))
}

activityID, ok := h.scheduledEventIDToActivityID[scheduledEventID]
if !ok {
panicIllegalState(fmt.Sprintf("unable to find activityID for the event: %v", util.HistoryEventToString(event)))
panicIllegalState(fmt.Sprintf("[TMPRL1100] unable to find activityID for the event: %v", util.HistoryEventToString(event)))
}
return activityID, scheduledEventID
}
Expand Down Expand Up @@ -1076,7 +1076,7 @@ func (h *commandsHelper) recordVersionMarker(changeID string, version Version, d

func (h *commandsHelper) handleVersionMarker(eventID int64, changeID string, searchAttrUpdated bool) {
if _, ok := h.versionMarkerLookup[eventID]; ok {
panicMsg := fmt.Sprintf("marker event already exists for eventID in lookup: eventID: %v, changeID: %v",
panicMsg := fmt.Sprintf("[TMPRL1100] marker event already exists for eventID in lookup: eventID: %v, changeID: %v",
eventID, changeID)
panicIllegalState(panicMsg)
}
Expand Down
6 changes: 3 additions & 3 deletions internal/internal_coroutines_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -968,7 +968,7 @@ func TestDeadlockDetectorAndAwaitRace(t *testing.T) {
defer d.Close()
// Expecting deadlock detection timeout instead of a data race.
err := d.ExecuteUntilAllBlocked(defaultDeadlockDetectionTimeout)
require.EqualError(t, err, "Potential deadlock detected: workflow goroutine \"root\" didn't yield for over a second")
require.EqualError(t, err, "[TMPRL1101] Potential deadlock detected: workflow goroutine \"root\" didn't yield for over a second")
}

func TestAwaitCancellation(t *testing.T) {
Expand Down Expand Up @@ -1670,7 +1670,7 @@ func TestDeadlockDetectorStackTrace(t *testing.T) {
c.Receive(ctx, nil) // blocked forever
})
GoNamed(ctx, "sleeper", func(ctx Context) {
time.Sleep(defaultDeadlockDetectionTimeout + 100 * time.Millisecond)
time.Sleep(defaultDeadlockDetectionTimeout + 100*time.Millisecond)
})
c.Receive(ctx, nil) // blocked forever
})
Expand All @@ -1679,7 +1679,7 @@ func TestDeadlockDetectorStackTrace(t *testing.T) {

var wfPanic *workflowPanicError
require.ErrorAs(t, err, &wfPanic)
require.Equal(t, `Potential deadlock detected: workflow goroutine "sleeper" didn't yield for over a second`, wfPanic.Error())
require.Equal(t, `[TMPRL1101] Potential deadlock detected: workflow goroutine "sleeper" didn't yield for over a second`, wfPanic.Error())
require.Regexp(t, `^coroutine sleeper \[running\]:\ntime\.Sleep\(0x[\da-f]+\)\n`, wfPanic.StackTrace())
require.Equal(t, 4, strings.Count(wfPanic.StackTrace(), "\n"), "2 stack frames expected")
}
10 changes: 5 additions & 5 deletions internal/internal_event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -783,12 +783,12 @@ func (wc *workflowEnvironmentImpl) RequestCancelTimer(timerID TimerID) {

func validateVersion(changeID string, version, minSupported, maxSupported Version) {
if version < minSupported {
panicIllegalState(fmt.Sprintf("Workflow code removed support of version %v. "+
panicIllegalState(fmt.Sprintf("[TMPRL1100] Workflow code removed support of version %v. "+
"for \"%v\" changeID. The oldest supported version is %v",
version, changeID, minSupported))
}
if version > maxSupported {
panicIllegalState(fmt.Sprintf("Workflow code is too old to support version %v "+
panicIllegalState(fmt.Sprintf("[TMPRL1100] Workflow code is too old to support version %v "+
"for \"%v\" changeID. The maximum supported version is %v",
version, changeID, maxSupported))
}
Expand Down Expand Up @@ -864,7 +864,7 @@ func (wc *workflowEnvironmentImpl) SideEffect(f func() (*commonpb.Payloads, erro
for k := range wc.sideEffectResult {
keys = append(keys, k)
}
panicIllegalState(fmt.Sprintf("No cached result found for side effectID=%v. KnownSideEffects=%v",
panicIllegalState(fmt.Sprintf("[TMPRL1100] No cached result found for side effectID=%v. KnownSideEffects=%v",
sideEffectID, keys))
}

Expand Down Expand Up @@ -976,7 +976,7 @@ func (wc *workflowEnvironmentImpl) MutableSideEffect(id string, f func() interfa

if wc.isReplay {
// This should not happen
panicIllegalState(fmt.Sprintf("Non deterministic workflow code change detected. MutableSideEffect API call doesn't have a correspondent event in the workflow history. MutableSideEffect ID: %s", id))
panicIllegalState(fmt.Sprintf("[TMPRL1100] Non deterministic workflow code change detected. MutableSideEffect API call doesn't have a correspondent event in the workflow history. MutableSideEffect ID: %s", id))
}

return wc.recordMutableSideEffect(id, callCount, wc.encodeValue(f()))
Expand Down Expand Up @@ -1566,7 +1566,7 @@ func (weh *workflowExecutionEventHandlerImpl) handleLocalActivityMarker(details
if la, ok := weh.pendingLaTasks[lamd.ActivityID]; ok {
if len(lamd.ActivityType) > 0 && lamd.ActivityType != la.params.ActivityType {
// history marker mismatch to the current code.
panicMsg := fmt.Sprintf("code execute local activity %v, but history event found %v, markerData: %v", la.params.ActivityType, lamd.ActivityType, markerData)
panicMsg := fmt.Sprintf("[TMPRL1100] code execute local activity %v, but history event found %v, markerData: %v", la.params.ActivityType, lamd.ActivityType, markerData)
panicIllegalState(panicMsg)
}
weh.commandsHelper.recordLocalActivityMarker(lamd.ActivityID, details, failure)
Expand Down
6 changes: 3 additions & 3 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1453,15 +1453,15 @@ matchLoop:
}

if d == nil {
return historyMismatchErrorf("nondeterministic workflow: missing replay command for %s", util.HistoryEventToString(e))
return historyMismatchErrorf("[TMPRL1100] nondeterministic workflow: missing replay command for %s", util.HistoryEventToString(e))
}

if e == nil {
return historyMismatchErrorf("nondeterministic workflow: extra replay command for %s", util.CommandToString(d))
return historyMismatchErrorf("[TMPRL1100] nondeterministic workflow: extra replay command for %s", util.CommandToString(d))
}

if !isCommandMatchEvent(d, e, msgs) {
return historyMismatchErrorf("nondeterministic workflow: history event is %s, replay command is %s",
return historyMismatchErrorf("[TMPRL1100] nondeterministic workflow: history event is %s, replay command is %s",
util.HistoryEventToString(e), util.CommandToString(d))
}

Expand Down
2 changes: 1 addition & 1 deletion internal/internal_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (up *updateProtocol) requireState(action string, valid ...updateState) {
return
}
}
panicIllegalState(fmt.Sprintf("invalid action %q in update protocol %+v", action, up))
panicIllegalState(fmt.Sprintf("[TMPRL1100] invalid action %q in update protocol %+v", action, up))
}

func (up *updateProtocol) HandleMessage(msg *protocolpb.Message) error {
Expand Down
4 changes: 2 additions & 2 deletions internal/internal_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ var _ WaitGroup = (*waitGroupImpl)(nil)
var _ dispatcher = (*dispatcherImpl)(nil)

// 1MB buffer to fit combined stack trace of all active goroutines
var stackBuf [1024*1024]byte
var stackBuf [1024 * 1024]byte

var (
errCoroStackNotFound = errors.New("coroutine stack not found")
Expand Down Expand Up @@ -1065,7 +1065,7 @@ func (s *coroutineState) call(timeout time.Duration) {
if err != nil {
st = fmt.Sprintf("<%s>", err)
}
msg := fmt.Sprintf("Potential deadlock detected: "+
msg := fmt.Sprintf("[TMPRL1101] Potential deadlock detected: "+
"workflow goroutine %q didn't yield for over a second", s.name)
s.closed.Store(true)
s.panicError = newWorkflowPanicError(msg, st)
Expand Down
Loading