Skip to content

Commit

Permalink
Enable retry on mutable state checksum verification failure (#5691)
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaddoll authored Feb 26, 2024
1 parent 581eb07 commit 169b8f1
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 23 deletions.
13 changes: 13 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -1973,6 +1973,13 @@ const (
// Allowed filters: DomainID
EnableTaskVal

// EnableRetryForChecksumFailure enables retry if mutable state checksum verification fails
// KeyName: history.enableMutableStateChecksumFailureRetry
// Value type: Bool
// Default value: false
// Allowed filters: DomainName
EnableRetryForChecksumFailure

// LastBoolKey must be the last one in this const group
LastBoolKey
)
Expand Down Expand Up @@ -4230,6 +4237,12 @@ var BoolKeys = map[BoolKey]DynamicBool{
Description: "WorkflowIDCacheInternalEnabled is the key to enable/disable caching of workflowID specific information for internal requests",
DefaultValue: false,
},
EnableRetryForChecksumFailure: DynamicBool{
KeyName: "history.enableMutableStateChecksumFailureRetry",
Filters: []Filter{DomainName},
Description: "EnableRetryForChecksumFailure enables retry if mutable state checksum verification fails",
DefaultValue: false,
},
}

var FloatKeys = map[FloatKey]DynamicFloat{
Expand Down
2 changes: 2 additions & 0 deletions service/history/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ type Config struct {
MutableStateChecksumGenProbability dynamicconfig.IntPropertyFnWithDomainFilter
MutableStateChecksumVerifyProbability dynamicconfig.IntPropertyFnWithDomainFilter
MutableStateChecksumInvalidateBefore dynamicconfig.FloatPropertyFn
EnableRetryForChecksumFailure dynamicconfig.BoolPropertyFnWithDomainFilter

// History check for corruptions
EnableHistoryCorruptionCheck dynamicconfig.BoolPropertyFnWithDomainFilter
Expand Down Expand Up @@ -566,6 +567,7 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, maxMessageSize int, s
MutableStateChecksumGenProbability: dc.GetIntPropertyFilteredByDomain(dynamicconfig.MutableStateChecksumGenProbability),
MutableStateChecksumVerifyProbability: dc.GetIntPropertyFilteredByDomain(dynamicconfig.MutableStateChecksumVerifyProbability),
MutableStateChecksumInvalidateBefore: dc.GetFloat64Property(dynamicconfig.MutableStateChecksumInvalidateBefore),
EnableRetryForChecksumFailure: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableRetryForChecksumFailure),

EnableHistoryCorruptionCheck: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableHistoryCorruptionCheck),

Expand Down
45 changes: 30 additions & 15 deletions service/history/execution/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"context"
"errors"
"fmt"
"strings"
"testing"
"time"

