Skip to content
Permalink

Comparing changes

This is a direct comparison between two commits made in this repository or its related repositories. View the default comparison for this range or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: flyteorg/flyte
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 52f6a76185bc1cf711d135ee24b6e791e76348fd
Choose a base ref
..
head repository: flyteorg/flyte
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: e45bd5ad53221783dcd453aef9bfef55ae7f0d0e
Choose a head ref
64 changes: 64 additions & 0 deletions flyteadmin/pkg/manager/impl/workflow_manager_test.go
Original file line number Diff line number Diff line change
@@ -8,6 +8,7 @@ import (

"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

@@ -370,6 +371,69 @@ func TestGetWorkflow_TransformerError(t *testing.T) {
assert.Equal(t, codes.Internal, err.(adminErrors.FlyteAdminError).Code())
}

func Test_GetDynamicNodeWorkflow_Success(t *testing.T) {
repo := repositoryMocks.NewMockRepository()
nodeExecID := core.NodeExecutionIdentifier{
ExecutionId: &core.WorkflowExecutionIdentifier{
Project: project,
Domain: domain,
Name: name,
},
}
repo.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).
SetGetCallback(func(ctx context.Context, input interfaces.NodeExecutionResource) (models.NodeExecution, error) {
assert.Equal(t, nodeExecID, input.NodeExecutionIdentifier)
return models.NodeExecution{DynamicWorkflowRemoteClosureReference: remoteClosureIdentifier}, nil
})
mockStorageClient := commonMocks.GetMockStorageClient()
expectedClosure := testutils.GetWorkflowClosure().CompiledWorkflow
mockStorageClient.ComposedProtobufStore.(*commonMocks.TestDataStore).ReadProtobufCb = func(ctx context.Context, reference storage.DataReference, msg proto.Message) error {
assert.Equal(t, remoteClosureIdentifier, reference.String())
bytes, err := proto.Marshal(expectedClosure)
require.NoError(t, err)
return proto.Unmarshal(bytes, msg)
}
ctx := context.TODO()
workflowManager := NewWorkflowManager(repo, getMockWorkflowConfigProvider(), getMockWorkflowCompiler(),
mockStorageClient, storagePrefix, mockScope.NewTestScope(),
artifacts.NewArtifactRegistry(ctx, nil))
expected := &admin.DynamicNodeWorkflowResponse{
CompiledWorkflow: expectedClosure,
}

resp, err := workflowManager.GetDynamicNodeWorkflow(ctx, admin.GetDynamicNodeWorkflowRequest{Id: &nodeExecID})

assert.NoError(t, err)
assert.True(t, proto.Equal(expected, resp))
}

func Test_GetDynamicNodeWorkflow_NoRemoteReference(t *testing.T) {
repo := repositoryMocks.NewMockRepository()
nodeExecID := core.NodeExecutionIdentifier{
ExecutionId: &core.WorkflowExecutionIdentifier{
Project: project,
Domain: domain,
Name: name,
},
}
repo.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).
SetGetCallback(func(ctx context.Context, input interfaces.NodeExecutionResource) (models.NodeExecution, error) {
assert.Equal(t, nodeExecID, input.NodeExecutionIdentifier)
return models.NodeExecution{DynamicWorkflowRemoteClosureReference: ""}, nil
})
mockStorageClient := commonMocks.GetMockStorageClient()
ctx := context.TODO()
workflowManager := NewWorkflowManager(repo, getMockWorkflowConfigProvider(), getMockWorkflowCompiler(),
mockStorageClient, storagePrefix, mockScope.NewTestScope(),
artifacts.NewArtifactRegistry(ctx, nil))
expected := &admin.DynamicNodeWorkflowResponse{CompiledWorkflow: nil}

resp, err := workflowManager.GetDynamicNodeWorkflow(ctx, admin.GetDynamicNodeWorkflowRequest{Id: &nodeExecID})

assert.NoError(t, err)
assert.True(t, proto.Equal(expected, resp))
}

func TestListWorkflows(t *testing.T) {
repository := repositoryMocks.NewMockRepository()
workflowListFunc := func(input interfaces.ListResourceInput) (interfaces.WorkflowCollectionOutput, error) {
4 changes: 4 additions & 0 deletions flyteadmin/pkg/manager/mocks/workflow.go
Original file line number Diff line number Diff line change
@@ -48,3 +48,7 @@ func (r *MockWorkflowManager) ListWorkflowIdentifiers(ctx context.Context, reque
*admin.NamedEntityIdentifierList, error) {
return nil, nil
}

func (r *MockWorkflowManager) GetDynamicNodeWorkflow(ctx context.Context, request admin.GetDynamicNodeWorkflowRequest) (*admin.DynamicNodeWorkflowResponse, error) {
return nil, nil
}
20 changes: 11 additions & 9 deletions flyteadmin/pkg/rpc/adminservice/metrics.go
Original file line number Diff line number Diff line change
@@ -92,10 +92,11 @@ type taskExecutionEndpointMetrics struct {
type workflowEndpointMetrics struct {
scope promutils.Scope

create util.RequestMetrics
get util.RequestMetrics
list util.RequestMetrics
listIds util.RequestMetrics
create util.RequestMetrics
get util.RequestMetrics
list util.RequestMetrics
listIds util.RequestMetrics
getDynamicNodeWorkflow util.RequestMetrics
}

type descriptionEntityEndpointMetrics struct {
@@ -212,11 +213,12 @@ func InitMetrics(adminScope promutils.Scope) AdminMetrics {
list: util.NewRequestMetrics(adminScope, "list_task_execution"),
},
workflowEndpointMetrics: workflowEndpointMetrics{
scope: adminScope,
create: util.NewRequestMetrics(adminScope, "create_workflow"),
get: util.NewRequestMetrics(adminScope, "get_workflow"),
list: util.NewRequestMetrics(adminScope, "list_workflow"),
listIds: util.NewRequestMetrics(adminScope, "list_workflow_ids"),
scope: adminScope,
create: util.NewRequestMetrics(adminScope, "create_workflow"),
get: util.NewRequestMetrics(adminScope, "get_workflow"),
list: util.NewRequestMetrics(adminScope, "list_workflow"),
listIds: util.NewRequestMetrics(adminScope, "list_workflow_ids"),
getDynamicNodeWorkflow: util.NewRequestMetrics(adminScope, "get_dynamic_node_workflow"),
},

descriptionEntityMetrics: descriptionEntityEndpointMetrics{
5 changes: 2 additions & 3 deletions flyteadmin/pkg/rpc/adminservice/workflow.go
Original file line number Diff line number Diff line change
@@ -62,14 +62,13 @@ func (m *AdminService) GetDynamicNodeWorkflow(ctx context.Context, request *admi

var response *admin.DynamicNodeWorkflowResponse
var err error
// TODO create separate prometheus metric for this endpoint
m.Metrics.workflowEndpointMetrics.get.Time(func() {
m.Metrics.workflowEndpointMetrics.getDynamicNodeWorkflow.Time(func() {
response, err = m.WorkflowManager.GetDynamicNodeWorkflow(ctx, *request)
})
if err != nil {
return nil, util.TransformAndRecordError(err, &m.Metrics.workflowEndpointMetrics.get)
}
m.Metrics.workflowEndpointMetrics.get.Success()
m.Metrics.workflowEndpointMetrics.getDynamicNodeWorkflow.Success()
return response, nil
}