Skip to content

Commit

Permalink
Inline execution event data can be offloaded to cloud blobstore (flyt…
Browse files Browse the repository at this point in the history
  • Loading branch information
Katrina Rogan authored Nov 9, 2021
1 parent 7b9a974 commit c8a4162
Show file tree
Hide file tree
Showing 14 changed files with 390 additions and 76 deletions.
30 changes: 30 additions & 0 deletions pkg/common/data_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package common

import (
"context"

"github.com/flyteorg/flyteadmin/pkg/errors"
"github.com/flyteorg/flyteadmin/pkg/manager/impl/shared"
"google.golang.org/grpc/codes"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flytestdlib/storage"
)

// OffloadLiteralMap writes literalMap as a protobuf through storageClient and returns the data reference it was
// written to. The reference is constructed from the storage client's base container, shared.Metadata, and then each
// of nestedKeys in the order given. A nil literalMap is written as an empty core.LiteralMap.
//
// Failures to construct the reference or to write the protobuf are both reported as FlyteAdmin errors with
// codes.Internal; in that case the returned reference is empty.
func OffloadLiteralMap(ctx context.Context, storageClient *storage.DataStore, literalMap *core.LiteralMap, nestedKeys ...string) (storage.DataReference, error) {
	payload := literalMap
	if payload == nil {
		payload = &core.LiteralMap{}
	}

	// The metadata prefix always comes first, followed by the caller-supplied keys.
	pathParts := make([]string, 0, len(nestedKeys)+1)
	pathParts = append(pathParts, shared.Metadata)
	pathParts = append(pathParts, nestedKeys...)

	ref, err := storageClient.ConstructReference(ctx, storageClient.GetBaseContainerFQN(ctx), pathParts...)
	if err != nil {
		return "", errors.NewFlyteAdminErrorf(codes.Internal, "Failed to construct data reference for [%+v] with err: %v", nestedKeys, err)
	}

	err = storageClient.WriteProtobuf(ctx, ref, storage.Options{}, payload)
	if err != nil {
		return "", errors.NewFlyteAdminErrorf(codes.Internal, "Failed to write protobuf for [%+v] with err: %v", nestedKeys, err)
	}

	return ref, nil
}
65 changes: 65 additions & 0 deletions pkg/common/data_store_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package common

import (
"context"
"testing"

"github.com/flyteorg/flyteadmin/pkg/errors"
"google.golang.org/grpc/codes"

commonMocks "github.com/flyteorg/flyteadmin/pkg/common/mocks"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flytestdlib/storage"
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
)

// literalMap is the fixture shared by the tests in this file: a literal map with a single entry, "foo", whose value
// is the scalar integer primitive 4.
var literalMap = func() *core.LiteralMap {
	four := &core.Primitive{
		Value: &core.Primitive_Integer{Integer: 4},
	}
	scalar := &core.Scalar{
		Value: &core.Scalar_Primitive{Primitive: four},
	}
	return &core.LiteralMap{
		Literals: map[string]*core.Literal{
			"foo": {
				Value: &core.Literal_Scalar{Scalar: scalar},
			},
		},
	}
}()

// TestOffloadLiteralMap covers the success path: the literal map is written to a reference made up of the mock
// storage client's base container, the metadata prefix and the nested keys, and that same reference is returned.
func TestOffloadLiteralMap(t *testing.T) {
	mockStorage := commonMocks.GetMockStorageClient()
	// Track the callback so the test cannot pass without the write actually being attempted.
	writeCalled := false
	mockStorage.ComposedProtobufStore.(*commonMocks.TestDataStore).WriteProtobufCb = func(ctx context.Context, reference storage.DataReference, opts storage.Options, msg proto.Message) error {
		writeCalled = true
		// testify expects (expected, actual); keeping that order makes failure output read correctly.
		assert.Equal(t, "s3://bucket/metadata/nested/key", reference.String())
		return nil
	}

	uri, err := OffloadLiteralMap(context.TODO(), mockStorage, literalMap, "nested", "key")
	assert.NoError(t, err)
	assert.Equal(t, "s3://bucket/metadata/nested/key", uri.String())
	assert.True(t, writeCalled, "expected WriteProtobuf to be invoked")
}

// TestOffloadLiteralMap_ConstructReferenceError checks that a failure to construct the data reference is surfaced
// to the caller as a FlyteAdmin error carrying codes.Internal.
func TestOffloadLiteralMap_ConstructReferenceError(t *testing.T) {
	mockStorage := commonMocks.GetMockStorageClient()
	mockStorage.ComposedProtobufStore.(*commonMocks.TestDataStore).ConstructReferenceCb = func(
		ctx context.Context, reference storage.DataReference, nestedKeys ...string) (storage.DataReference, error) {
		return "foo", errors.NewFlyteAdminError(codes.Internal, "foo")
	}

	_, err := OffloadLiteralMap(context.TODO(), mockStorage, literalMap, "nested", "key")

	// Use the comma-ok form so a nil or unexpected error fails the test with a message instead of panicking.
	flyteErr, ok := err.(errors.FlyteAdminError)
	if !ok {
		t.Fatalf("expected a FlyteAdminError, got %T: %v", err, err)
	}
	assert.Equal(t, codes.Internal, flyteErr.Code())
}

// TestOffloadLiteralMap_StorageFailure checks that an error returned while writing the protobuf is surfaced to the
// caller as a FlyteAdmin error carrying codes.Internal.
func TestOffloadLiteralMap_StorageFailure(t *testing.T) {
	mockStorage := commonMocks.GetMockStorageClient()
	mockStorage.ComposedProtobufStore.(*commonMocks.TestDataStore).WriteProtobufCb = func(ctx context.Context, reference storage.DataReference, opts storage.Options, msg proto.Message) error {
		// testify expects (expected, actual); keeping that order makes failure output read correctly.
		assert.Equal(t, "s3://bucket/metadata/nested/key", reference.String())
		return errors.NewFlyteAdminError(codes.Internal, "foo")
	}

	_, err := OffloadLiteralMap(context.TODO(), mockStorage, literalMap, "nested", "key")

	// Use the comma-ok form so a nil or unexpected error fails the test with a message instead of panicking.
	flyteErr, ok := err.(errors.FlyteAdminError)
	if !ok {
		t.Fatalf("expected a FlyteAdminError, got %T: %v", err, err)
	}
	assert.Equal(t, codes.Internal, flyteErr.Code())
}
26 changes: 6 additions & 20 deletions pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,20 +197,6 @@ func (m *ExecutionManager) addPluginOverrides(ctx context.Context, executionID *
return nil, nil
}

// offloadInputs writes literalMap as a protobuf through the manager's storage client and returns the reference it
// was written to. The reference is constructed from the base container, shared.Metadata, the identifier's project,
// domain and name, and finally key. A nil literalMap is written as an empty core.LiteralMap. Errors from
// constructing the reference or writing the protobuf are returned unchanged.
func (m *ExecutionManager) offloadInputs(ctx context.Context, literalMap *core.LiteralMap, identifier *core.WorkflowExecutionIdentifier, key string) (storage.DataReference, error) {
	payload := literalMap
	if payload == nil {
		payload = &core.LiteralMap{}
	}

	ref, err := m.storageClient.ConstructReference(
		ctx,
		m.storageClient.GetBaseContainerFQN(ctx),
		shared.Metadata, identifier.Project, identifier.Domain, identifier.Name, key,
	)
	if err != nil {
		return "", err
	}

	err = m.storageClient.WriteProtobuf(ctx, ref, storage.Options{}, payload)
	if err != nil {
		return "", err
	}

	return ref, nil
}

type completeTaskResources struct {
Defaults runtimeInterfaces.TaskResourceSet
Limits runtimeInterfaces.TaskResourceSet
Expand Down Expand Up @@ -569,11 +555,11 @@ func (m *ExecutionManager) launchSingleTaskExecution(
// Dynamically assign execution queues.
m.populateExecutionQueue(ctx, *workflow.Id, workflow.Closure.CompiledWorkflow)

inputsURI, err := m.offloadInputs(ctx, request.Inputs, &workflowExecutionID, shared.Inputs)
inputsURI, err := common.OffloadLiteralMap(ctx, m.storageClient, request.Inputs, workflowExecutionID.Project, workflowExecutionID.Domain, workflowExecutionID.Name, shared.Inputs)
if err != nil {
return nil, nil, err
}
userInputsURI, err := m.offloadInputs(ctx, request.Inputs, &workflowExecutionID, shared.UserInputs)
userInputsURI, err := common.OffloadLiteralMap(ctx, m.storageClient, request.Inputs, workflowExecutionID.Project, workflowExecutionID.Domain, workflowExecutionID.Name, shared.UserInputs)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -764,11 +750,11 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
// Dynamically assign execution queues.
m.populateExecutionQueue(ctx, *workflow.Id, workflow.Closure.CompiledWorkflow)

inputsURI, err := m.offloadInputs(ctx, executionInputs, &workflowExecutionID, shared.Inputs)
inputsURI, err := common.OffloadLiteralMap(ctx, m.storageClient, executionInputs, workflowExecutionID.Project, workflowExecutionID.Domain, workflowExecutionID.Name, shared.Inputs)
if err != nil {
return nil, nil, err
}
userInputsURI, err := m.offloadInputs(ctx, request.Inputs, &workflowExecutionID, shared.UserInputs)
userInputsURI, err := common.OffloadLiteralMap(ctx, m.storageClient, request.Inputs, workflowExecutionID.Project, workflowExecutionID.Domain, workflowExecutionID.Name, shared.UserInputs)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -1178,7 +1164,7 @@ func (m *ExecutionManager) CreateWorkflowEvent(ctx context.Context, request admi
return nil, errors.NewAlreadyInTerminalStateError(ctx, errorMsg, curPhase)
}

err = transformers.UpdateExecutionModelState(executionModel, request)
err = transformers.UpdateExecutionModelState(ctx, executionModel, request, m.config.ApplicationConfiguration().GetRemoteDataConfig().InlineEventDataPolicy, m.storageClient)
if err != nil {
logger.Debugf(ctx, "failed to transform updated workflow execution model [%+v] after receiving event with err: %v",
request.Event.ExecutionId, err)
Expand Down Expand Up @@ -1266,7 +1252,7 @@ func (m *ExecutionManager) GetExecutionData(
if err := proto.Unmarshal(executionModel.Closure, closure); err != nil {
return nil, err
}
newInputsURI, err := m.offloadInputs(ctx, closure.ComputedInputs, request.Id, shared.Inputs)
newInputsURI, err := common.OffloadLiteralMap(ctx, m.storageClient, closure.ComputedInputs, request.Id.Project, request.Id.Domain, request.Id.Name, shared.Inputs)
if err != nil {
return nil, err
}
Expand Down
8 changes: 6 additions & 2 deletions pkg/manager/impl/node_execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,13 @@ func (m *NodeExecutionManager) createNodeExecutionWithEvent(
}
parentID = &parentNodeExecutionModel.ID
}
nodeExecutionModel, err := transformers.CreateNodeExecutionModel(transformers.ToNodeExecutionModelInput{
nodeExecutionModel, err := transformers.CreateNodeExecutionModel(ctx, transformers.ToNodeExecutionModelInput{
Request: request,
ParentTaskExecutionID: parentTaskExecutionID,
ParentID: parentID,
DynamicWorkflowRemoteClosure: dynamicWorkflowRemoteClosureReference,
InlineEventDataPolicy: m.config.ApplicationConfiguration().GetRemoteDataConfig().InlineEventDataPolicy,
StorageClient: m.storageClient,
})
if err != nil {
logger.Debugf(ctx, "failed to create node execution model for event request: %s with err: %v",
Expand Down Expand Up @@ -175,7 +177,9 @@ func (m *NodeExecutionManager) updateNodeExecutionWithEvent(
return updateFailed, err
}
}
err := transformers.UpdateNodeExecutionModel(request, nodeExecutionModel, childExecutionID, dynamicWorkflowRemoteClosureReference)
err := transformers.UpdateNodeExecutionModel(ctx, request, nodeExecutionModel, childExecutionID,
dynamicWorkflowRemoteClosureReference, m.config.ApplicationConfiguration().GetRemoteDataConfig().InlineEventDataPolicy,
m.storageClient)
if err != nil {
logger.Debugf(ctx, "failed to update node execution model: %+v with err: %v", request.Event.Id, err)
return updateFailed, err
Expand Down
8 changes: 6 additions & 2 deletions pkg/manager/impl/task_execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,11 @@ func (m *TaskExecutionManager) createTaskExecution(
}

taskExecutionModel, err := transformers.CreateTaskExecutionModel(
ctx,
transformers.CreateTaskExecutionModelInput{
Request: request,
Request: request,
InlineEventDataPolicy: m.config.ApplicationConfiguration().GetRemoteDataConfig().InlineEventDataPolicy,
StorageClient: m.storageClient,
})
if err != nil {
logger.Debugf(ctx, "failed to transform task execution %+v into database model: %v", request.Event.TaskId, err)
Expand All @@ -104,7 +107,8 @@ func (m *TaskExecutionManager) updateTaskExecutionModelState(
ctx context.Context, request *admin.TaskExecutionEventRequest, existingTaskExecution *models.TaskExecution) (
models.TaskExecution, error) {

err := transformers.UpdateTaskExecutionModel(request, existingTaskExecution)
err := transformers.UpdateTaskExecutionModel(ctx, request, existingTaskExecution,
m.config.ApplicationConfiguration().GetRemoteDataConfig().InlineEventDataPolicy, m.storageClient)
if err != nil {
logger.Debugf(ctx, "failed to update task execution model [%+v] with err: %v", request.Event.TaskId, err)
return models.TaskExecution{}, err
Expand Down
5 changes: 5 additions & 0 deletions pkg/repositories/transformers/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package transformers

// OutputsObjectSuffix is the final path component used when execution event data arrives with inline outputs but
// the admin deployment is configured to offload such data. The generated object path for the offloaded outputs
// consists of the execution identifier followed by this suffix.
const OutputsObjectSuffix = "offloaded_outputs"
27 changes: 24 additions & 3 deletions pkg/repositories/transformers/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"time"

"github.com/flyteorg/flyteadmin/pkg/runtime/interfaces"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flytestdlib/logger"
Expand Down Expand Up @@ -104,7 +106,9 @@ func CreateExecutionModel(input CreateExecutionModelInput) (*models.Execution, e

// Updates an existing model given a WorkflowExecution event.
func UpdateExecutionModelState(
execution *models.Execution, request admin.WorkflowExecutionEventRequest) error {
ctx context.Context,
execution *models.Execution, request admin.WorkflowExecutionEventRequest,
inlineEventDataPolicy interfaces.InlineEventDataPolicy, storageClient *storage.DataStore) error {
var executionClosure admin.ExecutionClosure
err := proto.Unmarshal(execution.Closure, &executionClosure)
if err != nil {
Expand Down Expand Up @@ -143,8 +147,25 @@ func UpdateExecutionModelState(
},
}
} else if request.Event.GetOutputData() != nil {
executionClosure.OutputResult = &admin.ExecutionClosure_OutputData{
OutputData: request.Event.GetOutputData(),
switch inlineEventDataPolicy {
case interfaces.InlineEventDataPolicyStoreInline:
executionClosure.OutputResult = &admin.ExecutionClosure_OutputData{
OutputData: request.Event.GetOutputData(),
}
default:
logger.Debugf(ctx, "Offloading outputs per InlineEventDataPolicy")
uri, err := common.OffloadLiteralMap(ctx, storageClient, request.Event.GetOutputData(),
request.Event.ExecutionId.Project, request.Event.ExecutionId.Domain, request.Event.ExecutionId.Name, OutputsObjectSuffix)
if err != nil {
return err
}
executionClosure.OutputResult = &admin.ExecutionClosure_Outputs{
Outputs: &admin.LiteralMapBlob{
Data: &admin.LiteralMapBlob_Uri{
Uri: uri.String(),
},
},
}
}
} else if request.Event.GetError() != nil {
executionClosure.OutputResult = &admin.ExecutionClosure_Error{
Expand Down
Loading

0 comments on commit c8a4162

Please sign in to comment.