Skip to content

Commit

Permalink
Move endpoint to node_executions
Browse files Browse the repository at this point in the history
Signed-off-by: Iaroslav Ciupin <[email protected]>
  • Loading branch information
iaroslav-ciupin committed Jan 23, 2024
1 parent 1bc6882 commit 39506c3
Show file tree
Hide file tree
Showing 32 changed files with 2,415 additions and 1,946 deletions.
29 changes: 29 additions & 0 deletions flyteadmin/pkg/manager/impl/node_execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,35 @@ func (m *NodeExecutionManager) CreateNodeEvent(ctx context.Context, request admi
return &admin.NodeExecutionEventResponse{}, nil
}

func (m *NodeExecutionManager) GetDynamicNodeWorkflow(ctx context.Context, request admin.GetDynamicNodeWorkflowRequest) (*admin.DynamicNodeWorkflowResponse, error) {
if err := validation.ValidateNodeExecutionIdentifier(request.Id); err != nil {
logger.Debugf(ctx, "can't get node execution data with invalid identifier [%+v]: %v", request.Id, err)
}

ctx = getNodeExecutionContext(ctx, request.Id)
nodeExecutionModel, err := util.GetNodeExecutionModel(ctx, m.db, request.Id)
if err != nil {
logger.Errorf(ctx, "failed to get node execution with id [%+v] with err %v",
request.Id, err)
return nil, err
}

if nodeExecutionModel.DynamicWorkflowRemoteClosureReference == "" {
return &admin.DynamicNodeWorkflowResponse{}, errors.NewFlyteAdminErrorf(codes.NotFound, "node does not contain dynamic workflow")
}

closure := &core.CompiledWorkflowClosure{}
location := nodeExecutionModel.DynamicWorkflowRemoteClosureReference
err = m.storageClient.ReadProtobuf(ctx, storage.DataReference(location), closure)
if err != nil {
logger.Errorf(ctx, "unable to read workflow_closure from location %s: %v", location, err)
return nil, errors.NewFlyteAdminErrorf(codes.Internal,
"unable to read workflow_closure from location %s : %v", location, err)
}

return &admin.DynamicNodeWorkflowResponse{CompiledWorkflow: closure}, nil
}

// Handles making additional database calls, if necessary, to populate IsParent & IsDynamic data using the historical pattern of
// preloading child node executions. Otherwise, simply calls transform on the input model.
func (m *NodeExecutionManager) transformNodeExecutionModel(ctx context.Context, nodeExecutionModel models.NodeExecution,
Expand Down
148 changes: 148 additions & 0 deletions flyteadmin/pkg/manager/impl/node_execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import (
"testing"
"time"

"github.com/stretchr/testify/require"
"google.golang.org/grpc/status"

"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -1319,3 +1322,148 @@ func TestGetNodeExecutionData(t *testing.T) {
},
}, dataResponse))
}

func Test_GetDynamicNodeWorkflow_Success(t *testing.T) {
repo := repositoryMocks.NewMockRepository()
nodeExecID := core.NodeExecutionIdentifier{
ExecutionId: &core.WorkflowExecutionIdentifier{
Project: project,
Domain: domain,
Name: name,
},
}
repo.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).
SetGetCallback(func(ctx context.Context, input interfaces.NodeExecutionResource) (models.NodeExecution, error) {
assert.Equal(t, nodeExecID, input.NodeExecutionIdentifier)
return models.NodeExecution{DynamicWorkflowRemoteClosureReference: remoteClosureIdentifier}, nil
})
mockStorageClient := commonMocks.GetMockStorageClient()
expectedClosure := testutils.GetWorkflowClosure().CompiledWorkflow
mockStorageClient.ComposedProtobufStore.(*commonMocks.TestDataStore).ReadProtobufCb = func(ctx context.Context, reference storage.DataReference, msg proto.Message) error {
assert.Equal(t, remoteClosureIdentifier, reference.String())
bytes, err := proto.Marshal(expectedClosure)
require.NoError(t, err)
return proto.Unmarshal(bytes, msg)
}
ctx := context.TODO()
nodeExecManager := NewNodeExecutionManager(repo,
getMockExecutionsConfigProvider(),
storagePrefix,
mockStorageClient,
mockScope.NewTestScope(),
mockNodeExecutionRemoteURL,
nil, nil,
&eventWriterMocks.NodeExecutionEventWriter{})
expected := &admin.DynamicNodeWorkflowResponse{
CompiledWorkflow: expectedClosure,
}

