diff --git a/flytepropeller/Makefile b/flytepropeller/Makefile index 758e13bc04..2c62af3b09 100644 --- a/flytepropeller/Makefile +++ b/flytepropeller/Makefile @@ -46,3 +46,7 @@ golden: .PHONY: test_unit_codecov test_unit_codecov: go test ./... -race -coverprofile=coverage.txt -covermode=atomic; curl -s https://codecov.io/bash > codecov_bash.sh; bash codecov_bash.sh + +.PHONY: generate +generate: download_tooling + @go generate ./... diff --git a/flytepropeller/pkg/controller/config/config.go b/flytepropeller/pkg/controller/config/config.go index 1006ef965e..be5aa72c35 100644 --- a/flytepropeller/pkg/controller/config/config.go +++ b/flytepropeller/pkg/controller/config/config.go @@ -122,7 +122,7 @@ type WorkqueueConfig struct { // configuration for a node type NodeConfig struct { DefaultDeadlines DefaultDeadlines `json:"default-deadlines,omitempty" pflag:",Default value for timeouts"` - MaxNodeRetriesForSystemFailures uint32 `json:"max-node-retries-system-failures" pflag:"2,Maximum number of retries per node for node failure due to infra issues"` + MaxNodeRetriesForSystemFailures int64 `json:"max-node-retries-system-failures" pflag:"2,Maximum number of retries per node for node failure due to infra issues"` } // Contains default values for timeouts diff --git a/flytepropeller/pkg/controller/config/config_flags.go b/flytepropeller/pkg/controller/config/config_flags.go index cd891c9eb5..ca64ef171d 100755 --- a/flytepropeller/pkg/controller/config/config_flags.go +++ b/flytepropeller/pkg/controller/config/config_flags.go @@ -77,8 +77,9 @@ func (cfg Config) GetPFlagSet(prefix string) *pflag.FlagSet { cmdFlags.Int64(fmt.Sprintf("%v%v", prefix, "max-output-size-bytes"), defaultConfig.MaxDatasetSizeBytes, "Maximum size of outputs per task") cmdFlags.Int(fmt.Sprintf("%v%v", prefix, "kube-client-config.burst"), defaultConfig.KubeConfig.Burst, "Max burst rate for throttle. 0 defaults to 10") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "kube-client-config.timeout"), defaultConfig.KubeConfig.Timeout.String(), "Max duration allowed for every request to KubeAPI before giving up. 0 implies no timeout.") - cmdFlags.String(fmt.Sprintf("%v%v", prefix, "default-deadlines.node-execution-deadline"), defaultConfig.NodeConfig.DefaultDeadlines.DefaultNodeExecutionDeadline.String(), "Default value of node execution timeout") - cmdFlags.String(fmt.Sprintf("%v%v", prefix, "default-deadlines.node-active-deadline"), defaultConfig.NodeConfig.DefaultDeadlines.DefaultNodeActiveDeadline.String(), "Default value of node timeout") - cmdFlags.String(fmt.Sprintf("%v%v", prefix, "default-deadlines.workflow-active-deadline"), defaultConfig.NodeConfig.DefaultDeadlines.DefaultWorkflowActiveDeadline.String(), "Default value of workflow timeout") + cmdFlags.String(fmt.Sprintf("%v%v", prefix, "node-config.default-deadlines.node-execution-deadline"), defaultConfig.NodeConfig.DefaultDeadlines.DefaultNodeExecutionDeadline.String(), "Default value of node execution timeout") + cmdFlags.String(fmt.Sprintf("%v%v", prefix, "node-config.default-deadlines.node-active-deadline"), defaultConfig.NodeConfig.DefaultDeadlines.DefaultNodeActiveDeadline.String(), "Default value of node timeout") + cmdFlags.String(fmt.Sprintf("%v%v", prefix, "node-config.default-deadlines.workflow-active-deadline"), defaultConfig.NodeConfig.DefaultDeadlines.DefaultWorkflowActiveDeadline.String(), "Default value of workflow timeout") + cmdFlags.Int64(fmt.Sprintf("%v%v", prefix, "node-config.max-node-retries-system-failures"), defaultConfig.NodeConfig.MaxNodeRetriesForSystemFailures, "Maximum number of retries per node for node failure due to infra issues") return cmdFlags } diff --git a/flytepropeller/pkg/controller/config/config_flags_test.go b/flytepropeller/pkg/controller/config/config_flags_test.go index 9e79c55f71..7c1081b367 100755 --- a/flytepropeller/pkg/controller/config/config_flags_test.go +++ b/flytepropeller/pkg/controller/config/config_flags_test.go @@ -891,10 +891,10 @@ func TestConfig_SetFlags(t *testing.T) { } }) }) - t.Run("Test_default-deadlines.node-execution-deadline", func(t *testing.T) { + t.Run("Test_node-config.default-deadlines.node-execution-deadline", func(t *testing.T) { t.Run("DefaultValue", func(t *testing.T) { // Test that default value is set properly - if vString, err := cmdFlags.GetString("default-deadlines.node-execution-deadline"); err == nil { + if vString, err := cmdFlags.GetString("node-config.default-deadlines.node-execution-deadline"); err == nil { assert.Equal(t, string(defaultConfig.NodeConfig.DefaultDeadlines.DefaultNodeExecutionDeadline.String()), vString) } else { assert.FailNow(t, err.Error()) @@ -904,8 +904,8 @@ func TestConfig_SetFlags(t *testing.T) { t.Run("Override", func(t *testing.T) { testValue := defaultConfig.NodeConfig.DefaultDeadlines.DefaultNodeExecutionDeadline.String() - cmdFlags.Set("default-deadlines.node-execution-deadline", testValue) - if vString, err := cmdFlags.GetString("default-deadlines.node-execution-deadline"); err == nil { + cmdFlags.Set("node-config.default-deadlines.node-execution-deadline", testValue) + if vString, err := cmdFlags.GetString("node-config.default-deadlines.node-execution-deadline"); err == nil { testDecodeJson_Config(t, fmt.Sprintf("%v", vString), &actual.NodeConfig.DefaultDeadlines.DefaultNodeExecutionDeadline) } else { @@ -913,10 +913,10 @@ func TestConfig_SetFlags(t *testing.T) { } }) }) - t.Run("Test_default-deadlines.node-active-deadline", func(t *testing.T) { + t.Run("Test_node-config.default-deadlines.node-active-deadline", func(t *testing.T) { t.Run("DefaultValue", func(t *testing.T) { // Test that default value is set properly - if vString, err := cmdFlags.GetString("default-deadlines.node-active-deadline"); err == nil { + if vString, err := cmdFlags.GetString("node-config.default-deadlines.node-active-deadline"); err == nil { assert.Equal(t, string(defaultConfig.NodeConfig.DefaultDeadlines.DefaultNodeActiveDeadline.String()), vString) } else { assert.FailNow(t, err.Error()) @@ -926,8 +926,8 @@ func TestConfig_SetFlags(t *testing.T) { t.Run("Override", func(t *testing.T) { testValue := defaultConfig.NodeConfig.DefaultDeadlines.DefaultNodeActiveDeadline.String() - cmdFlags.Set("default-deadlines.node-active-deadline", testValue) - if vString, err := cmdFlags.GetString("default-deadlines.node-active-deadline"); err == nil { + cmdFlags.Set("node-config.default-deadlines.node-active-deadline", testValue) + if vString, err := cmdFlags.GetString("node-config.default-deadlines.node-active-deadline"); err == nil { testDecodeJson_Config(t, fmt.Sprintf("%v", vString), &actual.NodeConfig.DefaultDeadlines.DefaultNodeActiveDeadline) } else { @@ -935,10 +935,10 @@ func TestConfig_SetFlags(t *testing.T) { } }) }) - t.Run("Test_default-deadlines.workflow-active-deadline", func(t *testing.T) { + t.Run("Test_node-config.default-deadlines.workflow-active-deadline", func(t *testing.T) { t.Run("DefaultValue", func(t *testing.T) { // Test that default value is set properly - if vString, err := cmdFlags.GetString("default-deadlines.workflow-active-deadline"); err == nil { + if vString, err := cmdFlags.GetString("node-config.default-deadlines.workflow-active-deadline"); err == nil { assert.Equal(t, string(defaultConfig.NodeConfig.DefaultDeadlines.DefaultWorkflowActiveDeadline.String()), vString) } else { assert.FailNow(t, err.Error()) @@ -948,8 +948,8 @@ func TestConfig_SetFlags(t *testing.T) { t.Run("Override", func(t *testing.T) { testValue := defaultConfig.NodeConfig.DefaultDeadlines.DefaultWorkflowActiveDeadline.String() - cmdFlags.Set("default-deadlines.workflow-active-deadline", testValue) - if vString, err := cmdFlags.GetString("default-deadlines.workflow-active-deadline"); err == nil { + cmdFlags.Set("node-config.default-deadlines.workflow-active-deadline", testValue) + if vString, err := cmdFlags.GetString("node-config.default-deadlines.workflow-active-deadline"); err == nil { testDecodeJson_Config(t, fmt.Sprintf("%v", vString), &actual.NodeConfig.DefaultDeadlines.DefaultWorkflowActiveDeadline) } else { @@ -957,4 +957,26 @@ func TestConfig_SetFlags(t *testing.T) { } }) }) + t.Run("Test_node-config.max-node-retries-system-failures", func(t *testing.T) { + t.Run("DefaultValue", func(t *testing.T) { + // Test that default value is set properly + if vInt64, err := cmdFlags.GetInt64("node-config.max-node-retries-system-failures"); err == nil { + assert.Equal(t, int64(defaultConfig.NodeConfig.MaxNodeRetriesForSystemFailures), vInt64) + } else { + assert.FailNow(t, err.Error()) + } + }) + + t.Run("Override", func(t *testing.T) { + testValue := "1" + + cmdFlags.Set("node-config.max-node-retries-system-failures", testValue) + if vInt64, err := cmdFlags.GetInt64("node-config.max-node-retries-system-failures"); err == nil { + testDecodeJson_Config(t, fmt.Sprintf("%v", vInt64), &actual.NodeConfig.MaxNodeRetriesForSystemFailures) + + } else { + assert.FailNow(t, err.Error()) + } + }) + }) } diff --git a/flytepropeller/pkg/controller/nodes/executor.go b/flytepropeller/pkg/controller/nodes/executor.go index cc933d3ad1..fca1f1a56c 100644 --- a/flytepropeller/pkg/controller/nodes/executor.go +++ b/flytepropeller/pkg/controller/nodes/executor.go @@ -734,7 +734,7 @@ func NewExecutor(ctx context.Context, nodeConfig config.NodeConfig, store *stora outputResolver: NewRemoteFileOutputResolver(store), defaultExecutionDeadline: nodeConfig.DefaultDeadlines.DefaultNodeExecutionDeadline.Duration, defaultActiveDeadline: nodeConfig.DefaultDeadlines.DefaultNodeActiveDeadline.Duration, - maxNodeRetriesForSystemFailures: nodeConfig.MaxNodeRetriesForSystemFailures, + maxNodeRetriesForSystemFailures: uint32(nodeConfig.MaxNodeRetriesForSystemFailures), } nodeHandlerFactory, err := NewHandlerFactory(ctx, exec, workflowLauncher, kubeClient, catalogClient, nodeScope) exec.nodeHandlerFactory = nodeHandlerFactory