From b3da447bb22fc29c8eabded805303302ebe95491 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Sat, 13 Jul 2024 14:28:09 -0700 Subject: [PATCH 1/5] Add noop plugin Signed-off-by: Kevin Su --- flyteplugins/go/tasks/plugins/noop/noop.go | 59 ++++++++++++++++++++++ 1 file changed, 59 insertions(+) create mode 100644 flyteplugins/go/tasks/plugins/noop/noop.go diff --git a/flyteplugins/go/tasks/plugins/noop/noop.go b/flyteplugins/go/tasks/plugins/noop/noop.go new file mode 100644 index 0000000000..184df429c8 --- /dev/null +++ b/flyteplugins/go/tasks/plugins/noop/noop.go @@ -0,0 +1,59 @@ +package noop + +import ( + "context" + pluginErrors "github.com/flyteorg/flyte/flyteplugins/go/tasks/errors" + "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery" + "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core" + "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/ioutils" +) + +const ( + noopTaskType = "noop" +) + +type NoopPlugin struct{} + +func (n NoopPlugin) GetID() string { + return noopTaskType +} + +func (n NoopPlugin) GetProperties() core.PluginProperties { + return core.PluginProperties{} +} + +func (n NoopPlugin) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (core.Transition, error) { + inputs, err := tCtx.InputReader().Get(ctx) + if err != nil { + return core.DoTransition(core.PhaseInfoFailure(pluginErrors.TaskFailedWithError, "Failed to read the inputs.", nil)), nil + } + + err = tCtx.OutputWriter().Put(ctx, ioutils.NewInMemoryOutputReader( + inputs, nil, nil)) + if err != nil { + return core.DoTransition(core.PhaseInfoFailure(pluginErrors.TaskFailedWithError, "Failed to write outputs", nil)), nil + } + + return core.DoTransition(core.PhaseInfoSuccess(nil)), nil +} + +func (n NoopPlugin) Abort(ctx context.Context, tCtx core.TaskExecutionContext) error { + return nil +} + +func (n NoopPlugin) Finalize(ctx context.Context, tCtx core.TaskExecutionContext) error { + return nil +} + +func init() { + pluginmachinery.PluginRegistry().RegisterCorePlugin( + core.PluginEntry{ + ID: noopTaskType, + RegisteredTaskTypes: []core.TaskType{noopTaskType}, + LoadPlugin: func(ctx context.Context, iCtx core.SetupContext) (core.Plugin, error) { + return NoopPlugin{}, nil + }, + IsDefault: true, + }, + ) +} From 9c649d5430fd97454ff618cbdb48b12b200349d3 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 16 Jul 2024 14:28:13 -0700 Subject: [PATCH 2/5] Update echo plugin Signed-off-by: Kevin Su --- flyteplugins/go/tasks/plugins/noop/noop.go | 59 --------------- flyteplugins/go/tasks/plugins/testing/echo.go | 72 ++++++++++--------- .../pkg/controller/nodes/branch/handler.go | 1 + 3 files changed, 40 insertions(+), 92 deletions(-) delete mode 100644 flyteplugins/go/tasks/plugins/noop/noop.go diff --git a/flyteplugins/go/tasks/plugins/noop/noop.go b/flyteplugins/go/tasks/plugins/noop/noop.go deleted file mode 100644 index 184df429c8..0000000000 --- a/flyteplugins/go/tasks/plugins/noop/noop.go +++ /dev/null @@ -1,59 +0,0 @@ -package noop - -import ( - "context" - pluginErrors "github.com/flyteorg/flyte/flyteplugins/go/tasks/errors" - "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery" - "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core" - "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/ioutils" -) - -const ( - noopTaskType = "noop" -) - -type NoopPlugin struct{} - -func (n NoopPlugin) GetID() string { - return noopTaskType -} - -func (n NoopPlugin) GetProperties() core.PluginProperties { - return core.PluginProperties{} -} - -func (n NoopPlugin) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (core.Transition, error) { - inputs, err := tCtx.InputReader().Get(ctx) - if err != nil { - return core.DoTransition(core.PhaseInfoFailure(pluginErrors.TaskFailedWithError, "Failed to read the inputs.", nil)), nil - } - - err = tCtx.OutputWriter().Put(ctx, ioutils.NewInMemoryOutputReader( - inputs, nil, nil)) - if err != nil { - return core.DoTransition(core.PhaseInfoFailure(pluginErrors.TaskFailedWithError, "Failed to write outputs", nil)), nil - } - - return core.DoTransition(core.PhaseInfoSuccess(nil)), nil -} - -func (n NoopPlugin) Abort(ctx context.Context, tCtx core.TaskExecutionContext) error { - return nil -} - -func (n NoopPlugin) Finalize(ctx context.Context, tCtx core.TaskExecutionContext) error { - return nil -} - -func init() { - pluginmachinery.PluginRegistry().RegisterCorePlugin( - core.PluginEntry{ - ID: noopTaskType, - RegisteredTaskTypes: []core.TaskType{noopTaskType}, - LoadPlugin: func(ctx context.Context, iCtx core.SetupContext) (core.Plugin, error) { - return NoopPlugin{}, nil - }, - IsDefault: true, - }, - ) -} diff --git a/flyteplugins/go/tasks/plugins/testing/echo.go b/flyteplugins/go/tasks/plugins/testing/echo.go index 885ab5dfc4..ba30eea8c4 100644 --- a/flyteplugins/go/tasks/plugins/testing/echo.go +++ b/flyteplugins/go/tasks/plugins/testing/echo.go @@ -33,6 +33,10 @@ func (e *EchoPlugin) GetProperties() core.PluginProperties { func (e *EchoPlugin) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (core.Transition, error) { echoConfig := ConfigSection.GetConfig().(*Config) + if echoConfig.SleepDuration.Duration == time.Duration(0) { + return copyInputsToOutputs(ctx, tCtx) + } + var startTime time.Time var exists bool taskExecutionID := tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName() @@ -50,39 +54,7 @@ func (e *EchoPlugin) Handle(ctx context.Context, tCtx core.TaskExecutionContext) } if time.Since(startTime) >= echoConfig.SleepDuration.Duration { - // copy inputs to outputs - inputToOutputVariableMappings, err := compileInputToOutputVariableMappings(ctx, tCtx) - if err != nil { - return core.UnknownTransition, err - } - - if len(inputToOutputVariableMappings) > 0 { - inputLiterals, err := tCtx.InputReader().Get(ctx) - if err != nil { - return core.UnknownTransition, err - } - - outputLiterals := make(map[string]*idlcore.Literal, len(inputToOutputVariableMappings)) - for inputVariableName, outputVariableName := range inputToOutputVariableMappings { - outputLiterals[outputVariableName] = inputLiterals.Literals[inputVariableName] - } - - outputLiteralMap := &idlcore.LiteralMap{ - Literals: outputLiterals, - } - - outputFile := tCtx.OutputWriter().GetOutputPath() - if err := tCtx.DataStore().WriteProtobuf(ctx, outputFile, storage.Options{}, outputLiteralMap); err != nil { - return core.UnknownTransition, err - } - - or := ioutils.NewRemoteFileOutputReader(ctx, tCtx.DataStore(), tCtx.OutputWriter(), 0) - if err = tCtx.OutputWriter().Put(ctx, or); err != nil { - return core.UnknownTransition, err - } - } - - return core.DoTransition(core.PhaseInfoSuccess(nil)), nil + return copyInputsToOutputs(ctx, tCtx) } return core.DoTransition(core.PhaseInfoRunning(core.DefaultPhaseVersion, nil)), nil @@ -98,6 +70,40 @@ func (e *EchoPlugin) Finalize(ctx context.Context, tCtx core.TaskExecutionContex return nil } +func copyInputsToOutputs(ctx context.Context, tCtx core.TaskExecutionContext) (core.Transition, error) { + inputToOutputVariableMappings, err := compileInputToOutputVariableMappings(ctx, tCtx) + if err != nil { + return core.UnknownTransition, err + } + + if len(inputToOutputVariableMappings) > 0 { + inputLiterals, err := tCtx.InputReader().Get(ctx) + if err != nil { + return core.UnknownTransition, err + } + + outputLiterals := make(map[string]*idlcore.Literal, len(inputToOutputVariableMappings)) + for inputVariableName, outputVariableName := range inputToOutputVariableMappings { + outputLiterals[outputVariableName] = inputLiterals.Literals[inputVariableName] + } + + outputLiteralMap := &idlcore.LiteralMap{ + Literals: outputLiterals, + } + + outputFile := tCtx.OutputWriter().GetOutputPath() + if err := tCtx.DataStore().WriteProtobuf(ctx, outputFile, storage.Options{}, outputLiteralMap); err != nil { + return core.UnknownTransition, err + } + + or := ioutils.NewRemoteFileOutputReader(ctx, tCtx.DataStore(), tCtx.OutputWriter(), 0) + if err = tCtx.OutputWriter().Put(ctx, or); err != nil { + return core.UnknownTransition, err + } + } + return core.DoTransition(core.PhaseInfoSuccess(nil)), nil +} + func compileInputToOutputVariableMappings(ctx context.Context, tCtx core.TaskExecutionContext) (map[string]string, error) { // validate outputs are castable from inputs otherwise error as this plugin is not applicable taskTemplate, err := tCtx.TaskReader().Read(ctx) diff --git a/flytepropeller/pkg/controller/nodes/branch/handler.go b/flytepropeller/pkg/controller/nodes/branch/handler.go index 431f5fa3eb..d8f59ca780 100644 --- a/flytepropeller/pkg/controller/nodes/branch/handler.go +++ b/flytepropeller/pkg/controller/nodes/branch/handler.go @@ -152,6 +152,7 @@ func (b *branchHandler) recurseDownstream(ctx context.Context, nCtx interfaces.N if downstreamStatus.IsComplete() { // For branch node we set the output node to be the same as the child nodes output + nCtx.NodeStateWriter() phase := handler.PhaseInfoSuccess(&handler.ExecutionInfo{ OutputInfo: &handler.OutputInfo{OutputURI: v1alpha1.GetOutputsFile(childNodeStatus.GetOutputDir())}, }) From 21cb53098daa3bc6cfbe1487f984272fb2feb69a Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 16 Jul 2024 14:46:00 -0700 Subject: [PATCH 3/5] thread safe Signed-off-by: Kevin Su --- flyteplugins/go/tasks/plugins/testing/echo.go | 48 ++++++++++++------- 1 file changed, 32 insertions(+), 16 deletions(-) diff --git a/flyteplugins/go/tasks/plugins/testing/echo.go b/flyteplugins/go/tasks/plugins/testing/echo.go index ba30eea8c4..7f4d31c533 100644 --- a/flyteplugins/go/tasks/plugins/testing/echo.go +++ b/flyteplugins/go/tasks/plugins/testing/echo.go @@ -3,6 +3,7 @@ package testing import ( "context" "fmt" + "sync" "time" idlcore "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" @@ -20,23 +21,12 @@ const ( type EchoPlugin struct { enqueueOwner core.EnqueueOwner taskStartTimes map[string]time.Time + sync.Mutex } -func (e *EchoPlugin) GetID() string { - return echoTaskType -} - -func (e *EchoPlugin) GetProperties() core.PluginProperties { - return core.PluginProperties{} -} - -func (e *EchoPlugin) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (core.Transition, error) { - echoConfig := ConfigSection.GetConfig().(*Config) - - if echoConfig.SleepDuration.Duration == time.Duration(0) { - return copyInputsToOutputs(ctx, tCtx) - } - +func (e *EchoPlugin) addTask(ctx context.Context, tCtx core.TaskExecutionContext) time.Time { + e.Lock() + defer e.Unlock() var startTime time.Time var exists bool taskExecutionID := tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName() @@ -46,12 +36,38 @@ func (e *EchoPlugin) Handle(ctx context.Context, tCtx core.TaskExecutionContext) // start timer to enqueue owner once task sleep duration has elapsed go func() { + echoConfig := ConfigSection.GetConfig().(*Config) time.Sleep(echoConfig.SleepDuration.Duration) if err := e.enqueueOwner(tCtx.TaskExecutionMetadata().GetOwnerID()); err != nil { logger.Warnf(ctx, "failed to enqueue owner [%s]: %v", tCtx.TaskExecutionMetadata().GetOwnerID(), err) } }() } + return startTime +} + +func (e *EchoPlugin) removeTask(taskExecutionID string) { + e.Lock() + defer e.Unlock() + delete(e.taskStartTimes, taskExecutionID) +} + +func (e *EchoPlugin) GetID() string { + return echoTaskType +} + +func (e *EchoPlugin) GetProperties() core.PluginProperties { + return core.PluginProperties{} +} + +func (e *EchoPlugin) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (core.Transition, error) { + echoConfig := ConfigSection.GetConfig().(*Config) + + if echoConfig.SleepDuration.Duration == time.Duration(0) { + return copyInputsToOutputs(ctx, tCtx) + } + + startTime := e.addTask(ctx, tCtx) if time.Since(startTime) >= echoConfig.SleepDuration.Duration { return copyInputsToOutputs(ctx, tCtx) @@ -66,7 +82,7 @@ func (e *EchoPlugin) Abort(ctx context.Context, tCtx core.TaskExecutionContext) func (e *EchoPlugin) Finalize(ctx context.Context, tCtx core.TaskExecutionContext) error { taskExecutionID := tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName() - delete(e.taskStartTimes, taskExecutionID) + e.removeTask(taskExecutionID) return nil } From 27a350c2ab84b5bfdd62a138eb45c8e537cb833b Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 16 Jul 2024 14:48:52 -0700 Subject: [PATCH 4/5] nit Signed-off-by: Kevin Su --- flyteplugins/go/tasks/plugins/testing/echo.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/flyteplugins/go/tasks/plugins/testing/echo.go b/flyteplugins/go/tasks/plugins/testing/echo.go index 7f4d31c533..6a59772aef 100644 --- a/flyteplugins/go/tasks/plugins/testing/echo.go +++ b/flyteplugins/go/tasks/plugins/testing/echo.go @@ -24,6 +24,14 @@ type EchoPlugin struct { sync.Mutex } +func (e *EchoPlugin) GetID() string { + return echoTaskType +} + +func (e *EchoPlugin) GetProperties() core.PluginProperties { + return core.PluginProperties{} +} + func (e *EchoPlugin) addTask(ctx context.Context, tCtx core.TaskExecutionContext) time.Time { e.Lock() defer e.Unlock() @@ -52,14 +60,6 @@ func (e *EchoPlugin) removeTask(taskExecutionID string) { delete(e.taskStartTimes, taskExecutionID) } -func (e *EchoPlugin) GetID() string { - return echoTaskType -} - -func (e *EchoPlugin) GetProperties() core.PluginProperties { - return core.PluginProperties{} -} - func (e *EchoPlugin) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (core.Transition, error) { echoConfig := ConfigSection.GetConfig().(*Config) From 0ae89ac4304f067e4b367caa12bdb1bd9a8c7bab Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 16 Jul 2024 14:52:35 -0700 Subject: [PATCH 5/5] nit Signed-off-by: Kevin Su --- flyteplugins/go/tasks/plugins/testing/echo.go | 4 ++++ flytepropeller/pkg/controller/nodes/branch/handler.go | 1 - 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/flyteplugins/go/tasks/plugins/testing/echo.go b/flyteplugins/go/tasks/plugins/testing/echo.go index 6a59772aef..7c55d3862f 100644 --- a/flyteplugins/go/tasks/plugins/testing/echo.go +++ b/flyteplugins/go/tasks/plugins/testing/echo.go @@ -32,6 +32,8 @@ func (e *EchoPlugin) GetProperties() core.PluginProperties { return core.PluginProperties{} } +// Enqueue the task to be re-evaluated after SleepDuration. +// If the task is already enqueued, return the start time of the task. func (e *EchoPlugin) addTask(ctx context.Context, tCtx core.TaskExecutionContext) time.Time { e.Lock() defer e.Unlock() @@ -54,6 +56,7 @@ func (e *EchoPlugin) addTask(ctx context.Context, tCtx core.TaskExecutionContext return startTime } +// Remove the task from the taskStartTimes map. func (e *EchoPlugin) removeTask(taskExecutionID string) { e.Lock() defer e.Unlock() @@ -86,6 +89,7 @@ func (e *EchoPlugin) Finalize(ctx context.Context, tCtx core.TaskExecutionContex return nil } +// copyInputsToOutputs copies the input literals to the output location. func copyInputsToOutputs(ctx context.Context, tCtx core.TaskExecutionContext) (core.Transition, error) { inputToOutputVariableMappings, err := compileInputToOutputVariableMappings(ctx, tCtx) if err != nil { diff --git a/flytepropeller/pkg/controller/nodes/branch/handler.go b/flytepropeller/pkg/controller/nodes/branch/handler.go index d8f59ca780..431f5fa3eb 100644 --- a/flytepropeller/pkg/controller/nodes/branch/handler.go +++ b/flytepropeller/pkg/controller/nodes/branch/handler.go @@ -152,7 +152,6 @@ func (b *branchHandler) recurseDownstream(ctx context.Context, nCtx interfaces.N if downstreamStatus.IsComplete() { // For branch node we set the output node to be the same as the child nodes output - nCtx.NodeStateWriter() phase := handler.PhaseInfoSuccess(&handler.ExecutionInfo{ OutputInfo: &handler.OutputInfo{OutputURI: v1alpha1.GetOutputsFile(childNodeStatus.GetOutputDir())}, })