From 7feffc261646f9aeac4edb0ee8f05d712ac844c7 Mon Sep 17 00:00:00 2001 From: lwolczynski Date: Fri, 22 Nov 2024 16:08:48 -0600 Subject: [PATCH 1/9] IWF-163: Add retry policy to QueryWorkflow calls --- cmd/server/iwf/iwf.go | 14 ++++++-- config/config.go | 22 ++++++++++++ config/config_template.yaml | 3 ++ config/development.yaml | 3 ++ config/development_cadence.yaml | 3 ++ integ/config.go | 4 +++ integ/timer_test.go | 6 ---- integ/util.go | 22 ++++++++++-- service/client/cadence/client.go | 56 +++++++++++++++++++++++-------- service/client/temporal/client.go | 53 ++++++++++++++++++++++------- 10 files changed, 149 insertions(+), 37 deletions(-) diff --git a/cmd/server/iwf/iwf.go b/cmd/server/iwf/iwf.go index 7a865997..e9f9f6be 100644 --- a/cmd/server/iwf/iwf.go +++ b/cmd/server/iwf/iwf.go @@ -124,7 +124,11 @@ func start(c *cli.Context) { if err != nil { rawLog.Fatalf("Unable to connect to Temporal because of error %v", err) } - unifiedClient = temporalapi.NewTemporalClient(temporalClient, config.Interpreter.Temporal.Namespace, converter.GetDefaultDataConverter(), false) + retryPolicy := temporalapi.QueryWorkflowFailedRetryPolicy{ + InitialIntervalSeconds: config.GetInitialIntervalSecondsWithDefault(), + MaximumAttempts: config.GetMaximumAttemptsWithDefault(), + } + unifiedClient = temporalapi.NewTemporalClient(temporalClient, config.Interpreter.Temporal.Namespace, converter.GetDefaultDataConverter(), false, retryPolicy) for _, svcName := range services { go launchTemporalService(svcName, *config, unifiedClient, temporalClient, logger) @@ -146,7 +150,13 @@ func start(c *cli.Context) { if err != nil { rawLog.Fatalf("Unable to connect to Cadence because of error %v", err) } - unifiedClient = cadenceapi.NewCadenceClient(domain, cadenceClient, serviceClient, encoded.GetDefaultDataConverter(), closeFunc) + + retryPolicy := cadenceapi.QueryWorkflowFailedRetryPolicy{ + InitialIntervalSeconds: config.GetInitialIntervalSecondsWithDefault(), + MaximumAttempts: config.GetMaximumAttemptsWithDefault(), + } + + unifiedClient = cadenceapi.NewCadenceClient(domain, cadenceClient, serviceClient, encoded.GetDefaultDataConverter(), closeFunc, retryPolicy) for _, svcName := range services { go launchCadenceService(svcName, *config, unifiedClient, serviceClient, domain, closeFunc, logger) diff --git a/config/config.go b/config/config.go index 6bdfacea..d83e8081 100644 --- a/config/config.go +++ b/config/config.go @@ -31,6 +31,14 @@ type ( OmitRpcInputOutputInHistory *bool `yaml:"omitRpcInputOutputInHistory"` // WaitForStateCompletionMigration is used to control workflowId of the WaitForStateCompletion system/internal workflows WaitForStateCompletionMigration WaitForStateCompletionMigration `yaml:"waitForStateCompletionMigration"` + QueryWorkflowFailedRetryPolicy QueryWorkflowFailedRetryPolicy `yaml:"queryWorkflowFailedRetryPolicy"` + } + + QueryWorkflowFailedRetryPolicy struct { + // defaults to 1 + InitialIntervalSeconds int `yaml:"initialIntervalSeconds"` + // defaults to 5 + MaximumAttempts int `yaml:"maximumAttempts"` } WaitForStateCompletionMigration struct { @@ -147,3 +155,17 @@ func (c Config) GetWaitForOnWithDefault() string { } return "old" } + +func (c Config) GetInitialIntervalSecondsWithDefault() int { + if c.Api.QueryWorkflowFailedRetryPolicy.InitialIntervalSeconds != 0 { + return c.Api.QueryWorkflowFailedRetryPolicy.InitialIntervalSeconds + } + return 1 +} + +func (c Config) GetMaximumAttemptsWithDefault() int { + if c.Api.QueryWorkflowFailedRetryPolicy.MaximumAttempts != 0 { + return c.Api.QueryWorkflowFailedRetryPolicy.MaximumAttempts + } + return 5 +} diff --git a/config/config_template.yaml b/config/config_template.yaml index ff8d2a3e..a6ba4a22 100644 --- a/config/config_template.yaml +++ b/config/config_template.yaml @@ -3,6 +3,9 @@ api: waitForStateCompletionMigration: signalWithStartOn: old waitForOn: old + queryWorkflowFailedRetryPolicy: + initialIntervalSeconds: 1 + maximumAttempts: 5 interpreter: defaultWorkflowConfig: continueAsNewThreshold: 100 diff --git a/config/development.yaml b/config/development.yaml index 3dbac3d2..a3b0817f 100644 --- a/config/development.yaml +++ b/config/development.yaml @@ -7,6 +7,9 @@ api: waitForStateCompletionMigration: signalWithStartOn: old waitForOn: old + queryWorkflowFailedRetryPolicy: + initialIntervalSeconds: 1 + maximumAttempts: 5 interpreter: temporal: hostPort: localhost:7233 diff --git a/config/development_cadence.yaml b/config/development_cadence.yaml index 977a9a10..f7858bec 100644 --- a/config/development_cadence.yaml +++ b/config/development_cadence.yaml @@ -3,6 +3,9 @@ api: waitForStateCompletionMigration: signalWithStartOn: old waitForOn: old + queryWorkflowFailedRetryPolicy: + initialIntervalSeconds: 1 + maximumAttempts: 5 interpreter: # interpreterActivityConfig: # disableSystemSearchAttributes: true # (deprecated) set to true if you don't have advanced visibility in Cadence, see more https://github.com/uber/cadence/issues/5085 diff --git a/integ/config.go b/integ/config.go index 91258333..e9afbba8 100644 --- a/integ/config.go +++ b/integ/config.go @@ -16,6 +16,10 @@ func createTestConfig(testCfg IwfServiceTestConfig) config.Config { SignalWithStartOn: "old", WaitForOn: "old", }, + QueryWorkflowFailedRetryPolicy: config.QueryWorkflowFailedRetryPolicy{ + InitialIntervalSeconds: 1, + MaximumAttempts: 5, + }, }, Interpreter: config.Interpreter{ VerboseDebug: false, diff --git a/integ/timer_test.go b/integ/timer_test.go index e62dd843..3c7ab159 100644 --- a/integ/timer_test.go +++ b/integ/timer_test.go @@ -130,7 +130,6 @@ func doTestTimerWorkflow(t *testing.T, backendType service.BackendType, config * }).Execute() panicAtHttpError(err, httpResp) - time.Sleep(time.Second * 1) timerInfos = service.GetCurrentTimerInfosQueryResponse{} err = uclient.QueryWorkflow(context.Background(), &timerInfos, wfId, "", service.GetCurrentTimerInfosQueryType) if err != nil { @@ -139,7 +138,6 @@ func doTestTimerWorkflow(t *testing.T, backendType service.BackendType, config * timer2.Status = service.TimerSkipped assertTimerQueryResponseEqual(assertions, expectedTimerInfos, timerInfos) - time.Sleep(time.Second * 1) httpResp, err = req3.WorkflowSkipTimerRequest(iwfidl.WorkflowSkipTimerRequest{ WorkflowId: wfId, WorkflowStateExecutionId: "S1-1", @@ -147,7 +145,6 @@ func doTestTimerWorkflow(t *testing.T, backendType service.BackendType, config * }).Execute() panicAtHttpError(err, httpResp) - time.Sleep(time.Second * 1) timerInfos = service.GetCurrentTimerInfosQueryResponse{} err = uclient.QueryWorkflow(context.Background(), &timerInfos, wfId, "", service.GetCurrentTimerInfosQueryType) if err != nil { @@ -183,9 +180,6 @@ func doTestTimerWorkflow(t *testing.T, backendType service.BackendType, config * }).Execute() panicAtHttpError(err, httpResp) - if config != nil { - time.Sleep(time.Second * 2) - } err = uclient.QueryWorkflow(context.Background(), &timerInfos, wfId, "", service.GetCurrentTimerInfosQueryType) if err != nil { log.Fatalf("Fail to invoke query %v", err) diff --git a/integ/util.go b/integ/util.go index 8affc720..7bd64bdc 100644 --- a/integ/util.go +++ b/integ/util.go @@ -109,8 +109,16 @@ func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.U if err != nil { panic(err) } - uclient = temporalapi.NewTemporalClient(temporalClient, testNamespace, dataConverter, config.MemoEncryption) - iwfService := api.NewService(createTestConfig(config), uclient, logger) + + testCfg := createTestConfig(config) + + retryPolicy := temporalapi.QueryWorkflowFailedRetryPolicy{ + InitialIntervalSeconds: testCfg.GetInitialIntervalSecondsWithDefault(), + MaximumAttempts: testCfg.GetMaximumAttemptsWithDefault(), + } + + uclient = temporalapi.NewTemporalClient(temporalClient, testNamespace, dataConverter, config.MemoEncryption, retryPolicy) + iwfService := api.NewService(testCfg, uclient, logger) iwfServer := &http.Server{ Addr: ":" + testIwfServerPort, Handler: iwfService, @@ -144,7 +152,15 @@ func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.U if err != nil { panic(err) } - uclient = cadenceapi.NewCadenceClient(iwf.DefaultCadenceDomain, cadenceClient, serviceClient, encoded.GetDefaultDataConverter(), closeFunc) + + testCfg := createTestConfig(config) + + retryPolicy := cadenceapi.QueryWorkflowFailedRetryPolicy{ + InitialIntervalSeconds: testCfg.GetInitialIntervalSecondsWithDefault(), + MaximumAttempts: testCfg.GetMaximumAttemptsWithDefault(), + } + + uclient = cadenceapi.NewCadenceClient(iwf.DefaultCadenceDomain, cadenceClient, serviceClient, encoded.GetDefaultDataConverter(), closeFunc, retryPolicy) iwfService := api.NewService(createTestConfig(config), uclient, logger) iwfServer := &http.Server{ Addr: ":" + testIwfServerPort, diff --git a/service/client/cadence/client.go b/service/client/cadence/client.go index 2b2ca693..559573ad 100644 --- a/service/client/cadence/client.go +++ b/service/client/cadence/client.go @@ -22,12 +22,18 @@ import ( "go.uber.org/cadence/encoded" ) +type QueryWorkflowFailedRetryPolicy struct { + InitialIntervalSeconds int + MaximumAttempts int +} + type cadenceClient struct { - domain string - cClient client.Client - closeFunc func() - serviceClient workflowserviceclient.Interface - converter encoded.DataConverter + domain string + cClient client.Client + closeFunc func() + serviceClient workflowserviceclient.Interface + converter encoded.DataConverter + queryWorkflowFailedRetryPolicy QueryWorkflowFailedRetryPolicy } func (t *cadenceClient) IsWorkflowAlreadyStartedError(err error) bool { @@ -52,6 +58,12 @@ func (t *cadenceClient) IsNotFoundError(err error) bool { return ok } +func (t *cadenceClient) isQueryFailedError(err error) bool { + var serviceError *shared.QueryFailedError + ok := errors.As(err, &serviceError) + return ok +} + func (t *cadenceClient) IsWorkflowTimeoutError(err error) bool { return realcadence.IsTimeoutError(err) } @@ -83,14 +95,15 @@ func (t *cadenceClient) GetApplicationErrorDetails(err error, detailsPtr interfa func NewCadenceClient( domain string, cClient client.Client, serviceClient workflowserviceclient.Interface, - converter encoded.DataConverter, closeFunc func(), + converter encoded.DataConverter, closeFunc func(), retryPolicy QueryWorkflowFailedRetryPolicy, ) uclient.UnifiedClient { return &cadenceClient{ - domain: domain, - cClient: cClient, - closeFunc: closeFunc, - serviceClient: serviceClient, - converter: converter, + domain: domain, + cClient: cClient, + closeFunc: closeFunc, + serviceClient: serviceClient, + converter: converter, + queryWorkflowFailedRetryPolicy: retryPolicy, } } @@ -227,9 +240,24 @@ func (t *cadenceClient) ListWorkflow( func (t *cadenceClient) QueryWorkflow( ctx context.Context, valuePtr interface{}, workflowID string, runID string, queryType string, args ...interface{}, ) error { - qres, err := queryWorkflowWithStrongConsistency(t, ctx, workflowID, runID, queryType, args) - if err != nil { - return err + var qres encoded.Value + var err error + + attempt := 1 + for attempt <= t.queryWorkflowFailedRetryPolicy.MaximumAttempts { + qres, err = queryWorkflowWithStrongConsistency(t, ctx, workflowID, runID, queryType, args) + if err != nil { + if t.isQueryFailedError(err) { + if attempt == t.queryWorkflowFailedRetryPolicy.MaximumAttempts { + return err + } else { + time.Sleep(time.Duration(t.queryWorkflowFailedRetryPolicy.InitialIntervalSeconds) * time.Second) + attempt++ + continue + } + } + return err + } } return qres.Get(valuePtr) } diff --git a/service/client/temporal/client.go b/service/client/temporal/client.go index 9559638b..c96737cf 100644 --- a/service/client/temporal/client.go +++ b/service/client/temporal/client.go @@ -20,23 +20,31 @@ import ( "go.temporal.io/sdk/client" "go.temporal.io/sdk/converter" realtemporal "go.temporal.io/sdk/temporal" + "time" ) +type QueryWorkflowFailedRetryPolicy struct { + InitialIntervalSeconds int + MaximumAttempts int +} + type temporalClient struct { - tClient client.Client - namespace string - dataConverter converter.DataConverter - memoEncryption bool // this is a workaround for https://github.com/temporalio/sdk-go/issues/1045 + tClient client.Client + namespace string + dataConverter converter.DataConverter + memoEncryption bool // this is a workaround for https://github.com/temporalio/sdk-go/issues/1045 + queryWorkflowFailedRetryPolicy QueryWorkflowFailedRetryPolicy } func NewTemporalClient( - tClient client.Client, namespace string, dataConverter converter.DataConverter, memoEncryption bool, + tClient client.Client, namespace string, dataConverter converter.DataConverter, memoEncryption bool, retryPolicy QueryWorkflowFailedRetryPolicy, ) uclient.UnifiedClient { return &temporalClient{ - tClient: tClient, - namespace: namespace, - dataConverter: dataConverter, - memoEncryption: memoEncryption, + tClient: tClient, + namespace: namespace, + dataConverter: dataConverter, + memoEncryption: memoEncryption, + queryWorkflowFailedRetryPolicy: retryPolicy, } } @@ -71,6 +79,12 @@ func (t *temporalClient) IsNotFoundError(err error) bool { return ok } +func (t *temporalClient) isQueryFailedError(err error) bool { + var serviceError *serviceerror.QueryFailed + ok := errors.As(err, &serviceError) + return ok +} + func (t *temporalClient) IsRequestTimeoutError(err error) bool { var deadlineExceeded *serviceerror.DeadlineExceeded ok := errors.As(err, &deadlineExceeded) @@ -257,9 +271,24 @@ func (t *temporalClient) ListWorkflow( func (t *temporalClient) QueryWorkflow( ctx context.Context, valuePtr interface{}, workflowID string, runID string, queryType string, args ...interface{}, ) error { - qres, err := t.tClient.QueryWorkflow(ctx, workflowID, runID, queryType, args...) - if err != nil { - return err + var qres converter.EncodedValue + var err error + + attempt := 1 + for attempt <= t.queryWorkflowFailedRetryPolicy.MaximumAttempts { + qres, err = t.tClient.QueryWorkflow(ctx, workflowID, runID, queryType, args...) + if err != nil { + if t.isQueryFailedError(err) { + if attempt == t.queryWorkflowFailedRetryPolicy.MaximumAttempts { + return err + } else { + time.Sleep(time.Duration(t.queryWorkflowFailedRetryPolicy.InitialIntervalSeconds) * time.Second) + attempt++ + continue + } + } + return err + } } return qres.Get(valuePtr) } From 33386a37d2b3acc64b5475fd15cb1306175c005b Mon Sep 17 00:00:00 2001 From: lwolczynski Date: Mon, 25 Nov 2024 13:01:38 -0600 Subject: [PATCH 2/9] IWF-163: Simplify logic --- service/client/temporal/client.go | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/service/client/temporal/client.go b/service/client/temporal/client.go index c96737cf..ad5ea4fd 100644 --- a/service/client/temporal/client.go +++ b/service/client/temporal/client.go @@ -279,17 +279,16 @@ func (t *temporalClient) QueryWorkflow( qres, err = t.tClient.QueryWorkflow(ctx, workflowID, runID, queryType, args...) if err != nil { if t.isQueryFailedError(err) { - if attempt == t.queryWorkflowFailedRetryPolicy.MaximumAttempts { - return err - } else { - time.Sleep(time.Duration(t.queryWorkflowFailedRetryPolicy.InitialIntervalSeconds) * time.Second) - attempt++ - continue - } + time.Sleep(time.Duration(t.queryWorkflowFailedRetryPolicy.InitialIntervalSeconds) * time.Second) + attempt++ + continue } return err } } + if err != nil { + return err + } return qres.Get(valuePtr) } From fc4f2d67e7c5537b5840901cca7e08b7bb313ac2 Mon Sep 17 00:00:00 2001 From: lwolczynski Date: Mon, 25 Nov 2024 13:49:24 -0600 Subject: [PATCH 3/9] IWF-163: Refactor --- cmd/server/iwf/iwf.go | 13 ++----------- config/config.go | 14 -------------- integ/util.go | 14 ++------------ service/client/cadence/client.go | 19 +++++++++++++++++-- service/client/temporal/client.go | 19 +++++++++++++++++-- service/client/temporal/client_test.go | 11 +++++++++-- 6 files changed, 47 insertions(+), 43 deletions(-) diff --git a/cmd/server/iwf/iwf.go b/cmd/server/iwf/iwf.go index e9f9f6be..56a39ecd 100644 --- a/cmd/server/iwf/iwf.go +++ b/cmd/server/iwf/iwf.go @@ -124,11 +124,7 @@ func start(c *cli.Context) { if err != nil { rawLog.Fatalf("Unable to connect to Temporal because of error %v", err) } - retryPolicy := temporalapi.QueryWorkflowFailedRetryPolicy{ - InitialIntervalSeconds: config.GetInitialIntervalSecondsWithDefault(), - MaximumAttempts: config.GetMaximumAttemptsWithDefault(), - } - unifiedClient = temporalapi.NewTemporalClient(temporalClient, config.Interpreter.Temporal.Namespace, converter.GetDefaultDataConverter(), false, retryPolicy) + unifiedClient = temporalapi.NewTemporalClient(temporalClient, config.Interpreter.Temporal.Namespace, converter.GetDefaultDataConverter(), false, &config.Api.QueryWorkflowFailedRetryPolicy) for _, svcName := range services { go launchTemporalService(svcName, *config, unifiedClient, temporalClient, logger) @@ -151,12 +147,7 @@ func start(c *cli.Context) { rawLog.Fatalf("Unable to connect to Cadence because of error %v", err) } - retryPolicy := cadenceapi.QueryWorkflowFailedRetryPolicy{ - InitialIntervalSeconds: config.GetInitialIntervalSecondsWithDefault(), - MaximumAttempts: config.GetMaximumAttemptsWithDefault(), - } - - unifiedClient = cadenceapi.NewCadenceClient(domain, cadenceClient, serviceClient, encoded.GetDefaultDataConverter(), closeFunc, retryPolicy) + unifiedClient = cadenceapi.NewCadenceClient(domain, cadenceClient, serviceClient, encoded.GetDefaultDataConverter(), closeFunc, &config.Api.QueryWorkflowFailedRetryPolicy) for _, svcName := range services { go launchCadenceService(svcName, *config, unifiedClient, serviceClient, domain, closeFunc, logger) diff --git a/config/config.go b/config/config.go index d83e8081..c334f85f 100644 --- a/config/config.go +++ b/config/config.go @@ -155,17 +155,3 @@ func (c Config) GetWaitForOnWithDefault() string { } return "old" } - -func (c Config) GetInitialIntervalSecondsWithDefault() int { - if c.Api.QueryWorkflowFailedRetryPolicy.InitialIntervalSeconds != 0 { - return c.Api.QueryWorkflowFailedRetryPolicy.InitialIntervalSeconds - } - return 1 -} - -func (c Config) GetMaximumAttemptsWithDefault() int { - if c.Api.QueryWorkflowFailedRetryPolicy.MaximumAttempts != 0 { - return c.Api.QueryWorkflowFailedRetryPolicy.MaximumAttempts - } - return 5 -} diff --git a/integ/util.go b/integ/util.go index 7bd64bdc..7c9113c3 100644 --- a/integ/util.go +++ b/integ/util.go @@ -112,12 +112,7 @@ func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.U testCfg := createTestConfig(config) - retryPolicy := temporalapi.QueryWorkflowFailedRetryPolicy{ - InitialIntervalSeconds: testCfg.GetInitialIntervalSecondsWithDefault(), - MaximumAttempts: testCfg.GetMaximumAttemptsWithDefault(), - } - - uclient = temporalapi.NewTemporalClient(temporalClient, testNamespace, dataConverter, config.MemoEncryption, retryPolicy) + uclient = temporalapi.NewTemporalClient(temporalClient, testNamespace, dataConverter, config.MemoEncryption, &testCfg.Api.QueryWorkflowFailedRetryPolicy) iwfService := api.NewService(testCfg, uclient, logger) iwfServer := &http.Server{ Addr: ":" + testIwfServerPort, @@ -155,12 +150,7 @@ func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.U testCfg := createTestConfig(config) - retryPolicy := cadenceapi.QueryWorkflowFailedRetryPolicy{ - InitialIntervalSeconds: testCfg.GetInitialIntervalSecondsWithDefault(), - MaximumAttempts: testCfg.GetMaximumAttemptsWithDefault(), - } - - uclient = cadenceapi.NewCadenceClient(iwf.DefaultCadenceDomain, cadenceClient, serviceClient, encoded.GetDefaultDataConverter(), closeFunc, retryPolicy) + uclient = cadenceapi.NewCadenceClient(iwf.DefaultCadenceDomain, cadenceClient, serviceClient, encoded.GetDefaultDataConverter(), closeFunc, &testCfg.Api.QueryWorkflowFailedRetryPolicy) iwfService := api.NewService(createTestConfig(config), uclient, logger) iwfServer := &http.Server{ Addr: ":" + testIwfServerPort, diff --git a/service/client/cadence/client.go b/service/client/cadence/client.go index 559573ad..d8ecff1c 100644 --- a/service/client/cadence/client.go +++ b/service/client/cadence/client.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "github.com/indeedeng/iwf/config" "time" "github.com/indeedeng/iwf/service" @@ -95,15 +96,29 @@ func (t *cadenceClient) GetApplicationErrorDetails(err error, detailsPtr interfa func NewCadenceClient( domain string, cClient client.Client, serviceClient workflowserviceclient.Interface, - converter encoded.DataConverter, closeFunc func(), retryPolicy QueryWorkflowFailedRetryPolicy, + converter encoded.DataConverter, closeFunc func(), retryPolicy *config.QueryWorkflowFailedRetryPolicy, ) uclient.UnifiedClient { + var rp QueryWorkflowFailedRetryPolicy + + if retryPolicy.InitialIntervalSeconds == 0 { + rp.InitialIntervalSeconds = 1 + } else { + rp.InitialIntervalSeconds = retryPolicy.InitialIntervalSeconds + } + + if retryPolicy.MaximumAttempts == 0 { + rp.MaximumAttempts = 5 + } else { + rp.MaximumAttempts = retryPolicy.MaximumAttempts + } + return &cadenceClient{ domain: domain, cClient: cClient, closeFunc: closeFunc, serviceClient: serviceClient, converter: converter, - queryWorkflowFailedRetryPolicy: retryPolicy, + queryWorkflowFailedRetryPolicy: rp, } } diff --git a/service/client/temporal/client.go b/service/client/temporal/client.go index ad5ea4fd..ad216941 100644 --- a/service/client/temporal/client.go +++ b/service/client/temporal/client.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "github.com/google/uuid" + "github.com/indeedeng/iwf/config" "github.com/indeedeng/iwf/gen/iwfidl" "github.com/indeedeng/iwf/service" uclient "github.com/indeedeng/iwf/service/client" @@ -37,14 +38,28 @@ type temporalClient struct { } func NewTemporalClient( - tClient client.Client, namespace string, dataConverter converter.DataConverter, memoEncryption bool, retryPolicy QueryWorkflowFailedRetryPolicy, + tClient client.Client, namespace string, dataConverter converter.DataConverter, memoEncryption bool, retryPolicy *config.QueryWorkflowFailedRetryPolicy, ) uclient.UnifiedClient { + var rp QueryWorkflowFailedRetryPolicy + + if retryPolicy.InitialIntervalSeconds == 0 { + rp.InitialIntervalSeconds = 1 + } else { + rp.InitialIntervalSeconds = retryPolicy.InitialIntervalSeconds + } + + if retryPolicy.MaximumAttempts == 0 { + rp.MaximumAttempts = 5 + } else { + rp.MaximumAttempts = retryPolicy.MaximumAttempts + } + return &temporalClient{ tClient: tClient, namespace: namespace, dataConverter: dataConverter, memoEncryption: memoEncryption, - queryWorkflowFailedRetryPolicy: retryPolicy, + queryWorkflowFailedRetryPolicy: rp, } } diff --git a/service/client/temporal/client_test.go b/service/client/temporal/client_test.go index b4247483..fd15a281 100644 --- a/service/client/temporal/client_test.go +++ b/service/client/temporal/client_test.go @@ -3,6 +3,7 @@ package temporal import ( "errors" "github.com/golang/mock/gomock" + "github.com/indeedeng/iwf/config" "github.com/stretchr/testify/assert" "go.temporal.io/api/serviceerror" "testing" @@ -13,7 +14,10 @@ func TestAlreadyStartedErrorForWorkflow(t *testing.T) { mockRealTemporalClient := NewMockClient(ctrl) mockDataConverter := NewMockDataConverter(ctrl) - client := NewTemporalClient(mockRealTemporalClient, "test-ns", mockDataConverter, false) + client := NewTemporalClient(mockRealTemporalClient, "test-ns", mockDataConverter, false, &config.QueryWorkflowFailedRetryPolicy{ + InitialIntervalSeconds: 0, + MaximumAttempts: 0, + }) err := &serviceerror.WorkflowExecutionAlreadyStarted{} assert.Equal(t, true, client.IsWorkflowAlreadyStartedError(err)) @@ -24,7 +28,10 @@ func TestAlreadyStartedErrorForCronWorkflow(t *testing.T) { mockRealTemporalClient := NewMockClient(ctrl) mockDataConverter := NewMockDataConverter(ctrl) - client := NewTemporalClient(mockRealTemporalClient, "test-ns", mockDataConverter, false) + client := NewTemporalClient(mockRealTemporalClient, "test-ns", mockDataConverter, false, &config.QueryWorkflowFailedRetryPolicy{ + InitialIntervalSeconds: 0, + MaximumAttempts: 0, + }) err := errors.New("schedule with this ID is already registered") From bbe6f74cb33074702bed67fb61a7e5367efe6fe0 Mon Sep 17 00:00:00 2001 From: lwolczynski Date: Mon, 25 Nov 2024 14:46:44 -0600 Subject: [PATCH 4/9] IWF-163: Fix double method call --- integ/util.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/integ/util.go b/integ/util.go index 7c9113c3..01d4e997 100644 --- a/integ/util.go +++ b/integ/util.go @@ -125,7 +125,7 @@ func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.U }() // start iwf interpreter worker - interpreter := temporal.NewInterpreterWorker(createTestConfig(config), temporalClient, service.TaskQueue, config.MemoEncryption, dataConverter, uclient) + interpreter := temporal.NewInterpreterWorker(testCfg, temporalClient, service.TaskQueue, config.MemoEncryption, dataConverter, uclient) if *disableStickyCache { interpreter.StartWithStickyCacheDisabledForTest() } else { @@ -151,7 +151,7 @@ func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.U testCfg := createTestConfig(config) uclient = cadenceapi.NewCadenceClient(iwf.DefaultCadenceDomain, cadenceClient, serviceClient, encoded.GetDefaultDataConverter(), closeFunc, &testCfg.Api.QueryWorkflowFailedRetryPolicy) - iwfService := api.NewService(createTestConfig(config), uclient, logger) + iwfService := api.NewService(testCfg, uclient, logger) iwfServer := &http.Server{ Addr: ":" + testIwfServerPort, Handler: iwfService, @@ -163,7 +163,7 @@ func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.U }() // start iwf interpreter worker - interpreter := cadence.NewInterpreterWorker(createTestConfig(config), serviceClient, iwf.DefaultCadenceDomain, service.TaskQueue, closeFunc, uclient) + interpreter := cadence.NewInterpreterWorker(testCfg, serviceClient, iwf.DefaultCadenceDomain, service.TaskQueue, closeFunc, uclient) if *disableStickyCache { interpreter.StartWithStickyCacheDisabledForTest() } else { From d61aa8d794e0d78d3b77fa50d6f76a879ca6e81a Mon Sep 17 00:00:00 2001 From: lwolczynski Date: Mon, 25 Nov 2024 14:58:07 -0600 Subject: [PATCH 5/9] IWF-163: Fix issue --- service/client/cadence/client.go | 20 +++++++++++--------- service/client/temporal/client.go | 5 ++++- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/service/client/cadence/client.go b/service/client/cadence/client.go index d8ecff1c..b6b72d39 100644 --- a/service/client/cadence/client.go +++ b/service/client/cadence/client.go @@ -259,21 +259,23 @@ func (t *cadenceClient) QueryWorkflow( var err error attempt := 1 + // Only QueryFailed error causes retry; all other errors make the loop to finish immediately for attempt <= t.queryWorkflowFailedRetryPolicy.MaximumAttempts { - qres, err = queryWorkflowWithStrongConsistency(t, ctx, workflowID, runID, queryType, args) - if err != nil { + qres, err = t.cClient.QueryWorkflow(ctx, workflowID, runID, queryType, args...) + if err == nil { + break + } else { if t.isQueryFailedError(err) { - if attempt == t.queryWorkflowFailedRetryPolicy.MaximumAttempts { - return err - } else { - time.Sleep(time.Duration(t.queryWorkflowFailedRetryPolicy.InitialIntervalSeconds) * time.Second) - attempt++ - continue - } + time.Sleep(time.Duration(t.queryWorkflowFailedRetryPolicy.InitialIntervalSeconds) * time.Second) + attempt++ + continue } return err } } + if err != nil { + return err + } return qres.Get(valuePtr) } diff --git a/service/client/temporal/client.go b/service/client/temporal/client.go index ad216941..ea7e7228 100644 --- a/service/client/temporal/client.go +++ b/service/client/temporal/client.go @@ -290,9 +290,12 @@ func (t *temporalClient) QueryWorkflow( var err error attempt := 1 + // Only QueryFailed error causes retry; all other errors make the loop to finish immediately for attempt <= t.queryWorkflowFailedRetryPolicy.MaximumAttempts { qres, err = t.tClient.QueryWorkflow(ctx, workflowID, runID, queryType, args...) - if err != nil { + if err == nil { + break + } else { if t.isQueryFailedError(err) { time.Sleep(time.Duration(t.queryWorkflowFailedRetryPolicy.InitialIntervalSeconds) * time.Second) attempt++ From 3c958f2c5eb10e48911308e56a7fc187244dbd40 Mon Sep 17 00:00:00 2001 From: lwolczynski Date: Mon, 25 Nov 2024 15:06:12 -0600 Subject: [PATCH 6/9] IWF-163: Remove not needed sleep --- integ/create_test.go | 2 -- integ/signal_test.go | 3 --- 2 files changed, 5 deletions(-) diff --git a/integ/create_test.go b/integ/create_test.go index baf81c4a..cca03218 100644 --- a/integ/create_test.go +++ b/integ/create_test.go @@ -84,8 +84,6 @@ func doTestCreateWithoutStartingState(t *testing.T, backendType service.BackendT }).Execute() panicAtHttpError(err, httpResp) - time.Sleep(time.Second * 2) - // workflow shouldn't executed any state var dump service.ContinueAsNewDumpResponse err = uclient.QueryWorkflow(context.Background(), &dump, wfId, "", service.ContinueAsNewDumpQueryType) diff --git a/integ/signal_test.go b/integ/signal_test.go index c1230be5..d9cb48cf 100644 --- a/integ/signal_test.go +++ b/integ/signal_test.go @@ -110,7 +110,6 @@ func doTestSignalWorkflow(t *testing.T, backendType service.BackendType, config panicAtHttpError(err, httpResp) // test update config - time.Sleep(time.Second) var debugDump service.DebugDumpResponse err = uclient.QueryWorkflow(context.Background(), &debugDump, wfId, "", service.DebugDumpQueryType) if err != nil { @@ -134,7 +133,6 @@ func doTestSignalWorkflow(t *testing.T, backendType service.BackendType, config }).Execute() panicAtHttpError(err, httpResp) - time.Sleep(time.Second) err = uclient.QueryWorkflow(context.Background(), &debugDump, wfId, "", service.DebugDumpQueryType) if err != nil { panic(err) @@ -154,7 +152,6 @@ func doTestSignalWorkflow(t *testing.T, backendType service.BackendType, config }).Execute() panicAtHttpError(err, httpResp) - time.Sleep(time.Second) err = uclient.QueryWorkflow(context.Background(), &debugDump, wfId, "", service.DebugDumpQueryType) if err != nil { panic(err) From bc8cc1261be14bcc497391126225340d0e91efba Mon Sep 17 00:00:00 2001 From: lwolczynski Date: Mon, 25 Nov 2024 15:13:14 -0600 Subject: [PATCH 7/9] IWF-163: Rename helper method --- service/client/cadence/client.go | 21 ++++++++++++--------- service/client/temporal/client.go | 19 +++++++++++-------- 2 files changed, 23 insertions(+), 17 deletions(-) diff --git a/service/client/cadence/client.go b/service/client/cadence/client.go index b6b72d39..9b0f38ec 100644 --- a/service/client/cadence/client.go +++ b/service/client/cadence/client.go @@ -98,6 +98,17 @@ func NewCadenceClient( domain string, cClient client.Client, serviceClient workflowserviceclient.Interface, converter encoded.DataConverter, closeFunc func(), retryPolicy *config.QueryWorkflowFailedRetryPolicy, ) uclient.UnifiedClient { + return &cadenceClient{ + domain: domain, + cClient: cClient, + closeFunc: closeFunc, + serviceClient: serviceClient, + converter: converter, + queryWorkflowFailedRetryPolicy: queryWorkflowFailedRetryPolicyWithDefaults(retryPolicy), + } +} + +func queryWorkflowFailedRetryPolicyWithDefaults(retryPolicy *config.QueryWorkflowFailedRetryPolicy) QueryWorkflowFailedRetryPolicy { var rp QueryWorkflowFailedRetryPolicy if retryPolicy.InitialIntervalSeconds == 0 { @@ -111,15 +122,7 @@ func NewCadenceClient( } else { rp.MaximumAttempts = retryPolicy.MaximumAttempts } - - return &cadenceClient{ - domain: domain, - cClient: cClient, - closeFunc: closeFunc, - serviceClient: serviceClient, - converter: converter, - queryWorkflowFailedRetryPolicy: rp, - } + return rp } func (t *cadenceClient) Close() { diff --git a/service/client/temporal/client.go b/service/client/temporal/client.go index ea7e7228..8544393f 100644 --- a/service/client/temporal/client.go +++ b/service/client/temporal/client.go @@ -40,6 +40,16 @@ type temporalClient struct { func NewTemporalClient( tClient client.Client, namespace string, dataConverter converter.DataConverter, memoEncryption bool, retryPolicy *config.QueryWorkflowFailedRetryPolicy, ) uclient.UnifiedClient { + return &temporalClient{ + tClient: tClient, + namespace: namespace, + dataConverter: dataConverter, + memoEncryption: memoEncryption, + queryWorkflowFailedRetryPolicy: queryWorkflowFailedRetryPolicyWithDefaults(retryPolicy), + } +} + +func queryWorkflowFailedRetryPolicyWithDefaults(retryPolicy *config.QueryWorkflowFailedRetryPolicy) QueryWorkflowFailedRetryPolicy { var rp QueryWorkflowFailedRetryPolicy if retryPolicy.InitialIntervalSeconds == 0 { @@ -53,14 +63,7 @@ func NewTemporalClient( } else { rp.MaximumAttempts = retryPolicy.MaximumAttempts } - - return &temporalClient{ - tClient: tClient, - namespace: namespace, - dataConverter: dataConverter, - memoEncryption: memoEncryption, - queryWorkflowFailedRetryPolicy: rp, - } + return rp } func (t *temporalClient) Close() { From 04cf8c8d9816f332b160b48961822bdd687588ea Mon Sep 17 00:00:00 2001 From: lwolczynski Date: Mon, 25 Nov 2024 15:40:37 -0600 Subject: [PATCH 8/9] IWF-163: Add previously removed sleeps --- integ/timer_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/integ/timer_test.go b/integ/timer_test.go index 3c7ab159..35681e1e 100644 --- a/integ/timer_test.go +++ b/integ/timer_test.go @@ -130,6 +130,7 @@ func doTestTimerWorkflow(t *testing.T, backendType service.BackendType, config * }).Execute() panicAtHttpError(err, httpResp) + time.Sleep(time.Second * 1) timerInfos = service.GetCurrentTimerInfosQueryResponse{} err = uclient.QueryWorkflow(context.Background(), &timerInfos, wfId, "", service.GetCurrentTimerInfosQueryType) if err != nil { @@ -138,6 +139,7 @@ func doTestTimerWorkflow(t *testing.T, backendType service.BackendType, config * timer2.Status = service.TimerSkipped assertTimerQueryResponseEqual(assertions, expectedTimerInfos, timerInfos) + time.Sleep(time.Second * 1) httpResp, err = req3.WorkflowSkipTimerRequest(iwfidl.WorkflowSkipTimerRequest{ WorkflowId: wfId, WorkflowStateExecutionId: "S1-1", @@ -145,6 +147,7 @@ func doTestTimerWorkflow(t *testing.T, backendType service.BackendType, config * }).Execute() panicAtHttpError(err, httpResp) + time.Sleep(time.Second * 1) timerInfos = service.GetCurrentTimerInfosQueryResponse{} err = uclient.QueryWorkflow(context.Background(), &timerInfos, wfId, "", service.GetCurrentTimerInfosQueryType) if err != nil { From d5d71318c75cc1a1a760385d81370921f99b6691 Mon Sep 17 00:00:00 2001 From: lwolczynski Date: Mon, 25 Nov 2024 16:22:38 -0600 Subject: [PATCH 9/9] IWF-163: Move helper method to config package --- config/config.go | 18 ++++++++++++++++++ service/client/cadence/client.go | 26 ++------------------------ service/client/temporal/client.go | 26 ++------------------------ 3 files changed, 22 insertions(+), 48 deletions(-) diff --git a/config/config.go b/config/config.go index c334f85f..c0d432ed 100644 --- a/config/config.go +++ b/config/config.go @@ -155,3 +155,21 @@ func (c Config) GetWaitForOnWithDefault() string { } return "old" } + +func QueryWorkflowFailedRetryPolicyWithDefaults(retryPolicy *QueryWorkflowFailedRetryPolicy) QueryWorkflowFailedRetryPolicy { + var rp QueryWorkflowFailedRetryPolicy + + if retryPolicy != nil && retryPolicy.InitialIntervalSeconds != 0 { + rp.InitialIntervalSeconds = retryPolicy.InitialIntervalSeconds + } else { + rp.InitialIntervalSeconds = 1 + } + + if retryPolicy != nil && retryPolicy.MaximumAttempts != 0 { + rp.MaximumAttempts = retryPolicy.MaximumAttempts + } else { + rp.MaximumAttempts = 5 + } + + return rp +} diff --git a/service/client/cadence/client.go b/service/client/cadence/client.go index 9b0f38ec..59effbfb 100644 --- a/service/client/cadence/client.go +++ b/service/client/cadence/client.go @@ -23,18 +23,13 @@ import ( "go.uber.org/cadence/encoded" ) -type QueryWorkflowFailedRetryPolicy struct { - InitialIntervalSeconds int - MaximumAttempts int -} - type cadenceClient struct { domain string cClient client.Client closeFunc func() serviceClient workflowserviceclient.Interface converter encoded.DataConverter - queryWorkflowFailedRetryPolicy QueryWorkflowFailedRetryPolicy + queryWorkflowFailedRetryPolicy config.QueryWorkflowFailedRetryPolicy } func (t *cadenceClient) IsWorkflowAlreadyStartedError(err error) bool { @@ -104,25 +99,8 @@ func NewCadenceClient( closeFunc: closeFunc, serviceClient: serviceClient, converter: converter, - queryWorkflowFailedRetryPolicy: queryWorkflowFailedRetryPolicyWithDefaults(retryPolicy), - } -} - -func queryWorkflowFailedRetryPolicyWithDefaults(retryPolicy *config.QueryWorkflowFailedRetryPolicy) QueryWorkflowFailedRetryPolicy { - var rp QueryWorkflowFailedRetryPolicy - - if retryPolicy.InitialIntervalSeconds == 0 { - rp.InitialIntervalSeconds = 1 - } else { - rp.InitialIntervalSeconds = retryPolicy.InitialIntervalSeconds - } - - if retryPolicy.MaximumAttempts == 0 { - rp.MaximumAttempts = 5 - } else { - rp.MaximumAttempts = retryPolicy.MaximumAttempts + queryWorkflowFailedRetryPolicy: config.QueryWorkflowFailedRetryPolicyWithDefaults(retryPolicy), } - return rp } func (t *cadenceClient) Close() { diff --git a/service/client/temporal/client.go b/service/client/temporal/client.go index 8544393f..5338b353 100644 --- a/service/client/temporal/client.go +++ b/service/client/temporal/client.go @@ -24,17 +24,12 @@ import ( "time" ) -type QueryWorkflowFailedRetryPolicy struct { - InitialIntervalSeconds int - MaximumAttempts int -} - type temporalClient struct { tClient client.Client namespace string dataConverter converter.DataConverter memoEncryption bool // this is a workaround for https://github.com/temporalio/sdk-go/issues/1045 - queryWorkflowFailedRetryPolicy QueryWorkflowFailedRetryPolicy + queryWorkflowFailedRetryPolicy config.QueryWorkflowFailedRetryPolicy } func NewTemporalClient( @@ -45,25 +40,8 @@ func NewTemporalClient( namespace: namespace, dataConverter: dataConverter, memoEncryption: memoEncryption, - queryWorkflowFailedRetryPolicy: queryWorkflowFailedRetryPolicyWithDefaults(retryPolicy), - } -} - -func queryWorkflowFailedRetryPolicyWithDefaults(retryPolicy *config.QueryWorkflowFailedRetryPolicy) QueryWorkflowFailedRetryPolicy { - var rp QueryWorkflowFailedRetryPolicy - - if retryPolicy.InitialIntervalSeconds == 0 { - rp.InitialIntervalSeconds = 1 - } else { - rp.InitialIntervalSeconds = retryPolicy.InitialIntervalSeconds - } - - if retryPolicy.MaximumAttempts == 0 { - rp.MaximumAttempts = 5 - } else { - rp.MaximumAttempts = retryPolicy.MaximumAttempts + queryWorkflowFailedRetryPolicy: config.QueryWorkflowFailedRetryPolicyWithDefaults(retryPolicy), } - return rp } func (t *temporalClient) Close() {