resp, err := nodeExecManager.GetDynamicNodeWorkflow(ctx, admin.GetDynamicNodeWorkflowRequest{Id: &nodeExecID})

assert.NoError(t, err)
assert.True(t, proto.Equal(expected, resp))
}

func Test_GetDynamicNodeWorkflow_DBError(t *testing.T) {
repo := repositoryMocks.NewMockRepository()
nodeExecID := core.NodeExecutionIdentifier{
ExecutionId: &core.WorkflowExecutionIdentifier{
Project: project,
Domain: domain,
Name: name,
},
}
expectedErr := errors.New("failure")
repo.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).
SetGetCallback(func(ctx context.Context, input interfaces.NodeExecutionResource) (models.NodeExecution, error) {
assert.Equal(t, nodeExecID, input.NodeExecutionIdentifier)
return models.NodeExecution{}, expectedErr
})
mockStorageClient := commonMocks.GetMockStorageClient()
ctx := context.TODO()
nodeExecManager := NewNodeExecutionManager(repo,
getMockExecutionsConfigProvider(),
storagePrefix,
mockStorageClient,
mockScope.NewTestScope(),
mockNodeExecutionRemoteURL,
nil, nil,
&eventWriterMocks.NodeExecutionEventWriter{})

resp, err := nodeExecManager.GetDynamicNodeWorkflow(ctx, admin.GetDynamicNodeWorkflowRequest{Id: &nodeExecID})

assert.Equal(t, expectedErr, err)
assert.Empty(t, resp)
}

func Test_GetDynamicNodeWorkflow_NoRemoteReference(t *testing.T) {
repo := repositoryMocks.NewMockRepository()
nodeExecID := core.NodeExecutionIdentifier{
ExecutionId: &core.WorkflowExecutionIdentifier{
Project: project,
Domain: domain,
Name: name,
},
}
repo.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).
SetGetCallback(func(ctx context.Context, input interfaces.NodeExecutionResource) (models.NodeExecution, error) {
assert.Equal(t, nodeExecID, input.NodeExecutionIdentifier)
return models.NodeExecution{DynamicWorkflowRemoteClosureReference: ""}, nil
})
mockStorageClient := commonMocks.GetMockStorageClient()
ctx := context.TODO()
nodeExecManager := NewNodeExecutionManager(repo,
getMockExecutionsConfigProvider(),
storagePrefix,
mockStorageClient,
mockScope.NewTestScope(),
mockNodeExecutionRemoteURL,
nil, nil,
&eventWriterMocks.NodeExecutionEventWriter{})

resp, err := nodeExecManager.GetDynamicNodeWorkflow(ctx, admin.GetDynamicNodeWorkflowRequest{Id: &nodeExecID})

st, ok := status.FromError(err)
assert.True(t, ok)
assert.Equal(t, codes.NotFound, st.Code())
assert.Equal(t, "node does not contain dynamic workflow", st.Message())
assert.Empty(t, resp)
}

