Skip to content

Commit

Permalink
Add count to execution repo interfaces (flyteorg#464)
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewwdye authored Aug 26, 2022
1 parent bd3950c commit 520a35c
Show file tree
Hide file tree
Showing 25 changed files with 427 additions and 150 deletions.
3 changes: 2 additions & 1 deletion flyteadmin/auth/init_secrets.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ var (
// GetInitSecretsCommand creates a command to issue secrets to be used for Auth settings. It writes the secrets to the
// working directory. The expectation is that they are put in a location and made available to the serve command later.
// To configure where the serve command looks for secrets, update this config:
// secrets:
//
// secrets:
// secrets-prefix: <my custom path>
func GetInitSecretsCommand() *cobra.Command {
cmd := &cobra.Command{
Expand Down
13 changes: 7 additions & 6 deletions flyteadmin/pkg/clusterresource/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,10 +269,10 @@ func prepareDynamicCreate(target executioncluster.ExecutionTarget, config string

// This function loops through the kubernetes resource template files in the configured template directory.
// For each unapplied template file (wrt the namespace) this func attempts to
// 1) create k8s object resource from template by performing:
// a) read template file
// b) substitute templatized variables with their resolved values
// 2) create the resource on the kubernetes cluster and cache successful outcomes
// 1. create k8s object resource from template by performing:
// a) read template file
// b) substitute templatized variables with their resolved values
// 2. create the resource on the kubernetes cluster and cache successful outcomes
func (c *controller) syncNamespace(ctx context.Context, project *admin.Project, domain *admin.Domain, namespace NamespaceName,
templateValues, customTemplateValues templateValuesType) error {
templateDir := c.config.ClusterResourceConfiguration().GetTemplatePath()
Expand Down Expand Up @@ -445,8 +445,9 @@ func addResourceVersion(patch []byte, rv string) ([]byte, error) {
}

// createResourceFromTemplate this method perform following processes:
// 1) read template file pointed by templateDir and templateFileName
// 2) substitute templatized variables with their resolved values
// 1. read template file pointed by templateDir and templateFileName
// 2. substitute templatized variables with their resolved values
//
// the method will return the kubernetes raw manifest
func (c *controller) createResourceFromTemplate(ctx context.Context, templateDir string,
templateFileName string, project *admin.Project, domain *admin.Domain, namespace NamespaceName,
Expand Down
3 changes: 2 additions & 1 deletion flyteadmin/pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ func (e *flyteAdminErrorImpl) String() string {
}

// enclose the error in the format that grpc server expect from golang:
// https://github.com/grpc/grpc-go/blob/master/status/status.go#L133
//
// https://github.com/grpc/grpc-go/blob/master/status/status.go#L133
func (e *flyteAdminErrorImpl) WithDetails(details *admin.EventFailureReason) (FlyteAdminError, error) {
s, err := e.status.WithDetails(details)
if err != nil {
Expand Down
12 changes: 6 additions & 6 deletions flyteadmin/pkg/manager/impl/execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2294,7 +2294,7 @@ func TestUpdateExecution(t *testing.T) {
updateExecFuncCalled = true
return nil
}
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetUpdateExecutionCallback(updateExecFunc)
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetUpdateCallback(updateExecFunc)
r := plugins.NewRegistry()
r.RegisterDefault(plugins.PluginIDWorkflowExecutor, &defaultTestExecutor)
execManager := NewExecutionManager(repository, r, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{})
Expand All @@ -2315,7 +2315,7 @@ func TestUpdateExecution(t *testing.T) {
updateExecFuncCalled = true
return nil
}
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetUpdateExecutionCallback(updateExecFunc)
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetUpdateCallback(updateExecFunc)
r := plugins.NewRegistry()
r.RegisterDefault(plugins.PluginIDWorkflowExecutor, &defaultTestExecutor)
execManager := NewExecutionManager(repository, r, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{})
Expand All @@ -2333,7 +2333,7 @@ func TestUpdateExecution(t *testing.T) {
updateExecFunc := func(ctx context.Context, execModel models.Execution) error {
return fmt.Errorf("some db error")
}
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetUpdateExecutionCallback(updateExecFunc)
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetUpdateCallback(updateExecFunc)
r := plugins.NewRegistry()
r.RegisterDefault(plugins.PluginIDWorkflowExecutor, &defaultTestExecutor)
execManager := NewExecutionManager(repository, r, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{})
Expand Down Expand Up @@ -2818,7 +2818,7 @@ func TestTerminateExecution(t *testing.T) {
}, unmarshaledClosure.GetAbortMetadata()))
return nil
}
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetUpdateExecutionCallback(updateExecutionFunc)
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetUpdateCallback(updateExecutionFunc)

mockExecutor := workflowengineMocks.WorkflowExecutor{}
mockExecutor.OnAbortMatch(mock.Anything, mock.MatchedBy(func(data workflowengineInterfaces.AbortData) bool {
Expand Down Expand Up @@ -2860,7 +2860,7 @@ func TestTerminateExecution_PropellerError(t *testing.T) {

updateCalled := false
repository := repositoryMocks.NewMockRepository()
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetUpdateExecutionCallback(func(
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetUpdateCallback(func(
context context.Context, execution models.Execution) error {
updateCalled = true
assert.Equal(t, core.WorkflowExecution_ABORTING.String(), execution.Phase)
Expand Down Expand Up @@ -2892,7 +2892,7 @@ func TestTerminateExecution_DatabaseError(t *testing.T) {
context context.Context, execution models.Execution) error {
return expectedError
}
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetUpdateExecutionCallback(updateExecutionFunc)
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetUpdateCallback(updateExecutionFunc)
mockExecutor := workflowengineMocks.WorkflowExecutor{}
mockExecutor.OnAbortMatch(mock.Anything, mock.Anything).Return(nil)
mockExecutor.OnID().Return("testMockExecutor")
Expand Down
10 changes: 5 additions & 5 deletions flyteadmin/pkg/manager/impl/executions/quality_of_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,11 @@ func (q qualityOfServiceAllocator) getQualityOfServiceFromDb(ctx context.Context
/*
Users can specify the quality of service for an execution (in order of decreasing specificity)
- At CreateExecution request time
- In the LaunchPlan spec
- In the Workflow spec
- As an overridable MatchableResource (https://lyft.github.io/flyte/administrator/install/managing_customizable_resources.html)
for the underlying workflow
- At CreateExecution request time
- In the LaunchPlan spec
- In the Workflow spec
- As an overridable MatchableResource (https://lyft.github.io/flyte/administrator/install/managing_customizable_resources.html)
for the underlying workflow
System administrators can specify default QualityOfService specs
(https://github.com/flyteorg/flyteidl/blob/e9727afcedf8d4c30a1fc2eeac45593e426d9bb0/protos/flyteidl/core/execution.proto#L92)s
Expand Down
171 changes: 87 additions & 84 deletions flyteadmin/pkg/manager/impl/node_execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ func TestTransformNodeExecutionModel(t *testing.T) {
ExecutionId: &workflowExecutionIdentifier,
}
t.Run("event version 0", func(t *testing.T) {
repository.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).GetWithChildrenFunction =
repository.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).SetGetWithChildrenCallback(
func(ctx context.Context, input interfaces.NodeExecutionResource) (models.NodeExecution, error) {
assert.True(t, proto.Equal(nodeExecID, &input.NodeExecutionIdentifier))
return models.NodeExecution{
Expand All @@ -432,7 +432,7 @@ func TestTransformNodeExecutionModel(t *testing.T) {
},
},
}, nil
}
})

manager := NodeExecutionManager{
db: repository,
Expand Down Expand Up @@ -484,11 +484,11 @@ func TestTransformNodeExecutionModel(t *testing.T) {
})
t.Run("get with children err", func(t *testing.T) {
expectedErr := flyteAdminErrors.NewFlyteAdminError(codes.Internal, "foo")
repository.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).GetWithChildrenFunction =
repository.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).SetGetWithChildrenCallback(
func(ctx context.Context, input interfaces.NodeExecutionResource) (models.NodeExecution, error) {
assert.True(t, proto.Equal(nodeExecID, &input.NodeExecutionIdentifier))
return models.NodeExecution{}, expectedErr
}
})

manager := NodeExecutionManager{
db: repository,
Expand All @@ -501,7 +501,7 @@ func TestTransformNodeExecutionModel(t *testing.T) {
func TestTransformNodeExecutionModelList(t *testing.T) {
ctx := context.TODO()
repository := repositoryMocks.NewMockRepository()
repository.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).GetWithChildrenFunction =
repository.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).SetGetWithChildrenCallback(
func(ctx context.Context, input interfaces.NodeExecutionResource) (models.NodeExecution, error) {
return models.NodeExecution{
NodeExecutionKey: models.NodeExecutionKey{
Expand All @@ -522,7 +522,7 @@ func TestTransformNodeExecutionModelList(t *testing.T) {
},
},
}, nil
}
})

manager := NodeExecutionManager{
db: repository,
Expand Down Expand Up @@ -600,45 +600,46 @@ func TestGetNodeExecutionParentNode(t *testing.T) {
}
metadataBytes, _ := proto.Marshal(&expectedMetadata)
closureBytes, _ := proto.Marshal(&expectedClosure)
repository.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).GetWithChildrenFunction = func(
ctx context.Context, input interfaces.NodeExecutionResource) (models.NodeExecution, error) {
workflowExecutionIdentifier := core.WorkflowExecutionIdentifier{
Project: "project",
Domain: "domain",
Name: "name",
}
assert.True(t, proto.Equal(&core.NodeExecutionIdentifier{
NodeId: "node id",
ExecutionId: &workflowExecutionIdentifier,
}, &input.NodeExecutionIdentifier))
return models.NodeExecution{
NodeExecutionKey: models.NodeExecutionKey{
NodeID: "node id",
ExecutionKey: models.ExecutionKey{
Project: "project",
Domain: "domain",
Name: "name",
repository.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).SetGetWithChildrenCallback(
func(
ctx context.Context, input interfaces.NodeExecutionResource) (models.NodeExecution, error) {
workflowExecutionIdentifier := core.WorkflowExecutionIdentifier{
Project: "project",
Domain: "domain",
Name: "name",
}
assert.True(t, proto.Equal(&core.NodeExecutionIdentifier{
NodeId: "node id",
ExecutionId: &workflowExecutionIdentifier,
}, &input.NodeExecutionIdentifier))
return models.NodeExecution{
NodeExecutionKey: models.NodeExecutionKey{
NodeID: "node id",
ExecutionKey: models.ExecutionKey{
Project: "project",
Domain: "domain",
Name: "name",
},
},
},
Phase: core.NodeExecution_SUCCEEDED.String(),
InputURI: "input uri",
StartedAt: &occurredAt,
Closure: closureBytes,
NodeExecutionMetadata: metadataBytes,
ChildNodeExecutions: []models.NodeExecution{
{
NodeExecutionKey: models.NodeExecutionKey{
NodeID: "node-child",
ExecutionKey: models.ExecutionKey{
Project: "project",
Domain: "domain",
Name: "name",
Phase: core.NodeExecution_SUCCEEDED.String(),
InputURI: "input uri",
StartedAt: &occurredAt,
Closure: closureBytes,
NodeExecutionMetadata: metadataBytes,
ChildNodeExecutions: []models.NodeExecution{
{
NodeExecutionKey: models.NodeExecutionKey{
NodeID: "node-child",
ExecutionKey: models.ExecutionKey{
Project: "project",
Domain: "domain",
Name: "name",
},
},
},
},
},
}, nil
}
}, nil
})
nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), make([]string, 0), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL, nil, nil, &eventWriterMocks.NodeExecutionEventWriter{})
nodeExecution, err := nodeExecManager.GetNodeExecution(context.Background(), admin.NodeExecutionGetRequest{
Id: &nodeExecutionIdentifier,
Expand All @@ -664,33 +665,34 @@ func TestGetNodeExecutionEventVersion0(t *testing.T) {
}
metadataBytes, _ := proto.Marshal(&expectedMetadata)
closureBytes, _ := proto.Marshal(&expectedClosure)
repository.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).GetWithChildrenFunction = func(
ctx context.Context, input interfaces.NodeExecutionResource) (models.NodeExecution, error) {
workflowExecutionIdentifier := core.WorkflowExecutionIdentifier{
Project: "project",
Domain: "domain",
Name: "name",
}
assert.True(t, proto.Equal(&core.NodeExecutionIdentifier{
NodeId: "node id",
ExecutionId: &workflowExecutionIdentifier,
}, &input.NodeExecutionIdentifier))
return models.NodeExecution{
NodeExecutionKey: models.NodeExecutionKey{
NodeID: "node id",
ExecutionKey: models.ExecutionKey{
Project: "project",
Domain: "domain",
Name: "name",
repository.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).SetGetWithChildrenCallback(
func(
ctx context.Context, input interfaces.NodeExecutionResource) (models.NodeExecution, error) {
workflowExecutionIdentifier := core.WorkflowExecutionIdentifier{
Project: "project",
Domain: "domain",
Name: "name",
}
assert.True(t, proto.Equal(&core.NodeExecutionIdentifier{
NodeId: "node id",
ExecutionId: &workflowExecutionIdentifier,
}, &input.NodeExecutionIdentifier))
return models.NodeExecution{
NodeExecutionKey: models.NodeExecutionKey{
NodeID: "node id",
ExecutionKey: models.ExecutionKey{
Project: "project",
Domain: "domain",
Name: "name",
},
},
},
Phase: core.NodeExecution_SUCCEEDED.String(),
InputURI: "input uri",
StartedAt: &occurredAt,
Closure: closureBytes,
NodeExecutionMetadata: metadataBytes,
}, nil
}
Phase: core.NodeExecution_SUCCEEDED.String(),
InputURI: "input uri",
StartedAt: &occurredAt,
Closure: closureBytes,
NodeExecutionMetadata: metadataBytes,
}, nil
})

nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), make([]string, 0), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL, nil, nil, &eventWriterMocks.NodeExecutionEventWriter{})
nodeExecution, err := nodeExecManager.GetNodeExecution(context.Background(), admin.NodeExecutionGetRequest{
Expand Down Expand Up @@ -809,24 +811,25 @@ func TestListNodeExecutionsLevelZero(t *testing.T) {
},
}, nil
})
repository.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).GetWithChildrenFunction = func(
ctx context.Context, input interfaces.NodeExecutionResource) (models.NodeExecution, error) {
return models.NodeExecution{
NodeExecutionKey: models.NodeExecutionKey{
NodeID: "node id",
ExecutionKey: models.ExecutionKey{
Project: "project",
Domain: "domain",
Name: "name",
repository.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).SetGetWithChildrenCallback(
func(
ctx context.Context, input interfaces.NodeExecutionResource) (models.NodeExecution, error) {
return models.NodeExecution{
NodeExecutionKey: models.NodeExecutionKey{
NodeID: "node id",
ExecutionKey: models.ExecutionKey{
Project: "project",
Domain: "domain",
Name: "name",
},
},
},
Phase: core.NodeExecution_SUCCEEDED.String(),
InputURI: "input uri",
StartedAt: &occurredAt,
Closure: closureBytes,
NodeExecutionMetadata: metadataBytes,
}, nil
}
Phase: core.NodeExecution_SUCCEEDED.String(),
InputURI: "input uri",
StartedAt: &occurredAt,
Closure: closureBytes,
NodeExecutionMetadata: metadataBytes,
}, nil
})
nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), make([]string, 0), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL, nil, nil, &eventWriterMocks.NodeExecutionEventWriter{})
nodeExecutions, err := nodeExecManager.ListNodeExecutions(context.Background(), admin.NodeExecutionListRequest{
WorkflowExecutionId: &core.WorkflowExecutionIdentifier{
Expand Down
18 changes: 10 additions & 8 deletions flyteadmin/pkg/manager/impl/task_execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,21 +304,23 @@ func TestCreateTaskEvent_MissingExecution(t *testing.T) {
func(ctx context.Context, input interfaces.GetTaskExecutionInput) (models.TaskExecution, error) {
return models.TaskExecution{}, flyteAdminErrors.NewFlyteAdminError(codes.NotFound, "foo")
})
repository.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).ExistsFunction = func(
ctx context.Context, input interfaces.NodeExecutionResource) (bool, error) {
return false, expectedErr
}
repository.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).SetExistsCallback(
func(
ctx context.Context, input interfaces.NodeExecutionResource) (bool, error) {
return false, expectedErr
})
taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL, nil, nil)
resp, err := taskExecManager.CreateTaskExecutionEvent(context.Background(), taskEventRequest)
assert.EqualError(t, err, "Failed to get existing node execution id: [node_id:\"node-id\""+
" execution_id:<project:\"project\" domain:\"domain\" name:\"name\" > ] "+
"with err: expected error")
assert.Nil(t, resp)

repository.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).ExistsFunction = func(
ctx context.Context, input interfaces.NodeExecutionResource) (bool, error) {
return false, nil
}
repository.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).SetExistsCallback(
func(
ctx context.Context, input interfaces.NodeExecutionResource) (bool, error) {
return false, nil
})
taskExecManager = NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL, nil, nil)
resp, err = taskExecManager.CreateTaskExecutionEvent(context.Background(), taskEventRequest)
assert.EqualError(t, err, "failed to get existing node execution id: [node_id:\"node-id\""+
Expand Down
4 changes: 3 additions & 1 deletion flyteadmin/pkg/repositories/errors/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
// This errors utility translates postgres application error codes into internal error types.
// The go postgres driver defines possible error codes here: https://github.com/lib/pq/blob/master/error.go
// And the postgres standard defines error responses here:
// https://www.postgresql.org/docs/current/static/protocol-error-fields.html
//
// https://www.postgresql.org/docs/current/static/protocol-error-fields.html
//
// Inspired by https://www.codementor.io/tamizhvendan/managing-data-in-golang-using-gorm-part-1-a9cdjb8nb
package errors

Expand Down
Loading

0 comments on commit 520a35c

Please sign in to comment.