Skip to content

Commit

Permalink
refactoring
Browse files Browse the repository at this point in the history
Signed-off-by: Iaroslav Ciupin <[email protected]>
  • Loading branch information
iaroslav-ciupin committed Jan 24, 2024
1 parent 38f5b9e commit c3f0685
Show file tree
Hide file tree
Showing 20 changed files with 2,843 additions and 2,842 deletions.
43 changes: 23 additions & 20 deletions flyteadmin/pkg/manager/impl/node_execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,13 +314,9 @@ func (m *NodeExecutionManager) GetDynamicNodeWorkflow(ctx context.Context, reque
return &admin.DynamicNodeWorkflowResponse{}, errors.NewFlyteAdminErrorf(codes.NotFound, "node does not contain dynamic workflow")
}

closure := &core.CompiledWorkflowClosure{}
location := nodeExecutionModel.DynamicWorkflowRemoteClosureReference
err = m.storageClient.ReadProtobuf(ctx, storage.DataReference(location), closure)
closure, err := m.fetchDynamicWorkflowClosure(ctx, nodeExecutionModel.DynamicWorkflowRemoteClosureReference)
if err != nil {
logger.Errorf(ctx, "unable to read workflow_closure from location %s: %v", location, err)
return nil, errors.NewFlyteAdminErrorf(codes.Internal,
"unable to read workflow_closure from location %s : %v", location, err)
return nil, err
}

return &admin.DynamicNodeWorkflowResponse{CompiledWorkflow: closure}, nil
Expand Down Expand Up @@ -545,23 +541,15 @@ func (m *NodeExecutionManager) GetNodeExecutionData(
}

if len(nodeExecutionModel.DynamicWorkflowRemoteClosureReference) > 0 {
closure := &core.CompiledWorkflowClosure{}
err := m.storageClient.ReadProtobuf(ctx, storage.DataReference(nodeExecutionModel.DynamicWorkflowRemoteClosureReference), closure)
closure, err := m.fetchDynamicWorkflowClosure(ctx, nodeExecutionModel.DynamicWorkflowRemoteClosureReference)
if err != nil {
return nil, errors.NewFlyteAdminErrorf(codes.Internal,
"Unable to read WorkflowClosure from location %s : %v", nodeExecutionModel.DynamicWorkflowRemoteClosureReference, err)
return nil, err

Check warning on line 546 in flyteadmin/pkg/manager/impl/node_execution_manager.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/manager/impl/node_execution_manager.go#L546

Added line #L546 was not covered by tests
}

if wf := closure.Primary; wf == nil {
return nil, errors.NewFlyteAdminErrorf(codes.Internal, "Empty primary workflow definition in loaded dynamic workflow model.")
} else if template := wf.Template; template == nil {
return nil, errors.NewFlyteAdminErrorf(codes.Internal, "Empty primary workflow template in loaded dynamic workflow model.")
} else {
response.DynamicWorkflow = &admin.DynamicWorkflowNodeMetadata{
Id: closure.Primary.Template.Id,
CompiledWorkflow: closure,
DynamicJobSpecUri: nodeExecution.Closure.DynamicJobSpecUri,
}
response.DynamicWorkflow = &admin.DynamicWorkflowNodeMetadata{
Id: closure.Primary.Template.Id,
CompiledWorkflow: closure,
DynamicJobSpecUri: nodeExecution.Closure.DynamicJobSpecUri,
}
}

Expand All @@ -575,6 +563,21 @@ func (m *NodeExecutionManager) GetNodeExecutionData(
return response, nil
}

func (m *NodeExecutionManager) fetchDynamicWorkflowClosure(ctx context.Context, ref string) (*core.CompiledWorkflowClosure, error) {
closure := &core.CompiledWorkflowClosure{}
err := m.storageClient.ReadProtobuf(ctx, storage.DataReference(ref), closure)
if err != nil {
return nil, errors.NewFlyteAdminErrorf(codes.Internal, "Unable to read WorkflowClosure from location %s : %v", ref, err)
}

if wf := closure.Primary; wf == nil {
return nil, errors.NewFlyteAdminErrorf(codes.Internal, "Empty primary workflow definition in loaded dynamic workflow model.")

Check warning on line 574 in flyteadmin/pkg/manager/impl/node_execution_manager.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/manager/impl/node_execution_manager.go#L574

Added line #L574 was not covered by tests
} else if template := wf.Template; template == nil {
return nil, errors.NewFlyteAdminErrorf(codes.Internal, "Empty primary workflow template in loaded dynamic workflow model.")
}

Check warning on line 577 in flyteadmin/pkg/manager/impl/node_execution_manager.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/manager/impl/node_execution_manager.go#L576-L577

Added lines #L576 - L577 were not covered by tests
return closure, nil
}

func NewNodeExecutionManager(db repoInterfaces.Repository, config runtimeInterfaces.Configuration,
storagePrefix []string, storageClient *storage.DataStore, scope promutils.Scope, urlData dataInterfaces.RemoteURLInterface,
eventPublisher notificationInterfaces.Publisher, cloudEventPublisher cloudeventInterfaces.Publisher,
Expand Down
2 changes: 1 addition & 1 deletion flyteadmin/pkg/manager/impl/node_execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1463,6 +1463,6 @@ func Test_GetDynamicNodeWorkflow_StorageError(t *testing.T) {
st, ok := status.FromError(err)
assert.True(t, ok)
assert.Equal(t, codes.Internal, st.Code())
assert.Equal(t, "unable to read workflow_closure from location s3://flyte/metadata/admin/remote closure id : failure", st.Message())
assert.Equal(t, "Unable to read WorkflowClosure from location s3://flyte/metadata/admin/remote closure id : failure", st.Message())
assert.Empty(t, resp)
}
664 changes: 658 additions & 6 deletions flyteidl/gen/pb-cpp/flyteidl/admin/node_execution.pb.cc

Large diffs are not rendered by default.

Loading

0 comments on commit c3f0685

Please sign in to comment.