func Test_GetDynamicNodeWorkflow_StorageError(t *testing.T) {
repo := repositoryMocks.NewMockRepository()
nodeExecID := core.NodeExecutionIdentifier{
ExecutionId: &core.WorkflowExecutionIdentifier{
Project: project,
Domain: domain,
Name: name,
},
}
repo.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).
SetGetCallback(func(ctx context.Context, input interfaces.NodeExecutionResource) (models.NodeExecution, error) {
assert.Equal(t, nodeExecID, input.NodeExecutionIdentifier)
return models.NodeExecution{DynamicWorkflowRemoteClosureReference: remoteClosureIdentifier}, nil
})
mockStorageClient := commonMocks.GetMockStorageClient()
mockStorageClient.ComposedProtobufStore.(*commonMocks.TestDataStore).ReadProtobufCb = func(ctx context.Context, reference storage.DataReference, msg proto.Message) error {
assert.Equal(t, remoteClosureIdentifier, reference.String())
return errors.New("failure")
}
ctx := context.TODO()
nodeExecManager := NewNodeExecutionManager(repo,
getMockExecutionsConfigProvider(),
storagePrefix,
mockStorageClient,
mockScope.NewTestScope(),
mockNodeExecutionRemoteURL,
nil, nil,
&eventWriterMocks.NodeExecutionEventWriter{})

resp, err := nodeExecManager.GetDynamicNodeWorkflow(ctx, admin.GetDynamicNodeWorkflowRequest{Id: &nodeExecID})

st, ok := status.FromError(err)
assert.True(t, ok)
assert.Equal(t, codes.Internal, st.Code())
assert.Equal(t, "unable to read workflow_closure from location s3://flyte/metadata/admin/remote closure id : failure", st.Message())
assert.Empty(t, resp)
}
29 changes: 0 additions & 29 deletions flyteadmin/pkg/manager/impl/workflow_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,35 +361,6 @@ func (w *WorkflowManager) ListWorkflowIdentifiers(ctx context.Context, request a

}

func (w *WorkflowManager) GetDynamicNodeWorkflow(ctx context.Context, request admin.GetDynamicNodeWorkflowRequest) (*admin.DynamicNodeWorkflowResponse, error) {
if err := validation.ValidateNodeExecutionIdentifier(request.Id); err != nil {
logger.Debugf(ctx, "can't get node execution data with invalid identifier [%+v]: %v", request.Id, err)
}

ctx = getNodeExecutionContext(ctx, request.Id)
nodeExecutionModel, err := util.GetNodeExecutionModel(ctx, w.db, request.Id)
if err != nil {
logger.Errorf(ctx, "failed to get node execution with id [%+v] with err %v",
request.Id, err)
return nil, err
}

if nodeExecutionModel.DynamicWorkflowRemoteClosureReference == "" {
return &admin.DynamicNodeWorkflowResponse{}, errors.NewFlyteAdminErrorf(codes.NotFound, "node does not contain dynamic workflow")
}

closure := &core.CompiledWorkflowClosure{}
location := nodeExecutionModel.DynamicWorkflowRemoteClosureReference
err = w.storageClient.ReadProtobuf(ctx, storage.DataReference(location), closure)
if err != nil {
logger.Errorf(ctx, "unable to read workflow_closure from location %s: %v", location, err)
return nil, errors.NewFlyteAdminErrorf(codes.Internal,
"unable to read workflow_closure from location %s : %v", location, err)
}

return &admin.DynamicNodeWorkflowResponse{CompiledWorkflow: closure}, nil
}

