Skip to content

Commit

Permalink
Add mutable state api to update activity/timer task status (#6868)
Browse files Browse the repository at this point in the history
## What changed?
<!-- Describe what has changed in this PR -->
Add new api for updating timer task status
## Why?
<!-- Tell your future self why have you made these changes -->
We need to distinguish user data updates from timer task status updates,
because a change that only updates the task status does not need to update
the state transition.
## How did you test it?
<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->
unit test
## Potential risks
<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->
n/a
## Documentation
<!-- Have you made sure this change doesn't falsify anything currently
stated in `docs/`? If significant
new behavior is added, have you described that in `docs/`? -->
n/a
## Is hotfix candidate?
<!-- Is this PR a hotfix candidate or does it require a notification to
be sent to the broader community? (Yes/No) -->
no
  • Loading branch information
xwduan authored Nov 23, 2024
1 parent 9e68d9e commit 30e3e55
Show file tree
Hide file tree
Showing 9 changed files with 165 additions and 71 deletions.
7 changes: 2 additions & 5 deletions service/history/timer_queue_active_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,11 +243,8 @@ func (t *timerQueueActiveTaskExecutor) executeActivityTimeoutTask(
isHeartBeatTask := task.TimeoutType == enumspb.TIMEOUT_TYPE_HEARTBEAT
ai, heartbeatTimeoutVis, ok := mutableState.GetActivityInfoWithTimerHeartbeat(task.EventID)
if isHeartBeatTask && ok && queues.IsTimeExpired(task.GetVisibilityTime(), heartbeatTimeoutVis) {
err := mutableState.UpdateActivity(ai.ScheduledEventId, func(activityInfo *persistencespb.ActivityInfo, _ workflow.MutableState) error {
activityInfo.TimerTaskStatus = activityInfo.TimerTaskStatus &^ workflow.TimerTaskStatusCreatedHeartbeat
return nil
})
if err != nil {
if err := mutableState.UpdateActivityTaskStatusWithTimerHeartbeat(
ai.ScheduledEventId, ai.TimerTaskStatus&^workflow.TimerTaskStatusCreatedHeartbeat, nil); err != nil {
return err
}
updateMutableState = true
Expand Down
7 changes: 1 addition & 6 deletions service/history/timer_queue_standby_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
taskqueuepb "go.temporal.io/api/taskqueue/v1"
enumsspb "go.temporal.io/server/api/enums/v1"
"go.temporal.io/server/api/matchingservice/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
Expand Down Expand Up @@ -246,11 +245,7 @@ func (t *timerQueueStandbyTaskExecutor) executeActivityTimeoutTask(
isHeartBeatTask := timerTask.TimeoutType == enumspb.TIMEOUT_TYPE_HEARTBEAT
ai, heartbeatTimeoutVis, ok := mutableState.GetActivityInfoWithTimerHeartbeat(timerTask.EventID)
if isHeartBeatTask && ok && queues.IsTimeExpired(timerTask.GetVisibilityTime(), heartbeatTimeoutVis) {
err := mutableState.UpdateActivity(ai.ScheduledEventId, func(activityInfo *persistencespb.ActivityInfo, _ workflow.MutableState) error {
activityInfo.TimerTaskStatus = activityInfo.TimerTaskStatus &^ workflow.TimerTaskStatusCreatedHeartbeat
return nil
})
if err != nil {
if err := mutableState.UpdateActivityTaskStatusWithTimerHeartbeat(ai.ScheduledEventId, ai.TimerTaskStatus&^workflow.TimerTaskStatusCreatedHeartbeat, nil); err != nil {
return nil, err
}
updateMutableState = true
Expand Down
3 changes: 2 additions & 1 deletion service/history/workflow/mutable_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,9 +361,10 @@ type (
baseRunLowestCommonAncestorEventVersion int64,
)
UpdateActivity(int64, ActivityUpdater) error
UpdateActivityTimerHeartbeat(int64, time.Time)
UpdateActivityTaskStatusWithTimerHeartbeat(scheduleEventId int64, timerTaskStatus int32, heartbeatTimeoutVisibility *time.Time) error
UpdateActivityProgress(ai *persistencespb.ActivityInfo, request *workflowservice.RecordActivityTaskHeartbeatRequest)
UpdateUserTimer(*persistencespb.TimerInfo) error
UpdateUserTimerTaskStatus(timerId string, status int64) error
UpdateCurrentVersion(version int64, forceUpdate bool) error
UpdateWorkflowStateStatus(state enumsspb.WorkflowExecutionState, status enumspb.WorkflowExecutionStatus) error
UpdateBuildIdAssignment(buildId string) error
Expand Down
100 changes: 76 additions & 24 deletions service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,12 @@ type (
executionStateUpdated bool
workflowTaskUpdated bool
updateInfoUpdated map[string]struct{}
// The following xxxUserDataUpdated fields track whether activity/timer user data was updated.
// This helps determine whether the transition history needs to be updated:
// a user data change requires a transition history update, while a
// non-user data change does not.
activityInfosUserDataUpdated map[int64]struct{}
timerInfosUserDataUpdated map[string]struct{}

InsertTasks map[tasks.Category][]tasks.Task

Expand Down Expand Up @@ -281,21 +287,23 @@ func NewMutableState(
pendingSignalRequestedIDs: make(map[string]struct{}),
deleteSignalRequestedIDs: make(map[string]struct{}),

approximateSize: 0,
totalTombstones: 0,
currentVersion: namespaceEntry.FailoverVersion(),
bufferEventsInDB: nil,
stateInDB: enumsspb.WORKFLOW_EXECUTION_STATE_VOID,
nextEventIDInDB: common.FirstEventID,
dbRecordVersion: 1,
namespaceEntry: namespaceEntry,
appliedEvents: make(map[string]struct{}),
InsertTasks: make(map[tasks.Category][]tasks.Task),
transitionHistoryEnabled: shard.GetConfig().EnableTransitionHistory(),
visibilityUpdated: false,
executionStateUpdated: false,
workflowTaskUpdated: false,
updateInfoUpdated: make(map[string]struct{}),
approximateSize: 0,
totalTombstones: 0,
currentVersion: namespaceEntry.FailoverVersion(),
bufferEventsInDB: nil,
stateInDB: enumsspb.WORKFLOW_EXECUTION_STATE_VOID,
nextEventIDInDB: common.FirstEventID,
dbRecordVersion: 1,
namespaceEntry: namespaceEntry,
appliedEvents: make(map[string]struct{}),
InsertTasks: make(map[tasks.Category][]tasks.Task),
transitionHistoryEnabled: shard.GetConfig().EnableTransitionHistory(),
visibilityUpdated: false,
executionStateUpdated: false,
workflowTaskUpdated: false,
updateInfoUpdated: make(map[string]struct{}),
timerInfosUserDataUpdated: make(map[string]struct{}),
activityInfosUserDataUpdated: make(map[int64]struct{}),

QueryRegistry: NewQueryRegistry(),

Expand Down Expand Up @@ -1598,6 +1606,7 @@ func (ms *MutableStateImpl) UpdateActivityProgress(
now := ms.timeSource.Now()
ai.LastHeartbeatUpdateTime = timestamppb.New(now)
ms.updateActivityInfos[ai.ScheduledEventId] = ai
ms.activityInfosUserDataUpdated[ai.ScheduledEventId] = struct{}{}
ms.approximateSize += ai.Size()
ms.syncActivityTasks[ai.ScheduledEventId] = struct{}{}
}
Expand Down Expand Up @@ -1645,6 +1654,7 @@ func (ms *MutableStateImpl) UpdateActivityInfo(
ai.Paused = incomingActivityInfo.GetPaused()

ms.updateActivityInfos[ai.ScheduledEventId] = ai
ms.activityInfosUserDataUpdated[ai.ScheduledEventId] = struct{}{}
ms.approximateSize += ai.Size()

err := ms.applyActivityBuildIdRedirect(ai, incomingActivityInfo.GetLastStartedBuildId(), incomingActivityInfo.GetLastStartedRedirectCounter())
Expand All @@ -1659,9 +1669,24 @@ func (ms *MutableStateImpl) UpdateActivityInfo(
return err
}

// UpdateActivityWithTimerHeartbeat updates an activity
func (ms *MutableStateImpl) UpdateActivityTimerHeartbeat(scheduledEventId int64, timerTimeoutVisibility time.Time) {
ms.pendingActivityTimerHeartbeats[scheduledEventId] = timerTimeoutVisibility
// UpdateActivityTaskStatusWithTimerHeartbeat overwrites the timer task status of
// the pending activity identified by scheduleEventID and stages that activity in
// updateActivityInfos. When heartbeatTimeoutVisibility is non-nil, the pending
// heartbeat timer visibility for the activity is replaced with the pointed-to
// value; when it is nil, any existing entry is left untouched.
//
// The activity is intentionally not added to activityInfosUserDataUpdated, so
// this change is not recorded as a user data update.
//
// ErrMissingActivityInfo is returned (after logging) if no pending activity
// exists for scheduleEventID.
func (ms *MutableStateImpl) UpdateActivityTaskStatusWithTimerHeartbeat(scheduleEventID int64, timerTaskStatus int32, heartbeatTimeoutVisibility *time.Time) error {
	activityInfo, found := ms.pendingActivityInfoIDs[scheduleEventID]
	if !found {
		msg := fmt.Sprintf("unable to find activity event ID: %v in mutable state", scheduleEventID)
		ms.logError(msg, tag.ErrorTypeInvalidMutableStateAction)
		return ErrMissingActivityInfo
	}

	// Record the heartbeat visibility only when the caller supplied one.
	if heartbeatTimeoutVisibility != nil {
		ms.pendingActivityTimerHeartbeats[scheduleEventID] = *heartbeatTimeoutVisibility
	}

	activityInfo.TimerTaskStatus = timerTaskStatus
	ms.updateActivityInfos[activityInfo.ScheduledEventId] = activityInfo
	return nil
}

// DeleteActivity deletes details about an activity.
Expand Down Expand Up @@ -1693,6 +1718,7 @@ func (ms *MutableStateImpl) DeleteActivity(
}

delete(ms.updateActivityInfos, scheduledEventID)
delete(ms.activityInfosUserDataUpdated, scheduledEventID)
delete(ms.syncActivityTasks, scheduledEventID)
ms.deleteActivityInfos[scheduledEventID] = struct{}{}
return nil
Expand Down Expand Up @@ -1740,6 +1766,21 @@ func (ms *MutableStateImpl) UpdateUserTimer(

ms.pendingTimerInfoIDs[ti.TimerId] = ti
ms.updateTimerInfos[ti.TimerId] = ti
ms.timerInfosUserDataUpdated[ti.TimerId] = struct{}{}
return nil
}

// UpdateUserTimerTaskStatus overwrites the task status of the pending user timer
// identified by timerID and stages that timer in updateTimerInfos.
//
// The timer is intentionally not added to timerInfosUserDataUpdated, so this
// change is not recorded as a user data update.
//
// ErrMissingTimerInfo is returned (after logging) if no pending timer exists
// for timerID.
func (ms *MutableStateImpl) UpdateUserTimerTaskStatus(timerID string, status int64) error {
	if pending, found := ms.pendingTimerInfoIDs[timerID]; found {
		pending.TaskStatus = status
		ms.updateTimerInfos[timerID] = pending
		return nil
	}

	msg := fmt.Sprintf("unable to find timer ID: %v in mutable state", timerID)
	ms.logError(msg, tag.ErrorTypeInvalidMutableStateAction)
	return ErrMissingTimerInfo
}

Expand Down Expand Up @@ -1771,6 +1812,7 @@ func (ms *MutableStateImpl) DeleteUserTimer(
}

delete(ms.updateTimerInfos, timerID)
delete(ms.timerInfosUserDataUpdated, timerID)
ms.deleteTimerInfos[timerID] = struct{}{}
return nil
}
Expand Down Expand Up @@ -2921,6 +2963,7 @@ func (ms *MutableStateImpl) addPendingActivityInfo(ai *persistencespb.ActivityIn
ms.pendingActivityInfoIDs[ai.ScheduledEventId] = ai
ms.pendingActivityIDToEventID[ai.ActivityId] = ai.ScheduledEventId
ms.updateActivityInfos[ai.ScheduledEventId] = ai
ms.activityInfosUserDataUpdated[ai.ScheduledEventId] = struct{}{}
ms.approximateSize += ai.Size() + int64SizeBytes
ms.executionInfo.ActivityCount++
}
Expand Down Expand Up @@ -3057,6 +3100,7 @@ func (ms *MutableStateImpl) ApplyActivityTaskStartedEvent(
ai.RequestId = attributes.GetRequestId()
ai.StartedTime = event.GetEventTime()
ms.updateActivityInfos[ai.ScheduledEventId] = ai
ms.activityInfosUserDataUpdated[ai.ScheduledEventId] = struct{}{}
ms.approximateSize += ai.Size()

err := ms.applyActivityBuildIdRedirect(ai, worker_versioning.BuildIdIfUsingVersioning(attributes.GetWorkerVersion()), attributes.GetBuildIdRedirectCounter())
Expand Down Expand Up @@ -3299,6 +3343,7 @@ func (ms *MutableStateImpl) ApplyActivityTaskCancelRequestedEvent(

ai.CancelRequestId = event.GetEventId()
ms.updateActivityInfos[ai.ScheduledEventId] = ai
ms.activityInfosUserDataUpdated[ai.ScheduledEventId] = struct{}{}
ms.approximateSize += ai.Size()
return nil
}
Expand Down Expand Up @@ -3945,6 +3990,7 @@ func (ms *MutableStateImpl) ApplyTimerStartedEvent(
ms.pendingTimerInfoIDs[ti.TimerId] = ti
ms.pendingTimerEventIDToID[ti.StartedEventId] = ti.TimerId
ms.updateTimerInfos[ti.TimerId] = ti
ms.timerInfosUserDataUpdated[ti.TimerId] = struct{}{}
ms.approximateSize += ti.Size() + len(ti.TimerId)
ms.executionInfo.UserTimerCount++

Expand Down Expand Up @@ -4982,6 +5028,7 @@ func (ms *MutableStateImpl) UpdateActivity(scheduledEventId int64, updater Activ
ms.approximateSize += ai.Size() - originalSize
ms.updateActivityInfos[ai.ScheduledEventId] = ai
ms.syncActivityTasks[ai.ScheduledEventId] = struct{}{}
ms.activityInfosUserDataUpdated[ai.ScheduledEventId] = struct{}{}

return nil
}
Expand Down Expand Up @@ -5172,9 +5219,9 @@ func (ms *MutableStateImpl) isStateDirty() bool {
// TODO: we need to track more workflow state changes
// e.g. changes to executionInfo.CancelRequested
return ms.hBuilder.IsDirty() ||
len(ms.updateActivityInfos) > 0 ||
len(ms.activityInfosUserDataUpdated) > 0 ||
len(ms.deleteActivityInfos) > 0 ||
len(ms.updateTimerInfos) > 0 ||
len(ms.timerInfosUserDataUpdated) > 0 ||
len(ms.deleteTimerInfos) > 0 ||
len(ms.updateChildExecutionInfos) > 0 ||
len(ms.deleteChildExecutionInfos) > 0 ||
Expand Down Expand Up @@ -5527,11 +5574,11 @@ func (ms *MutableStateImpl) closeTransactionTrackLastUpdateVersionedTransition(

transitionHistory := ms.executionInfo.TransitionHistory
currentVersionedTransition := transitionHistory[len(transitionHistory)-1]
for _, activityInfo := range ms.updateActivityInfos {
activityInfo.LastUpdateVersionedTransition = currentVersionedTransition
for activityId := range ms.activityInfosUserDataUpdated {
ms.updateActivityInfos[activityId].LastUpdateVersionedTransition = currentVersionedTransition
}
for _, timerInfo := range ms.updateTimerInfos {
timerInfo.LastUpdateVersionedTransition = currentVersionedTransition
for timerId := range ms.timerInfosUserDataUpdated {
ms.updateTimerInfos[timerId].LastUpdateVersionedTransition = currentVersionedTransition
}
for _, childInfo := range ms.updateChildExecutionInfos {
childInfo.LastUpdateVersionedTransition = currentVersionedTransition
Expand Down Expand Up @@ -5865,6 +5912,8 @@ func (ms *MutableStateImpl) cleanupTransaction() error {
ms.executionStateUpdated = false
ms.workflowTaskUpdated = false
ms.updateInfoUpdated = make(map[string]struct{})
ms.timerInfosUserDataUpdated = make(map[string]struct{})
ms.activityInfosUserDataUpdated = make(map[int64]struct{})

ms.stateInDB = ms.executionState.State
ms.nextEventIDInDB = ms.GetNextEventID()
Expand Down Expand Up @@ -6055,6 +6104,7 @@ func (ms *MutableStateImpl) updatePendingEventIDs(
if activityInfo, ok := ms.GetActivityInfo(scheduledEventID); ok {
activityInfo.StartedEventId = startedEventID
ms.updateActivityInfos[activityInfo.ScheduledEventId] = activityInfo
ms.activityInfosUserDataUpdated[activityInfo.ScheduledEventId] = struct{}{}
continue
}
if childInfo, ok := ms.GetChildExecutionInfo(scheduledEventID); ok {
Expand Down Expand Up @@ -6667,6 +6717,7 @@ func (ms *MutableStateImpl) applyUpdatesToSubStateMachines(
}
}, func(ai *persistencespb.ActivityInfo) {
ms.pendingActivityIDToEventID[ai.ActivityId] = ai.ScheduledEventId
ms.activityInfosUserDataUpdated[ai.ScheduledEventId] = struct{}{}
})
if err != nil {
return err
Expand All @@ -6676,6 +6727,7 @@ func (ms *MutableStateImpl) applyUpdatesToSubStateMachines(
incoming.TaskStatus = TimerTaskStatusNone
}, func(ti *persistencespb.TimerInfo) {
ms.pendingTimerEventIDToID[ti.StartedEventId] = ti.TimerId
ms.timerInfosUserDataUpdated[ti.TimerId] = struct{}{}
})
if err != nil {
return err
Expand Down
39 changes: 39 additions & 0 deletions service/history/workflow/mutable_state_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3818,7 +3818,13 @@ func (s *mutableStateSuite) TestApplySnapshot() {

// set updateXXX so LastUpdateVersionedTransition will be updated
targetMS.updateActivityInfos = targetMS.pendingActivityInfoIDs
for key := range targetMS.updateActivityInfos {
targetMS.activityInfosUserDataUpdated[key] = struct{}{}
}
targetMS.updateTimerInfos = targetMS.pendingTimerInfoIDs
for key := range targetMS.updateTimerInfos {
targetMS.timerInfosUserDataUpdated[key] = struct{}{}
}
targetMS.updateChildExecutionInfos = targetMS.pendingChildExecutionInfoIDs
targetMS.updateRequestCancelInfos = targetMS.pendingRequestCancelInfoIDs
targetMS.updateSignalInfos = targetMS.pendingSignalInfoIDs
Expand Down Expand Up @@ -3884,7 +3890,13 @@ func (s *mutableStateSuite) TestApplyMutation() {

// set updateXXX so LastUpdateVersionedTransition will be updated
targetMS.updateActivityInfos = targetMS.pendingActivityInfoIDs
for key := range targetMS.updateActivityInfos {
targetMS.activityInfosUserDataUpdated[key] = struct{}{}
}
targetMS.updateTimerInfos = targetMS.pendingTimerInfoIDs
for key := range targetMS.updateTimerInfos {
targetMS.timerInfosUserDataUpdated[key] = struct{}{}
}
targetMS.updateChildExecutionInfos = targetMS.pendingChildExecutionInfoIDs
targetMS.updateRequestCancelInfos = targetMS.pendingRequestCancelInfoIDs
targetMS.updateSignalInfos = targetMS.pendingSignalInfoIDs
Expand Down Expand Up @@ -4019,3 +4031,30 @@ func (s *mutableStateSuite) TestRefreshTask_SameCluster_SameAttempt() {
)
s.False(shouldReset)
}

// TestUpdateActivityTaskStatusWithTimerHeartbeat checks that updating only the
// timer task status of an activity (with a nil heartbeat visibility):
//   - writes the new status onto the pending activity,
//   - stages the activity in updateActivityInfos,
//   - does not mark the activity as a user data update, and
//   - leaves the existing pending heartbeat timer visibility untouched.
func (s *mutableStateSuite) TestUpdateActivityTaskStatusWithTimerHeartbeat() {
	dbState := s.buildWorkflowMutableState()
	scheduleEventId := int64(781)
	now := time.Now().UTC()
	// ScheduledEventId must equal the map key: the method under test looks the
	// activity up by the key but stages it under ai.ScheduledEventId.
	dbState.ActivityInfos[scheduleEventId] = &persistencespb.ActivityInfo{
		Version:                5,
		ScheduledEventId:       scheduleEventId,
		ScheduledTime:          timestamppb.New(now),
		StartedEventId:         common.EmptyEventID,
		StartedTime:            timestamppb.New(now),
		ActivityId:             "activityID_5",
		TimerTaskStatus:        0,
		ScheduleToStartTimeout: timestamp.DurationFromSeconds(100),
		ScheduleToCloseTimeout: timestamp.DurationFromSeconds(200),
		StartToCloseTimeout:    timestamp.DurationFromSeconds(300),
		HeartbeatTimeout:       timestamp.DurationFromSeconds(50),
	}
	mutableState, err := NewMutableStateFromDB(s.mockShard, s.mockEventsCache, s.logger, s.namespaceEntry, dbState, 123)
	s.NoError(err)
	originalTime := now.Add(time.Second * 60)
	mutableState.pendingActivityTimerHeartbeats[scheduleEventId] = originalTime
	status := int32(1)
	err = mutableState.UpdateActivityTaskStatusWithTimerHeartbeat(scheduleEventId, status, nil)
	s.NoError(err)

	// The status is visible both on the record loaded from the DB state and on
	// the mutable state's own pending activity.
	s.Equal(status, dbState.ActivityInfos[scheduleEventId].TimerTaskStatus)
	s.Equal(status, mutableState.pendingActivityInfoIDs[scheduleEventId].TimerTaskStatus)

	// A task status change is staged for persistence but is expected not to be
	// tracked as a user data update.
	s.Contains(mutableState.updateActivityInfos, scheduleEventId)
	s.NotContains(mutableState.activityInfosUserDataUpdated, scheduleEventId)

	// A nil heartbeat visibility must not overwrite the existing entry.
	s.Equal(originalTime, mutableState.pendingActivityTimerHeartbeats[scheduleEventId])
}
28 changes: 22 additions & 6 deletions service/history/workflow/mutable_state_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 30e3e55

Please sign in to comment.