diff --git a/pkg/common/data_store.go b/pkg/common/data_store.go new file mode 100644 index 0000000000..cadcc8f942 --- /dev/null +++ b/pkg/common/data_store.go @@ -0,0 +1,30 @@ +package common + +import ( + "context" + + "github.com/flyteorg/flyteadmin/pkg/errors" + "github.com/flyteorg/flyteadmin/pkg/manager/impl/shared" + "google.golang.org/grpc/codes" + + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" + "github.com/flyteorg/flytestdlib/storage" +) + +func OffloadLiteralMap(ctx context.Context, storageClient *storage.DataStore, literalMap *core.LiteralMap, nestedKeys ...string) (storage.DataReference, error) { + if literalMap == nil { + literalMap = &core.LiteralMap{} + } + nestedKeyReference := []string{ + shared.Metadata, + } + nestedKeyReference = append(nestedKeyReference, nestedKeys...) + uri, err := storageClient.ConstructReference(ctx, storageClient.GetBaseContainerFQN(ctx), nestedKeyReference...) + if err != nil { + return "", errors.NewFlyteAdminErrorf(codes.Internal, "Failed to construct data reference for [%+v] with err: %v", nestedKeys, err) + } + if err := storageClient.WriteProtobuf(ctx, uri, storage.Options{}, literalMap); err != nil { + return "", errors.NewFlyteAdminErrorf(codes.Internal, "Failed to write protobuf for [%+v] with err: %v", nestedKeys, err) + } + return uri, nil +} diff --git a/pkg/common/data_store_test.go b/pkg/common/data_store_test.go new file mode 100644 index 0000000000..e0907c77a6 --- /dev/null +++ b/pkg/common/data_store_test.go @@ -0,0 +1,65 @@ +package common + +import ( + "context" + "testing" + + "github.com/flyteorg/flyteadmin/pkg/errors" + "google.golang.org/grpc/codes" + + commonMocks "github.com/flyteorg/flyteadmin/pkg/common/mocks" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" + "github.com/flyteorg/flytestdlib/storage" + "github.com/golang/protobuf/proto" + "github.com/stretchr/testify/assert" +) + +var literalMap = &core.LiteralMap{ + Literals: map[string]*core.Literal{ + "foo": { + Value: &core.Literal_Scalar{ + Scalar: &core.Scalar{ + Value: &core.Scalar_Primitive{ + Primitive: &core.Primitive{ + Value: &core.Primitive_Integer{ + Integer: 4, + }, + }, + }, + }, + }, + }, + }, +} + +func TestOffloadLiteralMap(t *testing.T) { + mockStorage := commonMocks.GetMockStorageClient() + mockStorage.ComposedProtobufStore.(*commonMocks.TestDataStore).WriteProtobufCb = func(ctx context.Context, reference storage.DataReference, opts storage.Options, msg proto.Message) error { + assert.Equal(t, reference.String(), "s3://bucket/metadata/nested/key") + return nil + } + + uri, err := OffloadLiteralMap(context.TODO(), mockStorage, literalMap, "nested", "key") + assert.NoError(t, err) + assert.Equal(t, "s3://bucket/metadata/nested/key", uri.String()) +} + +func TestOffloadLiteralMap_ConstructReferenceError(t *testing.T) { + mockStorage := commonMocks.GetMockStorageClient() + mockStorage.ComposedProtobufStore.(*commonMocks.TestDataStore).ConstructReferenceCb = func( + ctx context.Context, reference storage.DataReference, nestedKeys ...string) (storage.DataReference, error) { + return "foo", errors.NewFlyteAdminError(codes.Internal, "foo") + } + _, err := OffloadLiteralMap(context.TODO(), mockStorage, literalMap, "nested", "key") + assert.Equal(t, err.(errors.FlyteAdminError).Code(), codes.Internal) +} + +func TestOffloadLiteralMap_StorageFailure(t *testing.T) { + mockStorage := commonMocks.GetMockStorageClient() + mockStorage.ComposedProtobufStore.(*commonMocks.TestDataStore).WriteProtobufCb = func(ctx context.Context, reference storage.DataReference, opts storage.Options, msg proto.Message) error { + assert.Equal(t, reference.String(), "s3://bucket/metadata/nested/key") + return errors.NewFlyteAdminError(codes.Internal, "foo") + } + _, err := OffloadLiteralMap(context.TODO(), mockStorage, literalMap, "nested", "key") + assert.Equal(t, err.(errors.FlyteAdminError).Code(), codes.Internal) +} diff --git a/pkg/manager/impl/execution_manager.go b/pkg/manager/impl/execution_manager.go index 1737f4d9f9..48ecae93ea 100644 --- a/pkg/manager/impl/execution_manager.go +++ b/pkg/manager/impl/execution_manager.go @@ -197,20 +197,6 @@ func (m *ExecutionManager) addPluginOverrides(ctx context.Context, executionID * return nil, nil } -func (m *ExecutionManager) offloadInputs(ctx context.Context, literalMap *core.LiteralMap, identifier *core.WorkflowExecutionIdentifier, key string) (storage.DataReference, error) { - if literalMap == nil { - literalMap = &core.LiteralMap{} - } - inputsURI, err := m.storageClient.ConstructReference(ctx, m.storageClient.GetBaseContainerFQN(ctx), shared.Metadata, identifier.Project, identifier.Domain, identifier.Name, key) - if err != nil { - return "", err - } - if err := m.storageClient.WriteProtobuf(ctx, inputsURI, storage.Options{}, literalMap); err != nil { - return "", err - } - return inputsURI, nil -} - type completeTaskResources struct { Defaults runtimeInterfaces.TaskResourceSet Limits runtimeInterfaces.TaskResourceSet @@ -569,11 +555,11 @@ func (m *ExecutionManager) launchSingleTaskExecution( // Dynamically assign execution queues. m.populateExecutionQueue(ctx, *workflow.Id, workflow.Closure.CompiledWorkflow) - inputsURI, err := m.offloadInputs(ctx, request.Inputs, &workflowExecutionID, shared.Inputs) + inputsURI, err := common.OffloadLiteralMap(ctx, m.storageClient, request.Inputs, workflowExecutionID.Project, workflowExecutionID.Domain, workflowExecutionID.Name, shared.Inputs) if err != nil { return nil, nil, err } - userInputsURI, err := m.offloadInputs(ctx, request.Inputs, &workflowExecutionID, shared.UserInputs) + userInputsURI, err := common.OffloadLiteralMap(ctx, m.storageClient, request.Inputs, workflowExecutionID.Project, workflowExecutionID.Domain, workflowExecutionID.Name, shared.UserInputs) if err != nil { return nil, nil, err } @@ -764,11 +750,11 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( // Dynamically assign execution queues. m.populateExecutionQueue(ctx, *workflow.Id, workflow.Closure.CompiledWorkflow) - inputsURI, err := m.offloadInputs(ctx, executionInputs, &workflowExecutionID, shared.Inputs) + inputsURI, err := common.OffloadLiteralMap(ctx, m.storageClient, executionInputs, workflowExecutionID.Project, workflowExecutionID.Domain, workflowExecutionID.Name, shared.Inputs) if err != nil { return nil, nil, err } - userInputsURI, err := m.offloadInputs(ctx, request.Inputs, &workflowExecutionID, shared.UserInputs) + userInputsURI, err := common.OffloadLiteralMap(ctx, m.storageClient, request.Inputs, workflowExecutionID.Project, workflowExecutionID.Domain, workflowExecutionID.Name, shared.UserInputs) if err != nil { return nil, nil, err } @@ -1178,7 +1164,7 @@ func (m *ExecutionManager) CreateWorkflowEvent(ctx context.Context, request admi return nil, errors.NewAlreadyInTerminalStateError(ctx, errorMsg, curPhase) } - err = transformers.UpdateExecutionModelState(executionModel, request) + err = transformers.UpdateExecutionModelState(ctx, executionModel, request, m.config.ApplicationConfiguration().GetRemoteDataConfig().InlineEventDataPolicy, m.storageClient) if err != nil { logger.Debugf(ctx, "failed to transform updated workflow execution model [%+v] after receiving event with err: %v", request.Event.ExecutionId, err) @@ -1266,7 +1252,7 @@ func (m *ExecutionManager) GetExecutionData( if err := proto.Unmarshal(executionModel.Closure, closure); err != nil { return nil, err } - newInputsURI, err := m.offloadInputs(ctx, closure.ComputedInputs, request.Id, shared.Inputs) + newInputsURI, err := common.OffloadLiteralMap(ctx, m.storageClient, closure.ComputedInputs, request.Id.Project, request.Id.Domain, request.Id.Name, shared.Inputs) if err != nil { return nil, err } diff --git a/pkg/manager/impl/node_execution_manager.go b/pkg/manager/impl/node_execution_manager.go index 0a050fa96a..4284b8f9a5 100644 --- a/pkg/manager/impl/node_execution_manager.go +++ b/pkg/manager/impl/node_execution_manager.go @@ -123,11 +123,13 @@ func (m *NodeExecutionManager) createNodeExecutionWithEvent( } parentID = &parentNodeExecutionModel.ID } - nodeExecutionModel, err := transformers.CreateNodeExecutionModel(transformers.ToNodeExecutionModelInput{ + nodeExecutionModel, err := transformers.CreateNodeExecutionModel(ctx, transformers.ToNodeExecutionModelInput{ Request: request, ParentTaskExecutionID: parentTaskExecutionID, ParentID: parentID, DynamicWorkflowRemoteClosure: dynamicWorkflowRemoteClosureReference, + InlineEventDataPolicy: m.config.ApplicationConfiguration().GetRemoteDataConfig().InlineEventDataPolicy, + StorageClient: m.storageClient, }) if err != nil { logger.Debugf(ctx, "failed to create node execution model for event request: %s with err: %v", @@ -175,7 +177,9 @@ func (m *NodeExecutionManager) updateNodeExecutionWithEvent( return updateFailed, err } } - err := transformers.UpdateNodeExecutionModel(request, nodeExecutionModel, childExecutionID, dynamicWorkflowRemoteClosureReference) + err := transformers.UpdateNodeExecutionModel(ctx, request, nodeExecutionModel, childExecutionID, + dynamicWorkflowRemoteClosureReference, m.config.ApplicationConfiguration().GetRemoteDataConfig().InlineEventDataPolicy, + m.storageClient) if err != nil { logger.Debugf(ctx, "failed to update node execution model: %+v with err: %v", request.Event.Id, err) return updateFailed, err diff --git a/pkg/manager/impl/task_execution_manager.go b/pkg/manager/impl/task_execution_manager.go index e552df3cef..90c6670bd8 100644 --- a/pkg/manager/impl/task_execution_manager.go +++ b/pkg/manager/impl/task_execution_manager.go @@ -81,8 +81,11 @@ func (m *TaskExecutionManager) createTaskExecution( } taskExecutionModel, err := transformers.CreateTaskExecutionModel( + ctx, transformers.CreateTaskExecutionModelInput{ - Request: request, + Request: request, + InlineEventDataPolicy: m.config.ApplicationConfiguration().GetRemoteDataConfig().InlineEventDataPolicy, + StorageClient: m.storageClient, }) if err != nil { logger.Debugf(ctx, "failed to transform task execution %+v into database model: %v", request.Event.TaskId, err) @@ -104,7 +107,8 @@ func (m *TaskExecutionManager) updateTaskExecutionModelState( ctx context.Context, request *admin.TaskExecutionEventRequest, existingTaskExecution *models.TaskExecution) ( models.TaskExecution, error) { - err := transformers.UpdateTaskExecutionModel(request, existingTaskExecution) + err := transformers.UpdateTaskExecutionModel(ctx, request, existingTaskExecution, + m.config.ApplicationConfiguration().GetRemoteDataConfig().InlineEventDataPolicy, m.storageClient) if err != nil { logger.Debugf(ctx, "failed to update task execution model [%+v] with err: %v", request.Event.TaskId, err) return models.TaskExecution{}, err diff --git a/pkg/repositories/transformers/constants.go b/pkg/repositories/transformers/constants.go new file mode 100644 index 0000000000..8b7c807509 --- /dev/null +++ b/pkg/repositories/transformers/constants.go @@ -0,0 +1,5 @@ +package transformers + +// OutputsObjectSuffix is used when execution event data includes inline outputs but the admin deployment is configured +// to offload such data. The generated file path for the offloaded data will include the execution identifier and this suffix. +const OutputsObjectSuffix = "offloaded_outputs" diff --git a/pkg/repositories/transformers/execution.go b/pkg/repositories/transformers/execution.go index 360dffb54a..91421b1756 100644 --- a/pkg/repositories/transformers/execution.go +++ b/pkg/repositories/transformers/execution.go @@ -4,6 +4,8 @@ import ( "context" "time" + "github.com/flyteorg/flyteadmin/pkg/runtime/interfaces" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flytestdlib/logger" @@ -104,7 +106,9 @@ func CreateExecutionModel(input CreateExecutionModelInput) (*models.Execution, e // Updates an existing model given a WorkflowExecution event. func UpdateExecutionModelState( - execution *models.Execution, request admin.WorkflowExecutionEventRequest) error { + ctx context.Context, + execution *models.Execution, request admin.WorkflowExecutionEventRequest, + inlineEventDataPolicy interfaces.InlineEventDataPolicy, storageClient *storage.DataStore) error { var executionClosure admin.ExecutionClosure err := proto.Unmarshal(execution.Closure, &executionClosure) if err != nil { @@ -143,8 +147,25 @@ func UpdateExecutionModelState( }, } } else if request.Event.GetOutputData() != nil { - executionClosure.OutputResult = &admin.ExecutionClosure_OutputData{ - OutputData: request.Event.GetOutputData(), + switch inlineEventDataPolicy { + case interfaces.InlineEventDataPolicyStoreInline: + executionClosure.OutputResult = &admin.ExecutionClosure_OutputData{ + OutputData: request.Event.GetOutputData(), + } + default: + logger.Debugf(ctx, "Offloading outputs per InlineEventDataPolicy") + uri, err := common.OffloadLiteralMap(ctx, storageClient, request.Event.GetOutputData(), + request.Event.ExecutionId.Project, request.Event.ExecutionId.Domain, request.Event.ExecutionId.Name, OutputsObjectSuffix) + if err != nil { + return err + } + executionClosure.OutputResult = &admin.ExecutionClosure_Outputs{ + Outputs: &admin.LiteralMapBlob{ + Data: &admin.LiteralMapBlob_Uri{ + Uri: uri.String(), + }, + }, + } } } else if request.Event.GetError() != nil { executionClosure.OutputResult = &admin.ExecutionClosure_Error{ diff --git a/pkg/repositories/transformers/execution_test.go b/pkg/repositories/transformers/execution_test.go index 07262633be..af22bc2ffb 100644 --- a/pkg/repositories/transformers/execution_test.go +++ b/pkg/repositories/transformers/execution_test.go @@ -1,10 +1,15 @@ package transformers import ( + "context" "fmt" "testing" "time" + commonMocks "github.com/flyteorg/flyteadmin/pkg/common/mocks" + "github.com/flyteorg/flyteadmin/pkg/runtime/interfaces" + "github.com/flyteorg/flytestdlib/storage" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" "github.com/golang/protobuf/ptypes" @@ -124,12 +129,12 @@ func TestUpdateModelState_UnknownToRunning(t *testing.T) { occurredAt := time.Date(2018, 10, 29, 16, 10, 0, 0, time.UTC) occurredAtProto, _ := ptypes.TimestampProto(occurredAt) - err := UpdateExecutionModelState(&executionModel, admin.WorkflowExecutionEventRequest{ + err := UpdateExecutionModelState(context.TODO(), &executionModel, admin.WorkflowExecutionEventRequest{ Event: &event.WorkflowExecutionEvent{ Phase: core.WorkflowExecution_RUNNING, OccurredAt: occurredAtProto, }, - }) + }, interfaces.InlineEventDataPolicyStoreInline, commonMocks.GetMockStorageClient()) assert.Nil(t, err) expectedClosure := admin.ExecutionClosure{ @@ -188,7 +193,7 @@ func TestUpdateModelState_RunningToFailed(t *testing.T) { Kind: ek, Message: "bar baz", } - err := UpdateExecutionModelState(&executionModel, admin.WorkflowExecutionEventRequest{ + err := UpdateExecutionModelState(context.TODO(), &executionModel, admin.WorkflowExecutionEventRequest{ Event: &event.WorkflowExecutionEvent{ Phase: core.WorkflowExecution_ABORTED, OccurredAt: occurredAtProto, @@ -196,7 +201,7 @@ func TestUpdateModelState_RunningToFailed(t *testing.T) { Error: &executionError, }, }, - }) + }, interfaces.InlineEventDataPolicyStoreInline, commonMocks.GetMockStorageClient()) assert.Nil(t, err) ekString := ek.String() @@ -275,7 +280,7 @@ func TestUpdateModelState_RunningToSuccess(t *testing.T) { } t.Run("output URI set", func(t *testing.T) { - err := UpdateExecutionModelState(&executionModel, admin.WorkflowExecutionEventRequest{ + err := UpdateExecutionModelState(context.TODO(), &executionModel, admin.WorkflowExecutionEventRequest{ Event: &event.WorkflowExecutionEvent{ Phase: core.WorkflowExecution_SUCCEEDED, OccurredAt: occurredAtProto, @@ -283,7 +288,7 @@ func TestUpdateModelState_RunningToSuccess(t *testing.T) { OutputUri: "output.pb", }, }, - }) + }, interfaces.InlineEventDataPolicyStoreInline, commonMocks.GetMockStorageClient()) assert.Nil(t, err) expectedClosure := admin.ExecutionClosure{ @@ -326,7 +331,7 @@ func TestUpdateModelState_RunningToSuccess(t *testing.T) { }, }, } - err := UpdateExecutionModelState(&executionModel, admin.WorkflowExecutionEventRequest{ + err := UpdateExecutionModelState(context.TODO(), &executionModel, admin.WorkflowExecutionEventRequest{ Event: &event.WorkflowExecutionEvent{ Phase: core.WorkflowExecution_SUCCEEDED, OccurredAt: occurredAtProto, @@ -334,7 +339,7 @@ func TestUpdateModelState_RunningToSuccess(t *testing.T) { OutputData: outputData, }, }, - }) + }, interfaces.InlineEventDataPolicyStoreInline, commonMocks.GetMockStorageClient()) assert.Nil(t, err) expectedClosure := admin.ExecutionClosure{ @@ -355,6 +360,67 @@ func TestUpdateModelState_RunningToSuccess(t *testing.T) { expectedModel.Closure = closureBytes assert.EqualValues(t, expectedModel, executionModel) }) + t.Run("output data offloaded", func(t *testing.T) { + outputData := &core.LiteralMap{ + Literals: map[string]*core.Literal{ + "foo": { + Value: &core.Literal_Scalar{ + Scalar: &core.Scalar{ + Value: &core.Scalar_Primitive{ + Primitive: &core.Primitive{ + Value: &core.Primitive_Integer{ + Integer: 4, + }, + }, + }, + }, + }, + }, + }, + } + mockStorage := commonMocks.GetMockStorageClient() + mockStorage.ComposedProtobufStore.(*commonMocks.TestDataStore).WriteProtobufCb = func(ctx context.Context, reference storage.DataReference, opts storage.Options, msg proto.Message) error { + assert.Equal(t, reference.String(), "s3://bucket/metadata/project/domain/name/offloaded_outputs") + return nil + } + err := UpdateExecutionModelState(context.TODO(), &executionModel, admin.WorkflowExecutionEventRequest{ + Event: &event.WorkflowExecutionEvent{ + ExecutionId: &core.WorkflowExecutionIdentifier{ + Project: "project", + Domain: "domain", + Name: "name", + }, + Phase: core.WorkflowExecution_SUCCEEDED, + OccurredAt: occurredAtProto, + OutputResult: &event.WorkflowExecutionEvent_OutputData{ + OutputData: outputData, + }, + }, + }, interfaces.InlineEventDataPolicyOffload, mockStorage) + assert.Nil(t, err) + + expectedClosure := admin.ExecutionClosure{ + ComputedInputs: &core.LiteralMap{ + Literals: map[string]*core.Literal{ + "foo": {}, + }, + }, + Phase: core.WorkflowExecution_SUCCEEDED, + StartedAt: startedAtProto, + UpdatedAt: occurredAtProto, + Duration: durationProto, + OutputResult: &admin.ExecutionClosure_Outputs{ + Outputs: &admin.LiteralMapBlob{ + Data: &admin.LiteralMapBlob_Uri{ + Uri: "s3://bucket/metadata/project/domain/name/offloaded_outputs", + }, + }, + }, + } + closureBytes, _ := proto.Marshal(&expectedClosure) + expectedModel.Closure = closureBytes + assert.EqualValues(t, expectedModel, executionModel) + }) } func TestSetExecutionAborted(t *testing.T) { diff --git a/pkg/repositories/transformers/node_execution.go b/pkg/repositories/transformers/node_execution.go index 125edad293..ade1f91911 100644 --- a/pkg/repositories/transformers/node_execution.go +++ b/pkg/repositories/transformers/node_execution.go @@ -3,6 +3,9 @@ package transformers import ( "context" + "github.com/flyteorg/flyteadmin/pkg/runtime/interfaces" + "github.com/flyteorg/flytestdlib/storage" + "github.com/flyteorg/flytestdlib/logger" "github.com/flyteorg/flyteadmin/pkg/common" @@ -24,6 +27,8 @@ type ToNodeExecutionModelInput struct { ParentTaskExecutionID uint ParentID *uint DynamicWorkflowRemoteClosure string + InlineEventDataPolicy interfaces.InlineEventDataPolicy + StorageClient *storage.DataStore } func addNodeRunningState(request *admin.NodeExecutionEventRequest, nodeExecutionModel *models.NodeExecution, @@ -44,8 +49,9 @@ func addNodeRunningState(request *admin.NodeExecutionEventRequest, nodeExecution } func addTerminalState( + ctx context.Context, request *admin.NodeExecutionEventRequest, nodeExecutionModel *models.NodeExecution, - closure *admin.NodeExecutionClosure) error { + closure *admin.NodeExecutionClosure, inlineEventDataPolicy interfaces.InlineEventDataPolicy, storageClient *storage.DataStore) error { if closure.StartedAt == nil { logger.Warning(context.Background(), "node execution is missing StartedAt") } else { @@ -64,8 +70,22 @@ func addTerminalState( OutputUri: request.Event.GetOutputUri(), } } else if request.Event.GetOutputData() != nil { - closure.OutputResult = &admin.NodeExecutionClosure_OutputData{ - OutputData: request.Event.GetOutputData(), + switch inlineEventDataPolicy { + case interfaces.InlineEventDataPolicyStoreInline: + closure.OutputResult = &admin.NodeExecutionClosure_OutputData{ + OutputData: request.Event.GetOutputData(), + } + default: + logger.Debugf(ctx, "Offloading outputs per InlineEventDataPolicy") + uri, err := common.OffloadLiteralMap(ctx, storageClient, request.Event.GetOutputData(), + request.Event.Id.ExecutionId.Project, request.Event.Id.ExecutionId.Domain, request.Event.Id.ExecutionId.Name, + request.Event.Id.NodeId, OutputsObjectSuffix) + if err != nil { + return err + } + closure.OutputResult = &admin.NodeExecutionClosure_OutputUri{ + OutputUri: uri.String(), + } } } else if request.Event.GetError() != nil { closure.OutputResult = &admin.NodeExecutionClosure_Error{ @@ -78,7 +98,7 @@ func addTerminalState( return nil } -func CreateNodeExecutionModel(input ToNodeExecutionModelInput) (*models.NodeExecution, error) { +func CreateNodeExecutionModel(ctx context.Context, input ToNodeExecutionModelInput) (*models.NodeExecution, error) { nodeExecution := &models.NodeExecution{ NodeExecutionKey: models.NodeExecutionKey{ NodeID: input.Request.Event.Id.NodeId, @@ -110,7 +130,7 @@ func CreateNodeExecutionModel(input ToNodeExecutionModelInput) (*models.NodeExec } } if common.IsNodeExecutionTerminal(input.Request.Event.Phase) { - err := addTerminalState(input.Request, nodeExecution, &closure) + err := addTerminalState(ctx, input.Request, nodeExecution, &closure, input.InlineEventDataPolicy, input.StorageClient) if err != nil { return nil, err } @@ -142,8 +162,9 @@ func CreateNodeExecutionModel(input ToNodeExecutionModelInput) (*models.NodeExec } func UpdateNodeExecutionModel( - request *admin.NodeExecutionEventRequest, nodeExecutionModel *models.NodeExecution, - targetExecution *core.WorkflowExecutionIdentifier, dynamicWorkflowRemoteClosure string) error { + ctx context.Context, request *admin.NodeExecutionEventRequest, nodeExecutionModel *models.NodeExecution, + targetExecution *core.WorkflowExecutionIdentifier, dynamicWorkflowRemoteClosure string, + inlineEventDataPolicy interfaces.InlineEventDataPolicy, storageClient *storage.DataStore) error { var nodeExecutionClosure admin.NodeExecutionClosure err := proto.Unmarshal(nodeExecutionModel.Closure, &nodeExecutionClosure) if err != nil { @@ -161,7 +182,7 @@ func UpdateNodeExecutionModel( } } if common.IsNodeExecutionTerminal(request.Event.Phase) { - err := addTerminalState(request, nodeExecutionModel, &nodeExecutionClosure) + err := addTerminalState(ctx, request, nodeExecutionModel, &nodeExecutionClosure, inlineEventDataPolicy, storageClient) if err != nil { return err } diff --git a/pkg/repositories/transformers/node_execution_test.go b/pkg/repositories/transformers/node_execution_test.go index 1f490ba3c1..75d00ab77c 100644 --- a/pkg/repositories/transformers/node_execution_test.go +++ b/pkg/repositories/transformers/node_execution_test.go @@ -1,9 +1,14 @@ package transformers import ( + "context" "testing" "time" + commonMocks "github.com/flyteorg/flyteadmin/pkg/common/mocks" + "github.com/flyteorg/flyteadmin/pkg/runtime/interfaces" + "github.com/flyteorg/flytestdlib/storage" + "github.com/golang/protobuf/ptypes" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" @@ -78,7 +83,8 @@ func TestAddTerminalState_OutputURI(t *testing.T) { closure := admin.NodeExecutionClosure{ StartedAt: startedAtProto, } - err := addTerminalState(&request, &nodeExecutionModel, &closure) + err := addTerminalState(context.TODO(), &request, &nodeExecutionModel, &closure, + interfaces.InlineEventDataPolicyStoreInline, commonMocks.GetMockStorageClient()) assert.Nil(t, err) assert.EqualValues(t, outputURI, closure.GetOutputUri()) assert.Equal(t, time.Minute, nodeExecutionModel.Duration) @@ -104,6 +110,14 @@ func TestAddTerminalState_OutputData(t *testing.T) { } request := admin.NodeExecutionEventRequest{ Event: &event.NodeExecutionEvent{ + Id: &core.NodeExecutionIdentifier{ + NodeId: "node id", + ExecutionId: &core.WorkflowExecutionIdentifier{ + Project: "project", + Domain: "domain", + Name: "name", + }, + }, Phase: core.NodeExecution_SUCCEEDED, OutputResult: &event.NodeExecutionEvent_OutputData{ OutputData: outputData, @@ -119,10 +133,25 @@ func TestAddTerminalState_OutputData(t *testing.T) { closure := admin.NodeExecutionClosure{ StartedAt: startedAtProto, } - err := addTerminalState(&request, &nodeExecutionModel, &closure) - assert.Nil(t, err) - assert.EqualValues(t, outputData, closure.GetOutputData()) - assert.Equal(t, time.Minute, nodeExecutionModel.Duration) + t.Run("output data stored inline", func(t *testing.T) { + err := addTerminalState(context.TODO(), &request, &nodeExecutionModel, &closure, + interfaces.InlineEventDataPolicyStoreInline, commonMocks.GetMockStorageClient()) + assert.Nil(t, err) + assert.EqualValues(t, outputData, closure.GetOutputData()) + assert.Equal(t, time.Minute, nodeExecutionModel.Duration) + }) + t.Run("output data stored offloaded", func(t *testing.T) { + mockStorage := commonMocks.GetMockStorageClient() + mockStorage.ComposedProtobufStore.(*commonMocks.TestDataStore).WriteProtobufCb = func(ctx context.Context, reference storage.DataReference, opts storage.Options, msg proto.Message) error { + assert.Equal(t, reference.String(), "s3://bucket/metadata/project/domain/name/node id/offloaded_outputs") + return nil + } + + err := addTerminalState(context.TODO(), &request, &nodeExecutionModel, &closure, + interfaces.InlineEventDataPolicyOffload, mockStorage) + assert.Nil(t, err) + assert.Equal(t, "s3://bucket/metadata/project/domain/name/node id/offloaded_outputs", closure.GetOutputUri()) + }) } func TestAddTerminalState_Error(t *testing.T) { @@ -146,14 +175,15 @@ func TestAddTerminalState_Error(t *testing.T) { closure := admin.NodeExecutionClosure{ StartedAt: startedAtProto, } - err := addTerminalState(&request, &nodeExecutionModel, &closure) + err := addTerminalState(context.TODO(), &request, &nodeExecutionModel, &closure, + interfaces.InlineEventDataPolicyStoreInline, commonMocks.GetMockStorageClient()) assert.Nil(t, err) assert.True(t, proto.Equal(error, closure.GetError())) assert.Equal(t, time.Minute, nodeExecutionModel.Duration) } func TestCreateNodeExecutionModel(t *testing.T) { - nodeExecutionModel, err := CreateNodeExecutionModel(ToNodeExecutionModelInput{ + nodeExecutionModel, err := CreateNodeExecutionModel(context.TODO(), ToNodeExecutionModelInput{ Request: &admin.NodeExecutionEventRequest{ Event: &event.NodeExecutionEvent{ Id: &core.NodeExecutionIdentifier{ @@ -224,7 +254,8 @@ func TestUpdateNodeExecutionModel(t *testing.T) { nodeExecutionModel := models.NodeExecution{ Phase: core.NodeExecution_UNDEFINED.String(), } - err := UpdateNodeExecutionModel(&request, &nodeExecutionModel, childExecutionID, dynamicWorkflowClosureRef) + err := UpdateNodeExecutionModel(context.TODO(), &request, &nodeExecutionModel, childExecutionID, dynamicWorkflowClosureRef, + interfaces.InlineEventDataPolicyStoreInline, commonMocks.GetMockStorageClient()) assert.Nil(t, err) assert.Equal(t, core.NodeExecution_RUNNING.String(), nodeExecutionModel.Phase) assert.Equal(t, occurredAt, *nodeExecutionModel.StartedAt) @@ -287,7 +318,8 @@ func TestUpdateNodeExecutionModel(t *testing.T) { nodeExecutionModel := models.NodeExecution{ Phase: core.NodeExecution_UNDEFINED.String(), } - err := UpdateNodeExecutionModel(&request, &nodeExecutionModel, childExecutionID, dynamicWorkflowClosureRef) + err := UpdateNodeExecutionModel(context.TODO(), &request, &nodeExecutionModel, childExecutionID, dynamicWorkflowClosureRef, + interfaces.InlineEventDataPolicyStoreInline, commonMocks.GetMockStorageClient()) assert.Nil(t, err) assert.Equal(t, core.NodeExecution_RUNNING.String(), nodeExecutionModel.Phase) assert.Equal(t, occurredAt, *nodeExecutionModel.StartedAt) diff --git a/pkg/repositories/transformers/task_execution.go b/pkg/repositories/transformers/task_execution.go index 313aca0a14..c5d5583278 100644 --- a/pkg/repositories/transformers/task_execution.go +++ b/pkg/repositories/transformers/task_execution.go @@ -2,6 +2,10 @@ package transformers import ( "context" + "strconv" + + "github.com/flyteorg/flyteadmin/pkg/runtime/interfaces" + "github.com/flyteorg/flytestdlib/storage" "google.golang.org/protobuf/encoding/protojson" @@ -23,7 +27,9 @@ var empty _struct.Struct var jsonEmpty, _ = protojson.Marshal(&empty) type CreateTaskExecutionModelInput struct { - Request *admin.TaskExecutionEventRequest + Request *admin.TaskExecutionEventRequest + InlineEventDataPolicy interfaces.InlineEventDataPolicy + StorageClient *storage.DataStore } func addTaskStartedState(request *admin.TaskExecutionEventRequest, taskExecutionModel *models.TaskExecution, @@ -38,8 +44,10 @@ func addTaskStartedState(request *admin.TaskExecutionEventRequest, taskExecution } func addTaskTerminalState( + ctx context.Context, request *admin.TaskExecutionEventRequest, - taskExecutionModel *models.TaskExecution, closure *admin.TaskExecutionClosure) error { + taskExecutionModel *models.TaskExecution, closure *admin.TaskExecutionClosure, + inlineEventDataPolicy interfaces.InlineEventDataPolicy, storageClient *storage.DataStore) error { if taskExecutionModel.StartedAt == nil { logger.Warning(context.Background(), "task execution is missing StartedAt") } else { @@ -62,8 +70,24 @@ func addTaskTerminalState( OutputUri: request.Event.GetOutputUri(), } } else if request.Event.GetOutputData() != nil { - closure.OutputResult = &admin.TaskExecutionClosure_OutputData{ - OutputData: request.Event.GetOutputData(), + switch inlineEventDataPolicy { + case interfaces.InlineEventDataPolicyStoreInline: + closure.OutputResult = &admin.TaskExecutionClosure_OutputData{ + OutputData: request.Event.GetOutputData(), + } + default: + logger.Debugf(ctx, "Offloading outputs per InlineEventDataPolicy") + uri, err := common.OffloadLiteralMap(ctx, storageClient, request.Event.GetOutputData(), + request.Event.ParentNodeExecutionId.ExecutionId.Project, request.Event.ParentNodeExecutionId.ExecutionId.Domain, + request.Event.ParentNodeExecutionId.ExecutionId.Name, request.Event.ParentNodeExecutionId.NodeId, + request.Event.TaskId.Project, request.Event.TaskId.Domain, request.Event.TaskId.Name, request.Event.TaskId.Version, + strconv.FormatUint(uint64(request.Event.RetryAttempt), 10), OutputsObjectSuffix) + if err != nil { + return err + } + closure.OutputResult = &admin.TaskExecutionClosure_OutputUri{ + OutputUri: uri.String(), + } } } else if request.Event.GetError() != nil { closure.OutputResult = &admin.TaskExecutionClosure_Error{ @@ -73,7 +97,7 @@ func addTaskTerminalState( return nil } -func CreateTaskExecutionModel(input CreateTaskExecutionModelInput) (*models.TaskExecution, error) { +func CreateTaskExecutionModel(ctx context.Context, input CreateTaskExecutionModelInput) (*models.TaskExecution, error) { taskExecution := &models.TaskExecution{ TaskExecutionKey: models.TaskExecutionKey{ TaskKey: models.TaskKey{ @@ -121,7 +145,7 @@ func CreateTaskExecutionModel(input CreateTaskExecutionModelInput) (*models.Task } if common.IsTaskExecutionTerminal(input.Request.Event.Phase) { - err := addTaskTerminalState(input.Request, taskExecution, closure) + err := addTaskTerminalState(ctx, input.Request, taskExecution, closure, input.InlineEventDataPolicy, input.StorageClient) if err != nil { return nil, err } @@ -216,7 +240,8 @@ func mergeCustom(existing, latest *_struct.Struct) (*_struct.Struct, error) { return &response, nil } -func UpdateTaskExecutionModel(request *admin.TaskExecutionEventRequest, taskExecutionModel *models.TaskExecution) error { +func UpdateTaskExecutionModel(ctx context.Context, request *admin.TaskExecutionEventRequest, taskExecutionModel *models.TaskExecution, + inlineEventDataPolicy interfaces.InlineEventDataPolicy, storageClient *storage.DataStore) error { var taskExecutionClosure admin.TaskExecutionClosure err := proto.Unmarshal(taskExecutionModel.Closure, &taskExecutionClosure) if err != nil { @@ -240,7 +265,7 @@ func UpdateTaskExecutionModel(request *admin.TaskExecutionEventRequest, taskExec } if common.IsTaskExecutionTerminal(request.Event.Phase) { - err := addTaskTerminalState(request, taskExecutionModel, &taskExecutionClosure) + err := addTaskTerminalState(ctx, request, taskExecutionModel, &taskExecutionClosure, inlineEventDataPolicy, storageClient) if err != nil { return err } diff --git a/pkg/repositories/transformers/task_execution_test.go b/pkg/repositories/transformers/task_execution_test.go index 2b5ebccec0..5d5b982a40 100644 --- a/pkg/repositories/transformers/task_execution_test.go +++ b/pkg/repositories/transformers/task_execution_test.go @@ -1,11 +1,16 @@ package transformers import ( + "context" "encoding/json" "fmt" "testing" "time" + commonMocks "github.com/flyteorg/flyteadmin/pkg/common/mocks" + "github.com/flyteorg/flyteadmin/pkg/runtime/interfaces" + "github.com/flyteorg/flytestdlib/storage" + "github.com/golang/protobuf/jsonpb" "google.golang.org/protobuf/types/known/structpb" @@ -106,7 +111,8 @@ func TestAddTaskTerminalState_Error(t *testing.T) { closure := admin.TaskExecutionClosure{ StartedAt: startedAtProto, } - err := addTaskTerminalState(&request, &taskExecutionModel, &closure) + err := addTaskTerminalState(context.TODO(), &request, &taskExecutionModel, &closure, + interfaces.InlineEventDataPolicyStoreInline, commonMocks.GetMockStorageClient()) assert.Nil(t, err) assert.True(t, proto.Equal(expectedErr, closure.GetError())) assert.Equal(t, time.Minute, taskExecutionModel.Duration) @@ -129,7 +135,8 @@ func TestAddTaskTerminalState_OutputURI(t *testing.T) { } closure := &admin.TaskExecutionClosure{} - err := addTaskTerminalState(&request, &taskExecutionModel, closure) + err := addTaskTerminalState(context.TODO(), &request, &taskExecutionModel, closure, + interfaces.InlineEventDataPolicyStoreInline, commonMocks.GetMockStorageClient()) assert.Nil(t, err) duration, err := ptypes.Duration(closure.GetDuration()) @@ -161,6 +168,22 @@ func TestAddTaskTerminalState_OutputData(t *testing.T) { } request := admin.TaskExecutionEventRequest{ Event: &event.TaskExecutionEvent{ + TaskId: &core.Identifier{ + ResourceType: core.ResourceType_TASK, + Project: "project", + Domain: "domain", + Name: "name", + Version: "version", + }, + RetryAttempt: 1, + ParentNodeExecutionId: &core.NodeExecutionIdentifier{ + NodeId: "node id", + ExecutionId: &core.WorkflowExecutionIdentifier{ + Project: "ex project", + Domain: "ex domain", + Name: "ex name", + }, + }, Phase: core.TaskExecution_SUCCEEDED, OutputResult: &event.TaskExecutionEvent_OutputData{ OutputData: outputData, @@ -173,21 +196,37 @@ func TestAddTaskTerminalState_OutputData(t *testing.T) { StartedAt: &startedAt, } - closure := &admin.TaskExecutionClosure{} - err := addTaskTerminalState(&request, &taskExecutionModel, closure) - assert.Nil(t, err) + t.Run("output data stored inline", func(t *testing.T) { + closure := &admin.TaskExecutionClosure{} + err := addTaskTerminalState(context.TODO(), &request, &taskExecutionModel, closure, + interfaces.InlineEventDataPolicyStoreInline, commonMocks.GetMockStorageClient()) + assert.Nil(t, err) - duration, err := ptypes.Duration(closure.GetDuration()) - assert.Nil(t, err) - assert.EqualValues(t, request.Event.OutputResult, closure.OutputResult) - assert.True(t, proto.Equal(outputData, closure.GetOutputData())) - assert.EqualValues(t, time.Minute, duration) + duration, err := ptypes.Duration(closure.GetDuration()) + assert.Nil(t, err) + assert.EqualValues(t, request.Event.OutputResult, closure.OutputResult) + assert.True(t, proto.Equal(outputData, closure.GetOutputData())) + assert.EqualValues(t, time.Minute, duration) + }) + t.Run("output data stored offloaded", func(t *testing.T) { + mockStorage := commonMocks.GetMockStorageClient() + mockStorage.ComposedProtobufStore.(*commonMocks.TestDataStore).WriteProtobufCb = func(ctx context.Context, reference storage.DataReference, opts storage.Options, msg proto.Message) error { + assert.Equal(t, reference.String(), "s3://bucket/metadata/ex project/ex domain/ex name/node id/project/domain/name/version/1/offloaded_outputs") + return nil + } + + closure := &admin.TaskExecutionClosure{} + err := addTaskTerminalState(context.TODO(), &request, &taskExecutionModel, closure, + interfaces.InlineEventDataPolicyOffload, mockStorage) + assert.Nil(t, err) + assert.Equal(t, "s3://bucket/metadata/ex project/ex domain/ex name/node id/project/domain/name/version/1/offloaded_outputs", closure.GetOutputUri()) + }) assert.Equal(t, time.Minute, taskExecutionModel.Duration) } func TestCreateTaskExecutionModelQueued(t *testing.T) { - taskExecutionModel, err := CreateTaskExecutionModel(CreateTaskExecutionModelInput{ + taskExecutionModel, err := CreateTaskExecutionModel(context.TODO(), CreateTaskExecutionModelInput{ Request: &admin.TaskExecutionEventRequest{ Event: &event.TaskExecutionEvent{ TaskId: sampleTaskID, @@ -243,7 +282,7 @@ func TestCreateTaskExecutionModelQueued(t *testing.T) { } func TestCreateTaskExecutionModelRunning(t *testing.T) { - taskExecutionModel, err := CreateTaskExecutionModel(CreateTaskExecutionModelInput{ + taskExecutionModel, err := CreateTaskExecutionModel(context.TODO(), CreateTaskExecutionModelInput{ Request: &admin.TaskExecutionEventRequest{ Event: &event.TaskExecutionEvent{ TaskId: sampleTaskID, @@ -403,7 +442,8 @@ func TestUpdateTaskExecutionModelRunningToFailed(t *testing.T) { }, } - err = UpdateTaskExecutionModel(failedEventRequest, &existingTaskExecution) + err = UpdateTaskExecutionModel(context.TODO(), failedEventRequest, &existingTaskExecution, + interfaces.InlineEventDataPolicyStoreInline, commonMocks.GetMockStorageClient()) assert.Nil(t, err) expectedClosure := &admin.TaskExecutionClosure{ diff --git a/pkg/runtime/application_config_provider.go b/pkg/runtime/application_config_provider.go index aa1d919e53..80323e400d 100644 --- a/pkg/runtime/application_config_provider.go +++ b/pkg/runtime/application_config_provider.go @@ -64,8 +64,9 @@ var schedulerConfig = config.MustRegisterSection(scheduler, &interfaces.Schedule }, }) var remoteDataConfig = config.MustRegisterSection(remoteData, &interfaces.RemoteDataConfig{ - Scheme: common.None, - MaxSizeInBytes: 2 * MB, + Scheme: common.None, + MaxSizeInBytes: 2 * MB, + InlineEventDataPolicy: interfaces.InlineEventDataPolicyOffload, SignedURL: interfaces.SignedURL{ Enabled: false, }, diff --git a/pkg/runtime/interfaces/application_configuration.go b/pkg/runtime/interfaces/application_configuration.go index eda02a9a1e..16ec45b503 100644 --- a/pkg/runtime/interfaces/application_configuration.go +++ b/pkg/runtime/interfaces/application_configuration.go @@ -311,7 +311,19 @@ type SignedURL struct { SigningPrincipal string `json:"signingPrincipal"` } -// This configuration handles all requests to get remote data such as execution inputs & outputs. +//go:generate enumer -type=InlineEventDataPolicy -trimprefix=InlineEventDataPolicy +type InlineEventDataPolicy = int + +const ( + // InlineEventDataPolicyOffload specifies that inline execution event data (e.g. outputs) should be offloaded to the + // configured cloud blob store. + InlineEventDataPolicyOffload InlineEventDataPolicy = iota + // InlineEventDataPolicyStoreInline specifies that inline execution event data should be saved inline with execution + // database entries. + InlineEventDataPolicyStoreInline +) + +// This configuration handles all requests to get and write remote data such as execution inputs & outputs. type RemoteDataConfig struct { // Defines the cloud provider that backs the scheduler. In the absence of a specification the no-op, 'local' // scheme is used. @@ -321,6 +333,8 @@ type RemoteDataConfig struct { SignedURL SignedURL `json:"signedUrls"` // Specifies the max size in bytes for which execution data such as inputs and outputs will be populated in line. MaxSizeInBytes int64 `json:"maxSizeInBytes"` + // Specifies how inline execution event data should be saved in the backend + InlineEventDataPolicy InlineEventDataPolicy `json:"inlineEventDataPolicy" pflag:",Specifies how inline execution event data should be saved in the backend"` } // This section handles configuration for the workflow notifications pipeline.