func NewWorkflowManager(
db repoInterfaces.Repository,
config runtimeInterfaces.Configuration,
Expand Down
126 changes: 0 additions & 126 deletions flyteadmin/pkg/manager/impl/workflow_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

Expand Down Expand Up @@ -371,131 +370,6 @@ func TestGetWorkflow_TransformerError(t *testing.T) {
assert.Equal(t, codes.Internal, err.(adminErrors.FlyteAdminError).Code())
}

func Test_GetDynamicNodeWorkflow_Success(t *testing.T) {
repo := repositoryMocks.NewMockRepository()
nodeExecID := core.NodeExecutionIdentifier{
ExecutionId: &core.WorkflowExecutionIdentifier{
Project: project,
Domain: domain,
Name: name,
},
}
repo.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).
SetGetCallback(func(ctx context.Context, input interfaces.NodeExecutionResource) (models.NodeExecution, error) {
assert.Equal(t, nodeExecID, input.NodeExecutionIdentifier)
return models.NodeExecution{DynamicWorkflowRemoteClosureReference: remoteClosureIdentifier}, nil
})
mockStorageClient := commonMocks.GetMockStorageClient()
expectedClosure := testutils.GetWorkflowClosure().CompiledWorkflow
mockStorageClient.ComposedProtobufStore.(*commonMocks.TestDataStore).ReadProtobufCb = func(ctx context.Context, reference storage.DataReference, msg proto.Message) error {
assert.Equal(t, remoteClosureIdentifier, reference.String())
bytes, err := proto.Marshal(expectedClosure)
require.NoError(t, err)
return proto.Unmarshal(bytes, msg)
}
ctx := context.TODO()
workflowManager := NewWorkflowManager(repo, getMockWorkflowConfigProvider(), getMockWorkflowCompiler(),
mockStorageClient, storagePrefix, mockScope.NewTestScope(),
artifacts.NewArtifactRegistry(ctx, nil))
expected := &admin.DynamicNodeWorkflowResponse{
CompiledWorkflow: expectedClosure,
}

resp, err := workflowManager.GetDynamicNodeWorkflow(ctx, admin.GetDynamicNodeWorkflowRequest{Id: &nodeExecID})

assert.NoError(t, err)
assert.True(t, proto.Equal(expected, resp))
}

func Test_GetDynamicNodeWorkflow_DBError(t *testing.T) {
repo := repositoryMocks.NewMockRepository()
nodeExecID := core.NodeExecutionIdentifier{
ExecutionId: &core.WorkflowExecutionIdentifier{
Project: project,
Domain: domain,
Name: name,
},
}
expectedErr := errors.New("failure")
repo.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).
SetGetCallback(func(ctx context.Context, input interfaces.NodeExecutionResource) (models.NodeExecution, error) {
assert.Equal(t, nodeExecID, input.NodeExecutionIdentifier)
return models.NodeExecution{}, expectedErr
})
mockStorageClient := commonMocks.GetMockStorageClient()
ctx := context.TODO()
workflowManager := NewWorkflowManager(repo, getMockWorkflowConfigProvider(), getMockWorkflowCompiler(),
mockStorageClient, storagePrefix, mockScope.NewTestScope(),
artifacts.NewArtifactRegistry(ctx, nil))

resp, err := workflowManager.GetDynamicNodeWorkflow(ctx, admin.GetDynamicNodeWorkflowRequest{Id: &nodeExecID})

assert.Equal(t, expectedErr, err)
assert.Empty(t, resp)
}

func Test_GetDynamicNodeWorkflow_NoRemoteReference(t *testing.T) {
repo := repositoryMocks.NewMockRepository()
nodeExecID := core.NodeExecutionIdentifier{
ExecutionId: &core.WorkflowExecutionIdentifier{
Project: project,
Domain: domain,
Name: name,
},
}
repo.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).
SetGetCallback(func(ctx context.Context, input interfaces.NodeExecutionResource) (models.NodeExecution, error) {
assert.Equal(t, nodeExecID, input.NodeExecutionIdentifier)
return models.NodeExecution{DynamicWorkflowRemoteClosureReference: ""}, nil
})
mockStorageClient := commonMocks.GetMockStorageClient()
ctx := context.TODO()
workflowManager := NewWorkflowManager(repo, getMockWorkflowConfigProvider(), getMockWorkflowCompiler(),
mockStorageClient, storagePrefix, mockScope.NewTestScope(),
artifacts.NewArtifactRegistry(ctx, nil))

resp, err := workflowManager.GetDynamicNodeWorkflow(ctx, admin.GetDynamicNodeWorkflowRequest{Id: &nodeExecID})

st, ok := status.FromError(err)
assert.True(t, ok)
assert.Equal(t, codes.NotFound, st.Code())
assert.Equal(t, "node does not contain dynamic workflow", st.Message())
assert.Empty(t, resp)
}

