Skip to content

Commit

Permalink
return 404 when remote reference is empty
Browse files Browse the repository at this point in the history
Signed-off-by: Iaroslav Ciupin <[email protected]>
  • Loading branch information
iaroslav-ciupin committed Jan 23, 2024
1 parent d1a05b2 commit 1bc6882
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 8 deletions.
10 changes: 6 additions & 4 deletions flyteadmin/pkg/manager/impl/workflow_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,20 +369,22 @@ func (w *WorkflowManager) GetDynamicNodeWorkflow(ctx context.Context, request ad
ctx = getNodeExecutionContext(ctx, request.Id)
nodeExecutionModel, err := util.GetNodeExecutionModel(ctx, w.db, request.Id)
if err != nil {
logger.Debugf(ctx, "Failed to get node execution with id [%+v] with err %v",
logger.Errorf(ctx, "failed to get node execution with id [%+v] with err %v",
request.Id, err)
return nil, err
}

if nodeExecutionModel.DynamicWorkflowRemoteClosureReference == "" {
return &admin.DynamicNodeWorkflowResponse{}, nil
return &admin.DynamicNodeWorkflowResponse{}, errors.NewFlyteAdminErrorf(codes.NotFound, "node does not contain dynamic workflow")
}

closure := &core.CompiledWorkflowClosure{}
err = w.storageClient.ReadProtobuf(ctx, storage.DataReference(nodeExecutionModel.DynamicWorkflowRemoteClosureReference), closure)
location := nodeExecutionModel.DynamicWorkflowRemoteClosureReference
err = w.storageClient.ReadProtobuf(ctx, storage.DataReference(location), closure)
if err != nil {
logger.Errorf(ctx, "unable to read workflow_closure from location %s: %v", location, err)
return nil, errors.NewFlyteAdminErrorf(codes.Internal,
"Unable to read WorkflowClosure from location %s : %v", nodeExecutionModel.DynamicWorkflowRemoteClosureReference, err)
"unable to read workflow_closure from location %s : %v", location, err)
}

return &admin.DynamicNodeWorkflowResponse{CompiledWorkflow: closure}, nil
Expand Down
10 changes: 6 additions & 4 deletions flyteadmin/pkg/manager/impl/workflow_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,12 +453,14 @@ func Test_GetDynamicNodeWorkflow_NoRemoteReference(t *testing.T) {
workflowManager := NewWorkflowManager(repo, getMockWorkflowConfigProvider(), getMockWorkflowCompiler(),
mockStorageClient, storagePrefix, mockScope.NewTestScope(),
artifacts.NewArtifactRegistry(ctx, nil))
expected := &admin.DynamicNodeWorkflowResponse{CompiledWorkflow: nil}

resp, err := workflowManager.GetDynamicNodeWorkflow(ctx, admin.GetDynamicNodeWorkflowRequest{Id: &nodeExecID})

assert.NoError(t, err)
assert.True(t, proto.Equal(expected, resp))
st, ok := status.FromError(err)
assert.True(t, ok)
assert.Equal(t, codes.NotFound, st.Code())
assert.Equal(t, "node does not contain dynamic workflow", st.Message())
assert.Empty(t, resp)
}

func Test_GetDynamicNodeWorkflow_StorageError(t *testing.T) {
Expand Down Expand Up @@ -490,7 +492,7 @@ func Test_GetDynamicNodeWorkflow_StorageError(t *testing.T) {
st, ok := status.FromError(err)
assert.True(t, ok)
assert.Equal(t, codes.Internal, st.Code())
assert.Equal(t, "Unable to read WorkflowClosure from location s3://flyte/metadata/admin/remote closure id : failure", st.Message())
assert.Equal(t, "unable to read workflow_closure from location s3://flyte/metadata/admin/remote closure id : failure", st.Message())
assert.Empty(t, resp)
}

Expand Down

0 comments on commit 1bc6882

Please sign in to comment.