Skip to content

Commit

Permalink
Truncate execution error message in list execution calls (flyteorg#523)
Browse files Browse the repository at this point in the history
  • Loading branch information
katrogan authored Feb 16, 2023
1 parent 4b712bd commit 96a660e
Show file tree
Hide file tree
Showing 10 changed files with 174 additions and 35 deletions.
16 changes: 8 additions & 8 deletions flyteadmin/pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ func (m *ExecutionManager) getInheritedExecMetadata(ctx context.Context, request
}
sourceExecutionID = sourceExecutionModel.ID
requestSpec.Metadata.Principal = sourceExecutionModel.User
sourceExecution, err := transformers.FromExecutionModel(*sourceExecutionModel)
sourceExecution, err := transformers.FromExecutionModel(*sourceExecutionModel, transformers.DefaultExecutionTransformerOptions)
if err != nil {
logger.Errorf(ctx, "Failed transform parent execution model for child execution [%+v] with err: %v", workflowExecutionID, err)
return parentNodeExecutionID, sourceExecutionID, err
Expand Down Expand Up @@ -951,7 +951,7 @@ func (m *ExecutionManager) RelaunchExecution(
logger.Debugf(ctx, "Failed to get execution model for request [%+v] with err %v", request, err)
return nil, err
}
existingExecution, err := transformers.FromExecutionModel(*existingExecutionModel)
existingExecution, err := transformers.FromExecutionModel(*existingExecutionModel, transformers.DefaultExecutionTransformerOptions)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1008,7 +1008,7 @@ func (m *ExecutionManager) RecoverExecution(
logger.Debugf(ctx, "Failed to get execution model for request [%+v] with err %v", request, err)
return nil, err
}
existingExecution, err := transformers.FromExecutionModel(*existingExecutionModel)
existingExecution, err := transformers.FromExecutionModel(*existingExecutionModel, transformers.DefaultExecutionTransformerOptions)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1059,7 +1059,7 @@ func (m *ExecutionManager) emitScheduledWorkflowMetrics(
return
}
// Find the reference launch plan to get the kickoff time argument
execution, err := transformers.FromExecutionModel(*executionModel)
execution, err := transformers.FromExecutionModel(*executionModel, transformers.DefaultExecutionTransformerOptions)
if err != nil {
logger.Warningf(context.Background(),
"failed to transform execution model when emitting scheduled workflow execution stats with for "+
Expand Down Expand Up @@ -1302,7 +1302,7 @@ func (m *ExecutionManager) GetExecution(
logger.Debugf(ctx, "Failed to get execution model for request [%+v] with err: %v", request, err)
return nil, err
}
execution, transformerErr := transformers.FromExecutionModel(*executionModel)
execution, transformerErr := transformers.FromExecutionModel(*executionModel, transformers.DefaultExecutionTransformerOptions)
if transformerErr != nil {
logger.Debugf(ctx, "Failed to transform execution model [%+v] to proto object with err: %v", request.Id,
transformerErr)
Expand Down Expand Up @@ -1345,7 +1345,7 @@ func (m *ExecutionManager) GetExecutionData(
logger.Debugf(ctx, "Failed to get execution model for request [%+v] with err: %v", request, err)
return nil, err
}
execution, err := transformers.FromExecutionModel(*executionModel)
execution, err := transformers.FromExecutionModel(*executionModel, transformers.DefaultExecutionTransformerOptions)
if err != nil {
logger.Debugf(ctx, "Failed to transform execution model [%+v] to proto object with err: %v", request.Id, err)
return nil, err
Expand Down Expand Up @@ -1445,7 +1445,7 @@ func (m *ExecutionManager) ListExecutions(
logger.Debugf(ctx, "Failed to list executions using input [%+v] with err %v", listExecutionsInput, err)
return nil, err
}
executionList, err := transformers.FromExecutionModels(output.Executions)
executionList, err := transformers.FromExecutionModels(output.Executions, transformers.ListExecutionTransformerOptions)
if err != nil {
logger.Errorf(ctx,
"Failed to transform execution models [%+v] with err: %v", output.Executions, err)
Expand Down Expand Up @@ -1475,7 +1475,7 @@ func (m *ExecutionManager) ListExecutions(
func (m *ExecutionManager) publishNotifications(ctx context.Context, request admin.WorkflowExecutionEventRequest,
execution models.Execution) error {
// Notifications are stored in the Spec object of an admin.Execution object.
adminExecution, err := transformers.FromExecutionModel(execution)
adminExecution, err := transformers.FromExecutionModel(execution, transformers.DefaultExecutionTransformerOptions)
if err != nil {
// This shouldn't happen because execution manager marshaled the data into models.Execution.
m.systemMetrics.TransformerError.Inc()
Expand Down
10 changes: 5 additions & 5 deletions flyteadmin/pkg/manager/impl/node_execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ func (m *NodeExecutionManager) CreateNodeEvent(ctx context.Context, request admi
// Handles making additional database calls, if necessary, to populate IsParent & IsDynamic data using the historical pattern of
// preloading child node executions. Otherwise, simply calls transform on the input model.
func (m *NodeExecutionManager) transformNodeExecutionModel(ctx context.Context, nodeExecutionModel models.NodeExecution,
nodeExecutionID *core.NodeExecutionIdentifier) (*admin.NodeExecution, error) {
nodeExecutionID *core.NodeExecutionIdentifier, opts *transformers.ExecutionTransformerOptions) (*admin.NodeExecution, error) {
internalData, err := transformers.GetNodeExecutionInternalData(nodeExecutionModel.InternalData)
if err != nil {
return nil, err
Expand All @@ -323,7 +323,7 @@ func (m *NodeExecutionManager) transformNodeExecutionModel(ctx context.Context,
}
}

nodeExecution, err := transformers.FromNodeExecutionModel(nodeExecutionModel)
nodeExecution, err := transformers.FromNodeExecutionModel(nodeExecutionModel, opts)
if err != nil {
logger.Debugf(ctx, "failed to transform node execution model [%+v] to proto with err: %v", nodeExecutionID, err)
return nil, err
Expand All @@ -341,7 +341,7 @@ func (m *NodeExecutionManager) transformNodeExecutionModelList(ctx context.Conte
Name: nodeExecutionModel.Name,
},
NodeId: nodeExecutionModel.NodeID,
})
}, transformers.ListExecutionTransformerOptions)
if err != nil {
return nil, err
}
Expand All @@ -362,7 +362,7 @@ func (m *NodeExecutionManager) GetNodeExecution(
request.Id, err)
return nil, err
}
nodeExecution, err := m.transformNodeExecutionModel(ctx, *nodeExecutionModel, request.Id)
nodeExecution, err := m.transformNodeExecutionModel(ctx, *nodeExecutionModel, request.Id, nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -498,7 +498,7 @@ func (m *NodeExecutionManager) GetNodeExecutionData(
return nil, err
}

nodeExecution, err := transformers.FromNodeExecutionModel(*nodeExecutionModel)
nodeExecution, err := transformers.FromNodeExecutionModel(*nodeExecutionModel, transformers.DefaultExecutionTransformerOptions)
if err != nil {
logger.Debugf(ctx, "failed to transform node execution model [%+v] when fetching data: %v", request.Id, err)
return nil, err
Expand Down
10 changes: 6 additions & 4 deletions flyteadmin/pkg/manager/impl/node_execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"testing"
"time"

"github.com/flyteorg/flyteadmin/pkg/repositories/transformers"

"github.com/flyteorg/flyteadmin/pkg/manager/impl/util"

genModel "github.com/flyteorg/flyteadmin/pkg/repositories/gen/models"
Expand Down Expand Up @@ -444,7 +446,7 @@ func TestTransformNodeExecutionModel(t *testing.T) {
manager := NodeExecutionManager{
db: repository,
}
nodeExecution, err := manager.transformNodeExecutionModel(ctx, models.NodeExecution{}, nodeExecID)
nodeExecution, err := manager.transformNodeExecutionModel(ctx, models.NodeExecution{}, nodeExecID, transformers.DefaultExecutionTransformerOptions)
assert.NoError(t, err)
assert.True(t, proto.Equal(nodeExecID, nodeExecution.Id))
assert.True(t, nodeExecution.Metadata.IsParentNode)
Expand Down Expand Up @@ -474,7 +476,7 @@ func TestTransformNodeExecutionModel(t *testing.T) {
Closure: closureBytes,
NodeExecutionMetadata: nodeExecutionMetadataBytes,
InternalData: internalDataBytes,
}, nodeExecID)
}, nodeExecID, transformers.DefaultExecutionTransformerOptions)
assert.NoError(t, err)
assert.True(t, nodeExecution.Metadata.IsParentNode)
assert.True(t, nodeExecution.Metadata.IsDynamic)
Expand All @@ -485,7 +487,7 @@ func TestTransformNodeExecutionModel(t *testing.T) {
}
_, err := manager.transformNodeExecutionModel(ctx, models.NodeExecution{
InternalData: []byte("i'm invalid"),
}, nodeExecID)
}, nodeExecID, transformers.DefaultExecutionTransformerOptions)
assert.NotNil(t, err)
assert.Equal(t, err.(flyteAdminErrors.FlyteAdminError).Code(), codes.Internal)
})
Expand All @@ -500,7 +502,7 @@ func TestTransformNodeExecutionModel(t *testing.T) {
manager := NodeExecutionManager{
db: repository,
}
_, err := manager.transformNodeExecutionModel(ctx, models.NodeExecution{}, nodeExecID)
_, err := manager.transformNodeExecutionModel(ctx, models.NodeExecution{}, nodeExecID, transformers.DefaultExecutionTransformerOptions)
assert.Equal(t, err, expectedErr)
})
}
Expand Down
4 changes: 2 additions & 2 deletions flyteadmin/pkg/manager/impl/task_execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func (m *TaskExecutionManager) GetTaskExecution(
if err != nil {
return nil, err
}
taskExecution, err := transformers.FromTaskExecutionModel(*taskExecutionModel)
taskExecution, err := transformers.FromTaskExecutionModel(*taskExecutionModel, transformers.DefaultExecutionTransformerOptions)
if err != nil {
logger.Debugf(ctx, "Failed to transform task execution model [%+v] to proto: %v", request.Id, err)
return nil, err
Expand Down Expand Up @@ -284,7 +284,7 @@ func (m *TaskExecutionManager) ListTaskExecutions(
return nil, err
}

taskExecutionList, err := transformers.FromTaskExecutionModels(output.TaskExecutions)
taskExecutionList, err := transformers.FromTaskExecutionModels(output.TaskExecutions, transformers.ListExecutionTransformerOptions)
if err != nil {
logger.Debugf(ctx, "failed to transform task execution models for request [%+v] with err: %v", request, err)
return nil, err
Expand Down
24 changes: 21 additions & 3 deletions flyteadmin/pkg/repositories/transformers/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
)

const trimmedErrMessageLen = 100

var clusterReassignablePhases = sets.NewString(core.WorkflowExecution_UNDEFINED.String(), core.WorkflowExecution_QUEUED.String())

// CreateExecutionModelInput encapsulates request parameters for calls to CreateExecutionModel.
Expand All @@ -44,6 +46,15 @@ type CreateExecutionModelInput struct {
LaunchEntity core.ResourceType
}

type ExecutionTransformerOptions struct {
TrimErrorMessage bool
}

var DefaultExecutionTransformerOptions = &ExecutionTransformerOptions{}
var ListExecutionTransformerOptions = &ExecutionTransformerOptions{
TrimErrorMessage: true,
}

// CreateExecutionModel transforms a ExecutionCreateRequest to a Execution model
func CreateExecutionModel(input CreateExecutionModelInput) (*models.Execution, error) {
requestSpec := input.RequestSpec
Expand Down Expand Up @@ -305,7 +316,7 @@ func GetExecutionIdentifier(executionModel *models.Execution) core.WorkflowExecu
}
}

func FromExecutionModel(executionModel models.Execution) (*admin.Execution, error) {
func FromExecutionModel(executionModel models.Execution, opts *ExecutionTransformerOptions) (*admin.Execution, error) {
var spec admin.ExecutionSpec
var err error
if err = proto.Unmarshal(executionModel.Spec, &spec); err != nil {
Expand All @@ -315,6 +326,13 @@ func FromExecutionModel(executionModel models.Execution) (*admin.Execution, erro
if err = proto.Unmarshal(executionModel.Closure, &closure); err != nil {
return nil, errors.NewFlyteAdminErrorf(codes.Internal, "failed to unmarshal closure")
}
if closure.GetError() != nil && opts != nil && opts.TrimErrorMessage && len(closure.GetError().Message) > 0 {
trimmedErrOutputResult := closure.GetError()
trimmedErrOutputResult.Message = trimmedErrOutputResult.Message[0:trimmedErrMessageLen]
closure.OutputResult = &admin.ExecutionClosure_Error{
Error: trimmedErrOutputResult,
}
}

if closure.StateChangeDetails == nil {
// Update execution state details from model for older executions
Expand Down Expand Up @@ -362,10 +380,10 @@ func PopulateDefaultStateChangeDetails(executionModel models.Execution) (*admin.
}, nil
}

func FromExecutionModels(executionModels []models.Execution) ([]*admin.Execution, error) {
func FromExecutionModels(executionModels []models.Execution, opts *ExecutionTransformerOptions) ([]*admin.Execution, error) {
executions := make([]*admin.Execution, len(executionModels))
for idx, executionModel := range executionModels {
execution, err := FromExecutionModel(executionModel)
execution, err := FromExecutionModel(executionModel, opts)
if err != nil {
return nil, err
}
Expand Down
38 changes: 34 additions & 4 deletions flyteadmin/pkg/repositories/transformers/execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ func TestFromExecutionModel(t *testing.T) {
StartedAt: &startedAt,
State: &stateInt,
}
execution, err := FromExecutionModel(executionModel)
execution, err := FromExecutionModel(executionModel, DefaultExecutionTransformerOptions)
assert.Nil(t, err)
assert.True(t, proto.Equal(&admin.Execution{
Id: &core.WorkflowExecutionIdentifier{
Expand Down Expand Up @@ -556,19 +556,49 @@ func TestFromExecutionModel_Aborted(t *testing.T) {
AbortCause: abortCause,
Closure: executionClosureBytes,
}
execution, err := FromExecutionModel(executionModel)
execution, err := FromExecutionModel(executionModel, DefaultExecutionTransformerOptions)
assert.Nil(t, err)
assert.Equal(t, core.WorkflowExecution_ABORTED, execution.Closure.Phase)
assert.True(t, proto.Equal(&admin.AbortMetadata{
Cause: abortCause,
}, execution.Closure.GetAbortMetadata()))

executionModel.Phase = core.WorkflowExecution_RUNNING.String()
execution, err = FromExecutionModel(executionModel)
execution, err = FromExecutionModel(executionModel, DefaultExecutionTransformerOptions)
assert.Nil(t, err)
assert.Empty(t, execution.Closure.GetAbortCause())
}

func TestFromExecutionModel_Error(t *testing.T) {
extraLongErrMsg := string(make([]byte, 2*trimmedErrMessageLen))
execErr := &core.ExecutionError{
Code: "CODE",
Message: extraLongErrMsg,
Kind: core.ExecutionError_USER,
}
executionClosureBytes, _ := proto.Marshal(&admin.ExecutionClosure{
Phase: core.WorkflowExecution_FAILED,
OutputResult: &admin.ExecutionClosure_Error{Error: execErr},
})
executionModel := models.Execution{
ExecutionKey: models.ExecutionKey{
Project: "project",
Domain: "domain",
Name: "name",
},
Phase: core.WorkflowExecution_FAILED.String(),
Closure: executionClosureBytes,
}
execution, err := FromExecutionModel(executionModel, &ExecutionTransformerOptions{
TrimErrorMessage: true,
})
expectedExecErr := execErr
expectedExecErr.Message = string(make([]byte, trimmedErrMessageLen))
assert.Nil(t, err)
assert.Equal(t, core.WorkflowExecution_FAILED, execution.Closure.Phase)
assert.True(t, proto.Equal(expectedExecErr, execution.Closure.GetError()))
}

func TestFromExecutionModels(t *testing.T) {
spec := testutils.GetExecutionRequest().Spec
specBytes, _ := proto.Marshal(spec)
Expand Down Expand Up @@ -611,7 +641,7 @@ func TestFromExecutionModels(t *testing.T) {
State: &stateInt,
},
}
executions, err := FromExecutionModels(executionModels)
executions, err := FromExecutionModels(executionModels, DefaultExecutionTransformerOptions)
assert.Nil(t, err)
assert.Len(t, executions, 1)
assert.True(t, proto.Equal(&admin.Execution{
Expand Down
9 changes: 8 additions & 1 deletion flyteadmin/pkg/repositories/transformers/node_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,12 +278,19 @@ func UpdateNodeExecutionModel(
return nil
}

func FromNodeExecutionModel(nodeExecutionModel models.NodeExecution) (*admin.NodeExecution, error) {
func FromNodeExecutionModel(nodeExecutionModel models.NodeExecution, opts *ExecutionTransformerOptions) (*admin.NodeExecution, error) {
var closure admin.NodeExecutionClosure
err := proto.Unmarshal(nodeExecutionModel.Closure, &closure)
if err != nil {
return nil, errors.NewFlyteAdminErrorf(codes.Internal, "failed to unmarshal closure")
}
if closure.GetError() != nil && opts != nil && opts.TrimErrorMessage && len(closure.GetError().Message) > 0 {
trimmedErrOutputResult := closure.GetError()
trimmedErrOutputResult.Message = trimmedErrOutputResult.Message[0:trimmedErrMessageLen]
closure.OutputResult = &admin.NodeExecutionClosure_Error{
Error: trimmedErrOutputResult,
}
}

var nodeExecutionMetadata admin.NodeExecutionMetaData
err = proto.Unmarshal(nodeExecutionModel.NodeExecutionMetadata, &nodeExecutionMetadata)
Expand Down
39 changes: 36 additions & 3 deletions flyteadmin/pkg/repositories/transformers/node_execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ func TestFromNodeExecutionModel(t *testing.T) {
NodeExecutionMetadata: nodeExecutionMetadataBytes,
InputURI: "input uri",
Duration: duration,
})
}, DefaultExecutionTransformerOptions)
assert.Nil(t, err)
assert.True(t, proto.Equal(&admin.NodeExecution{
Id: &nodeExecutionIdentifier,
Expand All @@ -500,6 +500,39 @@ func TestFromNodeExecutionModel(t *testing.T) {
}, nodeExecution))
}

func TestFromNodeExecutionModel_Error(t *testing.T) {
extraLongErrMsg := string(make([]byte, 2*trimmedErrMessageLen))
execErr := &core.ExecutionError{
Code: "CODE",
Message: extraLongErrMsg,
Kind: core.ExecutionError_USER,
}
executionClosureBytes, _ := proto.Marshal(&admin.ExecutionClosure{
Phase: core.WorkflowExecution_FAILED,
OutputResult: &admin.ExecutionClosure_Error{Error: execErr},
})
nodeExecution, err := FromNodeExecutionModel(models.NodeExecution{
NodeExecutionKey: models.NodeExecutionKey{
NodeID: "nodey",
ExecutionKey: models.ExecutionKey{
Project: "project",
Domain: "domain",
Name: "name",
},
},
Closure: executionClosureBytes,
NodeExecutionMetadata: nodeExecutionMetadataBytes,
InputURI: "input uri",
Duration: duration,
}, &ExecutionTransformerOptions{TrimErrorMessage: true})
assert.Nil(t, err)

expectedExecErr := execErr
expectedExecErr.Message = string(make([]byte, trimmedErrMessageLen))
assert.Nil(t, err)
assert.True(t, proto.Equal(expectedExecErr, nodeExecution.Closure.GetError()))
}

func TestFromNodeExecutionModelWithChildren(t *testing.T) {
nodeExecutionIdentifier := core.NodeExecutionIdentifier{
NodeId: "nodey",
Expand Down Expand Up @@ -536,7 +569,7 @@ func TestFromNodeExecutionModelWithChildren(t *testing.T) {
}
t.Run("dynamic workflow", func(t *testing.T) {
nodeExecModel.DynamicWorkflowRemoteClosureReference = "dummy_dynamic_worklfow_ref"
nodeExecution, err := FromNodeExecutionModel(nodeExecModel)
nodeExecution, err := FromNodeExecutionModel(nodeExecModel, DefaultExecutionTransformerOptions)
assert.Nil(t, err)
assert.True(t, proto.Equal(&admin.NodeExecution{
Id: &nodeExecutionIdentifier,
Expand All @@ -552,7 +585,7 @@ func TestFromNodeExecutionModelWithChildren(t *testing.T) {
})
t.Run("non dynamic workflow", func(t *testing.T) {
nodeExecModel.DynamicWorkflowRemoteClosureReference = ""
nodeExecution, err := FromNodeExecutionModel(nodeExecModel)
nodeExecution, err := FromNodeExecutionModel(nodeExecModel, DefaultExecutionTransformerOptions)
assert.Nil(t, err)
assert.True(t, proto.Equal(&admin.NodeExecution{
Id: &nodeExecutionIdentifier,
Expand Down
Loading

0 comments on commit 96a660e

Please sign in to comment.