Skip to content

Commit

Permalink
Support offloading workflow CRD inputs (#5609)
Browse files Browse the repository at this point in the history
* Support offloading workflow CRD inputs

Signed-off-by: Katrina Rogan <[email protected]>

* Add tests

Signed-off-by: Katrina Rogan <[email protected]>

* not just single tasks

Signed-off-by: Katrina Rogan <[email protected]>

* lint

Signed-off-by: Katrina Rogan <[email protected]>

---------

Signed-off-by: Katrina Rogan <[email protected]>
  • Loading branch information
katrogan authored Aug 1, 2024
1 parent 01ecd0a commit 92dca29
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 1 deletion.
2 changes: 2 additions & 0 deletions flyteadmin/pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,7 @@ func (m *ExecutionManager) launchSingleTaskExecution(
WorkflowClosure: workflow.Closure.CompiledWorkflow,
WorkflowClosureReference: storage.DataReference(workflowModel.RemoteClosureIdentifier),
ExecutionParameters: executionParameters,
OffloadedInputsReference: inputsURI,
})

if err != nil {
Expand Down Expand Up @@ -1032,6 +1033,7 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
WorkflowClosure: workflow.Closure.CompiledWorkflow,
WorkflowClosureReference: storage.DataReference(workflowModel.RemoteClosureIdentifier),
ExecutionParameters: executionParameters,
OffloadedInputsReference: inputsURI,
})
if execErr != nil {
createExecModelInput.Error = execErr
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ type ApplicationConfig struct {

// A URL pointing to the flyteconsole instance used to hit this flyteadmin instance.
ConsoleURL string `json:"consoleUrl,omitempty" pflag:",A URL pointing to the flyteconsole instance used to hit this flyteadmin instance."`

// Enabling this will instruct operator to use storage (s3/gcs/etc) to offload workflow execution inputs instead of storing them inline in the CRD.
UseOffloadedInputs bool `json:"useOffloadedInputs" pflag:",Use offloaded inputs for workflows."`
}

func (a *ApplicationConfig) GetRoleNameKey() string {
Expand Down
4 changes: 4 additions & 0 deletions flyteadmin/pkg/workflowengine/impl/k8s_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ func (e K8sWorkflowExecutor) Execute(ctx context.Context, data interfaces.Execut
flyteWf.SubWorkflows = nil
flyteWf.Tasks = nil
}
if e.config.ApplicationConfiguration().GetTopLevelConfig().UseOffloadedInputs {
flyteWf.OffloadedInputs = data.OffloadedInputsReference
flyteWf.Inputs = nil
}

if consoleURL := e.config.ApplicationConfiguration().GetTopLevelConfig().ConsoleURL; len(consoleURL) > 0 {
flyteWf.ConsoleURL = consoleURL
Expand Down
71 changes: 71 additions & 0 deletions flyteadmin/pkg/workflowengine/impl/k8s_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
flyteclient "github.com/flyteorg/flyte/flytepropeller/pkg/client/clientset/versioned"
v1alpha12 "github.com/flyteorg/flyte/flytepropeller/pkg/client/clientset/versioned/typed/flyteworkflow/v1alpha1"
"github.com/flyteorg/flyte/flytestdlib/storage"
)

var fakeFlyteWF = FakeFlyteWorkflowV1alpha1{}
Expand Down Expand Up @@ -417,3 +418,73 @@ func TestExecute_OffloadWorkflowClosure(t *testing.T) {
assert.Nil(t, offloadedFlyteWf.Tasks)
assert.Nil(t, offloadedFlyteWf.SubWorkflows)
}

func TestExecute_OffloadInputs(t *testing.T) {
offloadedFlyteWf := &v1alpha1.FlyteWorkflow{
ExecutionID: v1alpha1.ExecutionID{
WorkflowExecutionIdentifier: execID,
},
Inputs: &v1alpha1.Inputs{
LiteralMap: testInputs,
},
}
inputsReference := storage.DataReference("inputs")

mockApplicationConfig := runtimeMocks.MockApplicationProvider{}
mockApplicationConfig.SetTopLevelConfig(runtimeInterfaces.ApplicationConfig{
UseOffloadedInputs: true,
})
mockRuntime := runtimeMocks.NewMockConfigurationProvider(&mockApplicationConfig, nil, nil, nil, nil, nil)

mockBuilder := mocks.FlyteWorkflowBuilder{}
workflowClosure := core.CompiledWorkflowClosure{
Primary: &core.CompiledWorkflow{
Template: &core.WorkflowTemplate{
Id: &core.Identifier{
Project: "p",
Domain: "d",
Name: "n",
Version: "version",
},
},
},
}
mockBuilder.OnBuildMatch(mock.MatchedBy(func(wfClosure *core.CompiledWorkflowClosure) bool {
return proto.Equal(wfClosure, &workflowClosure)
}), mock.MatchedBy(func(inputs *core.LiteralMap) bool {
return proto.Equal(inputs, testInputs)
}), mock.MatchedBy(func(executionID *core.WorkflowExecutionIdentifier) bool {
return proto.Equal(executionID, execID)
}), namespace).Return(offloadedFlyteWf, nil)
executor := K8sWorkflowExecutor{
config: mockRuntime,
workflowBuilder: &mockBuilder,
executionCluster: getFakeExecutionCluster(),
}
assert.NotNil(t, offloadedFlyteWf.Inputs)

resp, err := executor.Execute(context.TODO(), interfaces.ExecutionData{
Namespace: namespace,
ExecutionID: execID,
ReferenceWorkflowName: "ref_workflow_name",
ReferenceLaunchPlanName: "ref_lp_name",
WorkflowClosure: &workflowClosure,
ExecutionParameters: interfaces.ExecutionParameters{
Inputs: testInputs,
ExecutionConfig: &admin.WorkflowExecutionConfig{
SecurityContext: &core.SecurityContext{
RunAs: &core.Identity{
IamRole: testRoleSc,
K8SServiceAccount: testK8sServiceAccountSc,
},
},
},
},
OffloadedInputsReference: inputsReference,
})
assert.NoError(t, err)
assert.Equal(t, resp.Cluster, clusterID)

assert.Nil(t, offloadedFlyteWf.Inputs)
assert.Equal(t, inputsReference, offloadedFlyteWf.OffloadedInputs)
}
2 changes: 2 additions & 0 deletions flyteadmin/pkg/workflowengine/interfaces/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ type ExecutionData struct {
WorkflowClosureReference storage.DataReference
// Additional parameters used to build a workflow execution
ExecutionParameters ExecutionParameters
// Storage data reference of the execution inputs
OffloadedInputsReference storage.DataReference
}

// ExecutionResponse is returned when a Flyte workflow execution is successfully created.
Expand Down
5 changes: 5 additions & 0 deletions flytepropeller/pkg/apis/flyteworkflow/v1alpha1/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ type FlyteWorkflow struct {

// Flyteconsole url
ConsoleURL string `json:"consoleUrl,omitempty"`

// Much like WorkflowClosureReference, this field represents the location of offloaded inputs. If this exists,
// then the literal Inputs must not be populated. Flytepropeller must retrieve and parse the static inputs prior to
// processing.
OffloadedInputs DataReference `json:"offloadedInputs,omitempty"`
}

func (in *FlyteWorkflow) GetSecurityContext() core.SecurityContext {
Expand Down
16 changes: 15 additions & 1 deletion flytepropeller/pkg/controller/workflow/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,23 @@ func (c *workflowExecutor) handleReadyWorkflow(ctx context.Context, w *v1alpha1.
Message: err.Error()}), nil
}
w.GetExecutionStatus().SetDataDir(ref)
var inputs *core.LiteralMap
inputs := &core.LiteralMap{}
if w.Inputs != nil {
if len(w.OffloadedInputs) > 0 {
return StatusFailing(&core.ExecutionError{
Kind: core.ExecutionError_SYSTEM,
Code: errors.BadSpecificationError.String(),
Message: "cannot specify inline inputs AND offloaded inputs"}), nil
}
inputs = w.Inputs.LiteralMap
} else if len(w.OffloadedInputs) > 0 {
err = c.store.ReadProtobuf(ctx, w.OffloadedInputs, inputs)
if err != nil {
return StatusFailing(&core.ExecutionError{
Kind: core.ExecutionError_SYSTEM,
Code: "OffloadedInputsReadFailure",
Message: err.Error()}), nil
}
}
// Before starting the subworkflow, lets set the inputs for the Workflow. The inputs for a SubWorkflow are essentially
// Copy of the inputs to the Node
Expand Down

0 comments on commit 92dca29

Please sign in to comment.