Expand All @@ -45,6 +46,7 @@ import (

const (
defaultRemoteCallTimeout = 30 * time.Second
checksumErrorRetryCount = 3
)

type conflictError struct {
Expand Down Expand Up @@ -252,6 +254,10 @@ func (c *contextImpl) LoadExecutionStats(
return c.stats, nil
}

func isChecksumError(err error) bool {
return strings.Contains(err.Error(), "checksum mismatch error")
}

func (c *contextImpl) LoadWorkflowExecutionWithTaskVersion(
ctx context.Context,
incomingVersion int64,
Expand All @@ -263,22 +269,31 @@ func (c *contextImpl) LoadWorkflowExecutionWithTaskVersion(
}

if c.mutableState == nil {
response, err := c.getWorkflowExecutionWithRetry(ctx, &persistence.GetWorkflowExecutionRequest{
DomainID: c.domainID,
Execution: c.workflowExecution,
DomainName: domainEntry.GetInfo().Name,
})
if err != nil {
return nil, err
}

c.mutableState = NewMutableStateBuilder(
c.shard,
c.logger,
domainEntry,
)
var response *persistence.GetWorkflowExecutionResponse
for i := 0; i < checksumErrorRetryCount; i++ {
response, err = c.getWorkflowExecutionWithRetry(ctx, &persistence.GetWorkflowExecutionRequest{
DomainID: c.domainID,
Execution: c.workflowExecution,
DomainName: domainEntry.GetInfo().Name,
})
if err != nil {
return nil, err
}

c.mutableState.Load(response.State)
c.mutableState = NewMutableStateBuilder(
c.shard,
c.logger,
domainEntry,
)

err = c.mutableState.Load(response.State)
if err == nil {
break
} else if !isChecksumError(err) {
c.logger.Error("failed to load mutable state", tag.Error(err))
break
}
}

c.stats = response.State.ExecutionStats
c.updateCondition = response.State.ExecutionInfo.NextEventID
Expand Down
2 changes: 1 addition & 1 deletion service/history/execution/mutable_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ type (
IsWorkflowCompleted() bool
IsResourceDuplicated(resourceDedupKey definition.DeduplicationID) bool
UpdateDuplicatedResource(resourceDedupKey definition.DeduplicationID)
Load(*persistence.WorkflowMutableState)
Load(*persistence.WorkflowMutableState) error
ReplicateActivityInfo(*types.SyncActivityRequest, bool) error
ReplicateActivityTaskCancelRequestedEvent(*types.HistoryEvent) error
ReplicateActivityTaskCanceledEvent(*types.HistoryEvent) error
Expand Down
25 changes: 23 additions & 2 deletions service/history/execution/mutable_state_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"time"

"github.com/pborman/uuid"
"golang.org/x/exp/maps"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/backoff"
Expand Down Expand Up @@ -317,7 +318,7 @@ func (e *mutableStateBuilder) CopyToPersistence() *persistence.WorkflowMutableSt

func (e *mutableStateBuilder) Load(
state *persistence.WorkflowMutableState,
) {
) error {

e.pendingActivityInfoIDs = state.ActivityInfos
for _, activityInfo := range state.ActivityInfos {
Expand Down Expand Up @@ -357,10 +358,23 @@ func (e *mutableStateBuilder) Load(
// feature is tested and/or we have mechanisms in place to deal
// with these types of errors
e.metricsClient.IncCounter(metrics.WorkflowContextScope, metrics.MutableStateChecksumMismatch)
e.logError("mutable state checksum mismatch", tag.Error(err))
e.logError("mutable state checksum mismatch",
tag.WorkflowNextEventID(e.executionInfo.NextEventID),
tag.WorkflowScheduleID(e.executionInfo.DecisionScheduleID),
tag.WorkflowStartedID(e.executionInfo.DecisionStartedID),
tag.Dynamic("timerIDs", maps.Keys(e.pendingTimerInfoIDs)),
tag.Dynamic("activityIDs", maps.Keys(e.pendingActivityInfoIDs)),
tag.Dynamic("childIDs", maps.Keys(e.pendingChildExecutionInfoIDs)),
tag.Dynamic("signalIDs", maps.Keys(e.pendingSignalInfoIDs)),
tag.Dynamic("cancelIDs", maps.Keys(e.pendingRequestCancelInfoIDs)),
tag.Error(err))
if e.enableChecksumFailureRetry() {
return err
}
}
}
}
return nil
}

func (e *mutableStateBuilder) fillForBackwardsCompatibility() {
Expand Down Expand Up @@ -4757,6 +4771,13 @@ func (e *mutableStateBuilder) shouldVerifyChecksum() bool {
return rand.Intn(100) < e.config.MutableStateChecksumVerifyProbability(e.domainEntry.GetInfo().Name)
}

func (e *mutableStateBuilder) enableChecksumFailureRetry() bool {
if e.domainEntry == nil {
return false
}
return e.config.EnableRetryForChecksumFailure(e.domainEntry.GetInfo().Name)
}

func (e *mutableStateBuilder) shouldInvalidateChecksum() bool {
invalidateBeforeEpochSecs := int64(e.config.MutableStateChecksumInvalidateBefore())
if invalidateBeforeEpochSecs > 0 {
Expand Down
10 changes: 7 additions & 3 deletions service/history/execution/mutable_state_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func (s *mutableStateSuite) SetupTest() {
// set the checksum probabilities to 100% for exercising during test
s.mockShard.GetConfig().MutableStateChecksumGenProbability = func(domain string) int { return 100 }
s.mockShard.GetConfig().MutableStateChecksumVerifyProbability = func(domain string) int { return 100 }
s.mockShard.GetConfig().EnableRetryForChecksumFailure = func(domain string) bool { return true }

s.mockEventsCache = s.mockShard.GetEventsCache().(*events.MockCache)

Expand Down Expand Up @@ -412,7 +413,8 @@ func (s *mutableStateSuite) TestChecksum() {

// verify checksum is verified on Load
dbState.Checksum = csum
s.msBuilder.Load(dbState)
err = s.msBuilder.Load(dbState)
s.NoError(err)
s.Equal(loadErrors, loadErrorsFunc())

// generate checksum again and verify its the same
Expand All @@ -423,7 +425,8 @@ func (s *mutableStateSuite) TestChecksum() {

// modify checksum and verify Load fails
dbState.Checksum.Value[0]++
s.msBuilder.Load(dbState)
err = s.msBuilder.Load(dbState)
s.Error(err)
s.Equal(loadErrors+1, loadErrorsFunc())
s.EqualValues(dbState.Checksum, s.msBuilder.checksum)

Expand All @@ -432,7 +435,8 @@ func (s *mutableStateSuite) TestChecksum() {
s.mockShard.GetConfig().MutableStateChecksumInvalidateBefore = func(...dynamicconfig.FilterOption) float64 {
return float64((s.msBuilder.executionInfo.LastUpdatedTimestamp.UnixNano() / int64(time.Second)) + 1)
}
s.msBuilder.Load(dbState)
err = s.msBuilder.Load(dbState)
s.NoError(err)
s.Equal(loadErrors, loadErrorsFunc())
s.EqualValues(checksum.Checksum{}, s.msBuilder.checksum)

Expand Down
6 changes: 4 additions & 2 deletions service/history/execution/mutable_state_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 169b8f1

Please sign in to comment.