func Test_GetDynamicNodeWorkflow_StorageError(t *testing.T) {
repo := repositoryMocks.NewMockRepository()
nodeExecID := core.NodeExecutionIdentifier{
ExecutionId: &core.WorkflowExecutionIdentifier{
Project: project,
Domain: domain,
Name: name,
},
}
repo.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).
SetGetCallback(func(ctx context.Context, input interfaces.NodeExecutionResource) (models.NodeExecution, error) {
assert.Equal(t, nodeExecID, input.NodeExecutionIdentifier)
return models.NodeExecution{DynamicWorkflowRemoteClosureReference: remoteClosureIdentifier}, nil
})
mockStorageClient := commonMocks.GetMockStorageClient()
mockStorageClient.ComposedProtobufStore.(*commonMocks.TestDataStore).ReadProtobufCb = func(ctx context.Context, reference storage.DataReference, msg proto.Message) error {
assert.Equal(t, remoteClosureIdentifier, reference.String())
return errors.New("failure")
}
ctx := context.TODO()
workflowManager := NewWorkflowManager(repo, getMockWorkflowConfigProvider(), getMockWorkflowCompiler(),
mockStorageClient, storagePrefix, mockScope.NewTestScope(),
artifacts.NewArtifactRegistry(ctx, nil))

resp, err := workflowManager.GetDynamicNodeWorkflow(ctx, admin.GetDynamicNodeWorkflowRequest{Id: &nodeExecID})

st, ok := status.FromError(err)
assert.True(t, ok)
assert.Equal(t, codes.Internal, st.Code())
assert.Equal(t, "unable to read workflow_closure from location s3://flyte/metadata/admin/remote closure id : failure", st.Message())
assert.Empty(t, resp)
}

func TestListWorkflows(t *testing.T) {
repository := repositoryMocks.NewMockRepository()
workflowListFunc := func(input interfaces.ListResourceInput) (interfaces.WorkflowCollectionOutput, error) {
Expand Down
1 change: 1 addition & 0 deletions flyteadmin/pkg/manager/interfaces/node_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ type NodeExecutionInterface interface {
ListNodeExecutionsForTask(ctx context.Context, request admin.NodeExecutionForTaskListRequest) (*admin.NodeExecutionList, error)
GetNodeExecutionData(
ctx context.Context, request admin.NodeExecutionGetDataRequest) (*admin.NodeExecutionGetDataResponse, error)
GetDynamicNodeWorkflow(ctx context.Context, request admin.GetDynamicNodeWorkflowRequest) (*admin.DynamicNodeWorkflowResponse, error)
}
1 change: 0 additions & 1 deletion flyteadmin/pkg/manager/interfaces/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,4 @@ type WorkflowInterface interface {
ListWorkflows(ctx context.Context, request admin.ResourceListRequest) (*admin.WorkflowList, error)
ListWorkflowIdentifiers(ctx context.Context, request admin.NamedEntityIdentifierListRequest) (
*admin.NamedEntityIdentifierList, error)
GetDynamicNodeWorkflow(ctx context.Context, request admin.GetDynamicNodeWorkflowRequest) (*admin.DynamicNodeWorkflowResponse, error)
}
4 changes: 4 additions & 0 deletions flyteadmin/pkg/manager/mocks/node_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,7 @@ func (m *MockNodeExecutionManager) GetNodeExecutionData(
}
return nil, nil
}

func (m *MockNodeExecutionManager) GetDynamicNodeWorkflow(ctx context.Context, request admin.GetDynamicNodeWorkflowRequest) (*admin.DynamicNodeWorkflowResponse, error) {
return nil, nil
}
4 changes: 0 additions & 4 deletions flyteadmin/pkg/manager/mocks/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,3 @@ func (r *MockWorkflowManager) ListWorkflowIdentifiers(ctx context.Context, reque
*admin.NamedEntityIdentifierList, error) {
return nil, nil
}

func (r *MockWorkflowManager) GetDynamicNodeWorkflow(ctx context.Context, request admin.GetDynamicNodeWorkflowRequest) (*admin.DynamicNodeWorkflowResponse, error) {
return nil, nil
}
Loading

0 comments on commit 39506c3

Please sign in to comment.