diff --git a/flytepropeller/pkg/controller/controller.go b/flytepropeller/pkg/controller/controller.go index de28612c54..db2ec55261 100644 --- a/flytepropeller/pkg/controller/controller.go +++ b/flytepropeller/pkg/controller/controller.go @@ -337,7 +337,7 @@ func New(ctx context.Context, cfg *config.Config, kubeClientset kubernetes.Inter var launchPlanActor launchplan.FlyteAdmin if cfg.EnableAdminLauncher { - launchPlanActor, err = launchplan.NewAdminLaunchPlanExecutor(ctx, adminClient, cfg.DownstreamEval.Duration, + launchPlanActor, err = launchplan.NewAdminLaunchPlanExecutor(ctx, adminClient, launchplan.GetAdminConfig(), scope.NewSubScope("admin_launcher"), store) if err != nil { logger.Errorf(ctx, "failed to create Admin workflow Launcher, err: %v", err.Error()) diff --git a/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin.go b/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin.go index 29de745acf..95d23045a6 100644 --- a/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin.go +++ b/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin.go @@ -3,7 +3,6 @@ package launchplan import ( "context" "fmt" - "time" "github.com/golang/protobuf/ptypes/wrappers" "golang.org/x/time/rate" @@ -311,14 +310,14 @@ func (a *adminLaunchPlanExecutor) syncItem(ctx context.Context, batch cache.Batc } func NewAdminLaunchPlanExecutor(_ context.Context, client service.AdminServiceClient, - syncPeriod time.Duration, cfg *AdminConfig, scope promutils.Scope, store *storage.DataStore) (FlyteAdmin, error) { + cfg *AdminConfig, scope promutils.Scope, store *storage.DataStore) (FlyteAdmin, error) { exec := &adminLaunchPlanExecutor{ adminClient: client, store: store, } rateLimiter := &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(cfg.TPS), cfg.Burst)} - c, err := cache.NewAutoRefreshCache("admin-launcher", exec.syncItem, rateLimiter, syncPeriod, cfg.Workers, cfg.MaxCacheSize, scope) + c, err := cache.NewAutoRefreshCache("admin-launcher", exec.syncItem, rateLimiter, cfg.CacheResyncDuration.Duration, cfg.Workers, cfg.MaxCacheSize, scope) if err != nil { return nil, err } diff --git a/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin_test.go b/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin_test.go index 89bb0e2477..ac988ab40f 100644 --- a/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin_test.go +++ b/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin_test.go @@ -17,6 +17,7 @@ import ( "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyte/flytestdlib/cache" mocks2 "github.com/flyteorg/flyte/flytestdlib/cache/mocks" + "github.com/flyteorg/flyte/flytestdlib/config" "github.com/flyteorg/flyte/flytestdlib/contextutils" "github.com/flyteorg/flyte/flytestdlib/promutils" "github.com/flyteorg/flyte/flytestdlib/promutils/labeled" @@ -26,6 +27,9 @@ import ( func TestAdminLaunchPlanExecutor_GetStatus(t *testing.T) { ctx := context.TODO() + adminConfig := defaultAdminConfig + adminConfig.CacheResyncDuration = config.Duration{Duration: time.Millisecond} + id := &core.WorkflowExecutionIdentifier{ Name: "n", Domain: "d", @@ -38,7 +42,7 @@ func TestAdminLaunchPlanExecutor_GetStatus(t *testing.T) { t.Run("happy", func(t *testing.T) { mockClient := &mocks.AdminServiceClient{} - exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Millisecond, defaultAdminConfig, promutils.NewTestScope(), memStore) + exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, adminConfig, promutils.NewTestScope(), memStore) assert.NoError(t, err) mockClient.On("GetExecution", ctx, @@ -65,7 +69,7 @@ func TestAdminLaunchPlanExecutor_GetStatus(t *testing.T) { mock.MatchedBy(func(o *admin.WorkflowExecutionGetRequest) bool { return true }), ).Return(nil, status.Error(codes.NotFound, "")) - exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Millisecond, defaultAdminConfig, promutils.NewTestScope(), memStore) + exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, adminConfig, promutils.NewTestScope(), memStore) assert.NoError(t, err) assert.NoError(t, exec.Initialize(ctx)) @@ -111,7 +115,7 @@ func TestAdminLaunchPlanExecutor_GetStatus(t *testing.T) { mock.MatchedBy(func(o *admin.WorkflowExecutionGetRequest) bool { return true }), ).Return(nil, status.Error(codes.Canceled, "")) - exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Millisecond, defaultAdminConfig, promutils.NewTestScope(), memStore) + exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, adminConfig, promutils.NewTestScope(), memStore) assert.NoError(t, err) assert.NoError(t, exec.Initialize(ctx)) @@ -146,6 +150,8 @@ func TestAdminLaunchPlanExecutor_GetStatus(t *testing.T) { func TestAdminLaunchPlanExecutor_Launch(t *testing.T) { ctx := context.TODO() + adminConfig := defaultAdminConfig + adminConfig.CacheResyncDuration = config.Duration{Duration: time.Second} id := &core.WorkflowExecutionIdentifier{ Name: "n", Domain: "d", @@ -157,7 +163,7 @@ func TestAdminLaunchPlanExecutor_Launch(t *testing.T) { t.Run("happy", func(t *testing.T) { mockClient := &mocks.AdminServiceClient{} - exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Second, defaultAdminConfig, promutils.NewTestScope(), memStore) + exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, adminConfig, promutils.NewTestScope(), memStore) mockClient.On("CreateExecution", ctx, mock.MatchedBy(func(o *admin.ExecutionCreateRequest) bool { @@ -195,7 +201,7 @@ func TestAdminLaunchPlanExecutor_Launch(t *testing.T) { Name: "orig", }, } - exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Second, defaultAdminConfig, promutils.NewTestScope(), memStore) + exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, adminConfig, promutils.NewTestScope(), memStore) mockClient.On("RecoverExecution", ctx, mock.MatchedBy(func(o *admin.ExecutionRecoverRequest) bool { @@ -231,7 +237,7 @@ func TestAdminLaunchPlanExecutor_Launch(t *testing.T) { Name: "orig", }, } - exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Second, defaultAdminConfig, promutils.NewTestScope(), memStore) + exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, adminConfig, promutils.NewTestScope(), memStore) assert.NoError(t, err) recoveryErr := status.Error(codes.NotFound, "foo") @@ -273,7 +279,7 @@ func TestAdminLaunchPlanExecutor_Launch(t *testing.T) { t.Run("notFound", func(t *testing.T) { mockClient := &mocks.AdminServiceClient{} - exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Second, defaultAdminConfig, promutils.NewTestScope(), memStore) + exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, adminConfig, promutils.NewTestScope(), memStore) mockClient.On("CreateExecution", ctx, mock.MatchedBy(func(o *admin.ExecutionCreateRequest) bool { return true }), @@ -301,7 +307,7 @@ func TestAdminLaunchPlanExecutor_Launch(t *testing.T) { t.Run("other", func(t *testing.T) { mockClient := &mocks.AdminServiceClient{} - exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Second, defaultAdminConfig, promutils.NewTestScope(), memStore) + exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, adminConfig, promutils.NewTestScope(), memStore) mockClient.On("CreateExecution", ctx, mock.MatchedBy(func(o *admin.ExecutionCreateRequest) bool { return true }), @@ -329,6 +335,8 @@ func TestAdminLaunchPlanExecutor_Launch(t *testing.T) { func TestAdminLaunchPlanExecutor_Kill(t *testing.T) { ctx := context.TODO() + adminConfig := defaultAdminConfig + adminConfig.CacheResyncDuration = config.Duration{Duration: time.Second} id := &core.WorkflowExecutionIdentifier{ Name: "n", Domain: "d", @@ -341,7 +349,7 @@ func TestAdminLaunchPlanExecutor_Kill(t *testing.T) { t.Run("happy", func(t *testing.T) { mockClient := &mocks.AdminServiceClient{} - exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Second, defaultAdminConfig, promutils.NewTestScope(), memStore) + exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, adminConfig, promutils.NewTestScope(), memStore) mockClient.On("TerminateExecution", ctx, mock.MatchedBy(func(o *admin.ExecutionTerminateRequest) bool { return o.Id == id && o.Cause == reason }), @@ -354,7 +362,7 @@ func TestAdminLaunchPlanExecutor_Kill(t *testing.T) { t.Run("notFound", func(t *testing.T) { mockClient := &mocks.AdminServiceClient{} - exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Second, defaultAdminConfig, promutils.NewTestScope(), memStore) + exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, adminConfig, promutils.NewTestScope(), memStore) mockClient.On("TerminateExecution", ctx, mock.MatchedBy(func(o *admin.ExecutionTerminateRequest) bool { return o.Id == id && o.Cause == reason }), @@ -367,7 +375,7 @@ func TestAdminLaunchPlanExecutor_Kill(t *testing.T) { t.Run("other", func(t *testing.T) { mockClient := &mocks.AdminServiceClient{} - exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Second, defaultAdminConfig, promutils.NewTestScope(), memStore) + exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, adminConfig, promutils.NewTestScope(), memStore) mockClient.On("TerminateExecution", ctx, mock.MatchedBy(func(o *admin.ExecutionTerminateRequest) bool { return o.Id == id && o.Cause == reason }), @@ -381,6 +389,8 @@ func TestAdminLaunchPlanExecutor_Kill(t *testing.T) { func TestNewAdminLaunchPlanExecutor_GetLaunchPlan(t *testing.T) { ctx := context.TODO() + adminConfig := defaultAdminConfig + adminConfig.CacheResyncDuration = config.Duration{Duration: time.Second} id := &core.Identifier{ ResourceType: core.ResourceType_LAUNCH_PLAN, Name: "n", @@ -393,7 +403,7 @@ func TestNewAdminLaunchPlanExecutor_GetLaunchPlan(t *testing.T) { t.Run("launch plan found", func(t *testing.T) { mockClient := &mocks.AdminServiceClient{} - exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Second, defaultAdminConfig, promutils.NewTestScope(), memStore) + exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, adminConfig, promutils.NewTestScope(), memStore) assert.NoError(t, err) mockClient.OnGetLaunchPlanMatch( ctx, @@ -406,7 +416,7 @@ func TestNewAdminLaunchPlanExecutor_GetLaunchPlan(t *testing.T) { t.Run("launch plan not found", func(t *testing.T) { mockClient := &mocks.AdminServiceClient{} - exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Second, defaultAdminConfig, promutils.NewTestScope(), memStore) + exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, adminConfig, promutils.NewTestScope(), memStore) assert.NoError(t, err) mockClient.OnGetLaunchPlanMatch( ctx, @@ -435,6 +445,9 @@ type test struct { func TestAdminLaunchPlanExecutorScenarios(t *testing.T) { ctx := context.TODO() + adminConfig := defaultAdminConfig + adminConfig.CacheResyncDuration = config.Duration{Duration: time.Millisecond} + mockExecutionRespWithOutputs := &admin.Execution{ Closure: &admin.ExecutionClosure{ Phase: core.WorkflowExecution_SUCCEEDED, @@ -546,7 +559,7 @@ func TestAdminLaunchPlanExecutorScenarios(t *testing.T) { ComposedProtobufStore: pbStore, ReferenceConstructor: &storageMocks.ReferenceConstructor{}, } - exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Millisecond, defaultAdminConfig, promutils.NewTestScope(), storageClient) + exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, adminConfig, promutils.NewTestScope(), storageClient) assert.NoError(t, err) iwMock := &mocks2.ItemWrapper{} diff --git a/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/adminconfig.go b/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/adminconfig.go index ffb14e3182..0c83f803af 100644 --- a/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/adminconfig.go +++ b/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/adminconfig.go @@ -1,7 +1,10 @@ package launchplan import ( + "time" + ctrlConfig "github.com/flyteorg/flyte/flytepropeller/pkg/controller/config" + "github.com/flyteorg/flyte/flytestdlib/config" ) //go:generate pflags AdminConfig --default-var defaultAdminConfig @@ -12,6 +15,9 @@ var ( Burst: 10, MaxCacheSize: 10000, Workers: 10, + CacheResyncDuration: config.Duration{ + Duration: 30 * time.Second, + }, } adminConfigSection = ctrlConfig.MustRegisterSubSection("admin-launcher", defaultAdminConfig) @@ -31,6 +37,9 @@ type AdminConfig struct { MaxCacheSize int `json:"cacheSize" pflag:",Maximum cache in terms of number of items stored."` Workers int `json:"workers" pflag:",Number of parallel workers to work on the queue."` + + // CacheResyncDuration defines the interval that the admin launcher should refresh the launchplan cache. + CacheResyncDuration config.Duration `json:"cache-resync-duration" pflag:",Frequency of re-syncing launchplans within the auto refresh cache."` } func GetAdminConfig() *AdminConfig { diff --git a/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/adminconfig_flags.go b/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/adminconfig_flags.go index 3bb535e179..a0f36edb11 100755 --- a/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/adminconfig_flags.go +++ b/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/adminconfig_flags.go @@ -54,5 +54,6 @@ func (cfg AdminConfig) GetPFlagSet(prefix string) *pflag.FlagSet { cmdFlags.Int(fmt.Sprintf("%v%v", prefix, "burst"), defaultAdminConfig.Burst, "Maximum burst for throttle") cmdFlags.Int(fmt.Sprintf("%v%v", prefix, "cacheSize"), defaultAdminConfig.MaxCacheSize, "Maximum cache in terms of number of items stored.") cmdFlags.Int(fmt.Sprintf("%v%v", prefix, "workers"), defaultAdminConfig.Workers, "Number of parallel workers to work on the queue.") + cmdFlags.String(fmt.Sprintf("%v%v", prefix, "cache-resync-duration"), defaultAdminConfig.CacheResyncDuration.String(), "Frequency of re-syncing launchplans within the auto refresh cache.") return cmdFlags } diff --git a/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/adminconfig_flags_test.go b/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/adminconfig_flags_test.go index bbff474eb1..7e4f8f4a67 100755 --- a/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/adminconfig_flags_test.go +++ b/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/adminconfig_flags_test.go @@ -155,4 +155,18 @@ func TestAdminConfig_SetFlags(t *testing.T) { } }) }) + t.Run("Test_cache-resync-duration", func(t *testing.T) { + + t.Run("Override", func(t *testing.T) { + testValue := defaultAdminConfig.CacheResyncDuration.String() + + cmdFlags.Set("cache-resync-duration", testValue) + if vString, err := cmdFlags.GetString("cache-resync-duration"); err == nil { + testDecodeJson_AdminConfig(t, fmt.Sprintf("%v", vString), &actual.CacheResyncDuration) + + } else { + assert.FailNow(t, err.Error()) + } + }) + }) }