diff --git a/cmd/server/iwf/iwf.go b/cmd/server/iwf/iwf.go index 7a865997..56a39ecd 100644 --- a/cmd/server/iwf/iwf.go +++ b/cmd/server/iwf/iwf.go @@ -124,7 +124,7 @@ func start(c *cli.Context) { if err != nil { rawLog.Fatalf("Unable to connect to Temporal because of error %v", err) } - unifiedClient = temporalapi.NewTemporalClient(temporalClient, config.Interpreter.Temporal.Namespace, converter.GetDefaultDataConverter(), false) + unifiedClient = temporalapi.NewTemporalClient(temporalClient, config.Interpreter.Temporal.Namespace, converter.GetDefaultDataConverter(), false, &config.Api.QueryWorkflowFailedRetryPolicy) for _, svcName := range services { go launchTemporalService(svcName, *config, unifiedClient, temporalClient, logger) @@ -146,7 +146,8 @@ func start(c *cli.Context) { if err != nil { rawLog.Fatalf("Unable to connect to Cadence because of error %v", err) } - unifiedClient = cadenceapi.NewCadenceClient(domain, cadenceClient, serviceClient, encoded.GetDefaultDataConverter(), closeFunc) + + unifiedClient = cadenceapi.NewCadenceClient(domain, cadenceClient, serviceClient, encoded.GetDefaultDataConverter(), closeFunc, &config.Api.QueryWorkflowFailedRetryPolicy) for _, svcName := range services { go launchCadenceService(svcName, *config, unifiedClient, serviceClient, domain, closeFunc, logger) diff --git a/config/config.go b/config/config.go index 6bdfacea..c0d432ed 100644 --- a/config/config.go +++ b/config/config.go @@ -31,6 +31,14 @@ type ( OmitRpcInputOutputInHistory *bool `yaml:"omitRpcInputOutputInHistory"` // WaitForStateCompletionMigration is used to control workflowId of the WaitForStateCompletion system/internal workflows WaitForStateCompletionMigration WaitForStateCompletionMigration `yaml:"waitForStateCompletionMigration"` + QueryWorkflowFailedRetryPolicy QueryWorkflowFailedRetryPolicy `yaml:"queryWorkflowFailedRetryPolicy"` + } + + QueryWorkflowFailedRetryPolicy struct { + // defaults to 1 + InitialIntervalSeconds int `yaml:"initialIntervalSeconds"` + // defaults to 5 + MaximumAttempts int `yaml:"maximumAttempts"` } WaitForStateCompletionMigration struct { @@ -147,3 +155,21 @@ func (c Config) GetWaitForOnWithDefault() string { } return "old" } + +func QueryWorkflowFailedRetryPolicyWithDefaults(retryPolicy *QueryWorkflowFailedRetryPolicy) QueryWorkflowFailedRetryPolicy { + var rp QueryWorkflowFailedRetryPolicy + + if retryPolicy != nil && retryPolicy.InitialIntervalSeconds != 0 { + rp.InitialIntervalSeconds = retryPolicy.InitialIntervalSeconds + } else { + rp.InitialIntervalSeconds = 1 + } + + if retryPolicy != nil && retryPolicy.MaximumAttempts != 0 { + rp.MaximumAttempts = retryPolicy.MaximumAttempts + } else { + rp.MaximumAttempts = 5 + } + + return rp +} diff --git a/config/config_template.yaml b/config/config_template.yaml index ff8d2a3e..a6ba4a22 100644 --- a/config/config_template.yaml +++ b/config/config_template.yaml @@ -3,6 +3,9 @@ api: waitForStateCompletionMigration: signalWithStartOn: old waitForOn: old + queryWorkflowFailedRetryPolicy: + initialIntervalSeconds: 1 + maximumAttempts: 5 interpreter: defaultWorkflowConfig: continueAsNewThreshold: 100 diff --git a/config/development.yaml b/config/development.yaml index 3dbac3d2..a3b0817f 100644 --- a/config/development.yaml +++ b/config/development.yaml @@ -7,6 +7,9 @@ api: waitForStateCompletionMigration: signalWithStartOn: old waitForOn: old + queryWorkflowFailedRetryPolicy: + initialIntervalSeconds: 1 + maximumAttempts: 5 interpreter: temporal: hostPort: localhost:7233 diff --git a/config/development_cadence.yaml b/config/development_cadence.yaml index 977a9a10..f7858bec 100644 --- a/config/development_cadence.yaml +++ b/config/development_cadence.yaml @@ -3,6 +3,9 @@ api: waitForStateCompletionMigration: signalWithStartOn: old waitForOn: old + queryWorkflowFailedRetryPolicy: + initialIntervalSeconds: 1 + maximumAttempts: 5 interpreter: # interpreterActivityConfig: # disableSystemSearchAttributes: true # (deprecated) set to true if you don't have advanced visibility in Cadence, see more https://github.com/uber/cadence/issues/5085 diff --git a/integ/config.go b/integ/config.go index 91258333..e9afbba8 100644 --- a/integ/config.go +++ b/integ/config.go @@ -16,6 +16,10 @@ func createTestConfig(testCfg IwfServiceTestConfig) config.Config { SignalWithStartOn: "old", WaitForOn: "old", }, + QueryWorkflowFailedRetryPolicy: config.QueryWorkflowFailedRetryPolicy{ + InitialIntervalSeconds: 1, + MaximumAttempts: 5, + }, }, Interpreter: config.Interpreter{ VerboseDebug: false, diff --git a/integ/create_test.go b/integ/create_test.go index baf81c4a..cca03218 100644 --- a/integ/create_test.go +++ b/integ/create_test.go @@ -84,8 +84,6 @@ func doTestCreateWithoutStartingState(t *testing.T, backendType service.BackendT }).Execute() panicAtHttpError(err, httpResp) - time.Sleep(time.Second * 2) - // workflow shouldn't executed any state var dump service.ContinueAsNewDumpResponse err = uclient.QueryWorkflow(context.Background(), &dump, wfId, "", service.ContinueAsNewDumpQueryType) diff --git a/integ/signal_test.go b/integ/signal_test.go index c1230be5..d9cb48cf 100644 --- a/integ/signal_test.go +++ b/integ/signal_test.go @@ -110,7 +110,6 @@ func doTestSignalWorkflow(t *testing.T, backendType service.BackendType, config panicAtHttpError(err, httpResp) // test update config - time.Sleep(time.Second) var debugDump service.DebugDumpResponse err = uclient.QueryWorkflow(context.Background(), &debugDump, wfId, "", service.DebugDumpQueryType) if err != nil { @@ -134,7 +133,6 @@ func doTestSignalWorkflow(t *testing.T, backendType service.BackendType, config }).Execute() panicAtHttpError(err, httpResp) - time.Sleep(time.Second) err = uclient.QueryWorkflow(context.Background(), &debugDump, wfId, "", service.DebugDumpQueryType) if err != nil { panic(err) @@ -154,7 +152,6 @@ func doTestSignalWorkflow(t *testing.T, backendType service.BackendType, config }).Execute() panicAtHttpError(err, httpResp) - time.Sleep(time.Second) err = uclient.QueryWorkflow(context.Background(), &debugDump, wfId, "", service.DebugDumpQueryType) if err != nil { panic(err) diff --git a/integ/timer_test.go b/integ/timer_test.go index e62dd843..35681e1e 100644 --- a/integ/timer_test.go +++ b/integ/timer_test.go @@ -183,9 +183,6 @@ func doTestTimerWorkflow(t *testing.T, backendType service.BackendType, config * }).Execute() panicAtHttpError(err, httpResp) - if config != nil { - time.Sleep(time.Second * 2) - } err = uclient.QueryWorkflow(context.Background(), &timerInfos, wfId, "", service.GetCurrentTimerInfosQueryType) if err != nil { log.Fatalf("Fail to invoke query %v", err) diff --git a/integ/util.go b/integ/util.go index 8affc720..01d4e997 100644 --- a/integ/util.go +++ b/integ/util.go @@ -109,8 +109,11 @@ func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.U if err != nil { panic(err) } - uclient = temporalapi.NewTemporalClient(temporalClient, testNamespace, dataConverter, config.MemoEncryption) - iwfService := api.NewService(createTestConfig(config), uclient, logger) + + testCfg := createTestConfig(config) + + uclient = temporalapi.NewTemporalClient(temporalClient, testNamespace, dataConverter, config.MemoEncryption, &testCfg.Api.QueryWorkflowFailedRetryPolicy) + iwfService := api.NewService(testCfg, uclient, logger) iwfServer := &http.Server{ Addr: ":" + testIwfServerPort, Handler: iwfService, @@ -122,7 +125,7 @@ func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.U }() // start iwf interpreter worker - interpreter := temporal.NewInterpreterWorker(createTestConfig(config), temporalClient, service.TaskQueue, config.MemoEncryption, dataConverter, uclient) + interpreter := temporal.NewInterpreterWorker(testCfg, temporalClient, service.TaskQueue, config.MemoEncryption, dataConverter, uclient) if *disableStickyCache { interpreter.StartWithStickyCacheDisabledForTest() } else { @@ -144,8 +147,11 @@ func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.U if err != nil { panic(err) } - uclient = cadenceapi.NewCadenceClient(iwf.DefaultCadenceDomain, cadenceClient, serviceClient, encoded.GetDefaultDataConverter(), closeFunc) - iwfService := api.NewService(createTestConfig(config), uclient, logger) + + testCfg := createTestConfig(config) + + uclient = cadenceapi.NewCadenceClient(iwf.DefaultCadenceDomain, cadenceClient, serviceClient, encoded.GetDefaultDataConverter(), closeFunc, &testCfg.Api.QueryWorkflowFailedRetryPolicy) + iwfService := api.NewService(testCfg, uclient, logger) iwfServer := &http.Server{ Addr: ":" + testIwfServerPort, Handler: iwfService, @@ -157,7 +163,7 @@ func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.U }() // start iwf interpreter worker - interpreter := cadence.NewInterpreterWorker(createTestConfig(config), serviceClient, iwf.DefaultCadenceDomain, service.TaskQueue, closeFunc, uclient) + interpreter := cadence.NewInterpreterWorker(testCfg, serviceClient, iwf.DefaultCadenceDomain, service.TaskQueue, closeFunc, uclient) if *disableStickyCache { interpreter.StartWithStickyCacheDisabledForTest() } else { diff --git a/service/client/cadence/client.go b/service/client/cadence/client.go index 2b2ca693..59effbfb 100644 --- a/service/client/cadence/client.go +++ b/service/client/cadence/client.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "github.com/indeedeng/iwf/config" "time" "github.com/indeedeng/iwf/service" @@ -23,11 +24,12 @@ import ( ) type cadenceClient struct { - domain string - cClient client.Client - closeFunc func() - serviceClient workflowserviceclient.Interface - converter encoded.DataConverter + domain string + cClient client.Client + closeFunc func() + serviceClient workflowserviceclient.Interface + converter encoded.DataConverter + queryWorkflowFailedRetryPolicy config.QueryWorkflowFailedRetryPolicy } func (t *cadenceClient) IsWorkflowAlreadyStartedError(err error) bool { @@ -52,6 +54,12 @@ func (t *cadenceClient) IsNotFoundError(err error) bool { return ok } +func (t *cadenceClient) isQueryFailedError(err error) bool { + var serviceError *shared.QueryFailedError + ok := errors.As(err, &serviceError) + return ok +} + func (t *cadenceClient) IsWorkflowTimeoutError(err error) bool { return realcadence.IsTimeoutError(err) } @@ -83,14 +91,15 @@ func (t *cadenceClient) GetApplicationErrorDetails(err error, detailsPtr interfa func NewCadenceClient( domain string, cClient client.Client, serviceClient workflowserviceclient.Interface, - converter encoded.DataConverter, closeFunc func(), + converter encoded.DataConverter, closeFunc func(), retryPolicy *config.QueryWorkflowFailedRetryPolicy, ) uclient.UnifiedClient { return &cadenceClient{ - domain: domain, - cClient: cClient, - closeFunc: closeFunc, - serviceClient: serviceClient, - converter: converter, + domain: domain, + cClient: cClient, + closeFunc: closeFunc, + serviceClient: serviceClient, + converter: converter, + queryWorkflowFailedRetryPolicy: config.QueryWorkflowFailedRetryPolicyWithDefaults(retryPolicy), } } @@ -227,7 +236,24 @@ func (t *cadenceClient) ListWorkflow( func (t *cadenceClient) QueryWorkflow( ctx context.Context, valuePtr interface{}, workflowID string, runID string, queryType string, args ...interface{}, ) error { - qres, err := queryWorkflowWithStrongConsistency(t, ctx, workflowID, runID, queryType, args) + var qres encoded.Value + var err error + + attempt := 1 + // Only QueryFailed error causes retry; all other errors make the loop to finish immediately + for attempt <= t.queryWorkflowFailedRetryPolicy.MaximumAttempts { + qres, err = t.cClient.QueryWorkflow(ctx, workflowID, runID, queryType, args...) + if err == nil { + break + } else { + if t.isQueryFailedError(err) { + time.Sleep(time.Duration(t.queryWorkflowFailedRetryPolicy.InitialIntervalSeconds) * time.Second) + attempt++ + continue + } + return err + } + } if err != nil { return err } diff --git a/service/client/temporal/client.go b/service/client/temporal/client.go index 9559638b..5338b353 100644 --- a/service/client/temporal/client.go +++ b/service/client/temporal/client.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "github.com/google/uuid" + "github.com/indeedeng/iwf/config" "github.com/indeedeng/iwf/gen/iwfidl" "github.com/indeedeng/iwf/service" uclient "github.com/indeedeng/iwf/service/client" @@ -20,23 +21,26 @@ import ( "go.temporal.io/sdk/client" "go.temporal.io/sdk/converter" realtemporal "go.temporal.io/sdk/temporal" + "time" ) type temporalClient struct { - tClient client.Client - namespace string - dataConverter converter.DataConverter - memoEncryption bool // this is a workaround for https://github.com/temporalio/sdk-go/issues/1045 + tClient client.Client + namespace string + dataConverter converter.DataConverter + memoEncryption bool // this is a workaround for https://github.com/temporalio/sdk-go/issues/1045 + queryWorkflowFailedRetryPolicy config.QueryWorkflowFailedRetryPolicy } func NewTemporalClient( - tClient client.Client, namespace string, dataConverter converter.DataConverter, memoEncryption bool, + tClient client.Client, namespace string, dataConverter converter.DataConverter, memoEncryption bool, retryPolicy *config.QueryWorkflowFailedRetryPolicy, ) uclient.UnifiedClient { return &temporalClient{ - tClient: tClient, - namespace: namespace, - dataConverter: dataConverter, - memoEncryption: memoEncryption, + tClient: tClient, + namespace: namespace, + dataConverter: dataConverter, + memoEncryption: memoEncryption, + queryWorkflowFailedRetryPolicy: config.QueryWorkflowFailedRetryPolicyWithDefaults(retryPolicy), } } @@ -71,6 +75,12 @@ func (t *temporalClient) IsNotFoundError(err error) bool { return ok } +func (t *temporalClient) isQueryFailedError(err error) bool { + var serviceError *serviceerror.QueryFailed + ok := errors.As(err, &serviceError) + return ok +} + func (t *temporalClient) IsRequestTimeoutError(err error) bool { var deadlineExceeded *serviceerror.DeadlineExceeded ok := errors.As(err, &deadlineExceeded) @@ -257,7 +267,24 @@ func (t *temporalClient) ListWorkflow( func (t *temporalClient) QueryWorkflow( ctx context.Context, valuePtr interface{}, workflowID string, runID string, queryType string, args ...interface{}, ) error { - qres, err := t.tClient.QueryWorkflow(ctx, workflowID, runID, queryType, args...) + var qres converter.EncodedValue + var err error + + attempt := 1 + // Only QueryFailed error causes retry; all other errors make the loop to finish immediately + for attempt <= t.queryWorkflowFailedRetryPolicy.MaximumAttempts { + qres, err = t.tClient.QueryWorkflow(ctx, workflowID, runID, queryType, args...) + if err == nil { + break + } else { + if t.isQueryFailedError(err) { + time.Sleep(time.Duration(t.queryWorkflowFailedRetryPolicy.InitialIntervalSeconds) * time.Second) + attempt++ + continue + } + return err + } + } if err != nil { return err } diff --git a/service/client/temporal/client_test.go b/service/client/temporal/client_test.go index b4247483..fd15a281 100644 --- a/service/client/temporal/client_test.go +++ b/service/client/temporal/client_test.go @@ -3,6 +3,7 @@ package temporal import ( "errors" "github.com/golang/mock/gomock" + "github.com/indeedeng/iwf/config" "github.com/stretchr/testify/assert" "go.temporal.io/api/serviceerror" "testing" @@ -13,7 +14,10 @@ func TestAlreadyStartedErrorForWorkflow(t *testing.T) { mockRealTemporalClient := NewMockClient(ctrl) mockDataConverter := NewMockDataConverter(ctrl) - client := NewTemporalClient(mockRealTemporalClient, "test-ns", mockDataConverter, false) + client := NewTemporalClient(mockRealTemporalClient, "test-ns", mockDataConverter, false, &config.QueryWorkflowFailedRetryPolicy{ + InitialIntervalSeconds: 0, + MaximumAttempts: 0, + }) err := &serviceerror.WorkflowExecutionAlreadyStarted{} assert.Equal(t, true, client.IsWorkflowAlreadyStartedError(err)) @@ -24,7 +28,10 @@ func TestAlreadyStartedErrorForCronWorkflow(t *testing.T) { mockRealTemporalClient := NewMockClient(ctrl) mockDataConverter := NewMockDataConverter(ctrl) - client := NewTemporalClient(mockRealTemporalClient, "test-ns", mockDataConverter, false) + client := NewTemporalClient(mockRealTemporalClient, "test-ns", mockDataConverter, false, &config.QueryWorkflowFailedRetryPolicy{ + InitialIntervalSeconds: 0, + MaximumAttempts: 0, + }) err := errors.New("schedule with this ID is already registered")