Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
with context
Browse files Browse the repository at this point in the history
Signed-off-by: Katrina Rogan <[email protected]>
  • Loading branch information
katrogan committed May 22, 2023
1 parent 105acf2 commit 57dd027
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 18 deletions.
16 changes: 8 additions & 8 deletions pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ func (m *ExecutionManager) getInheritedExecMetadata(ctx context.Context, request
}
sourceExecutionID = sourceExecutionModel.ID
requestSpec.Metadata.Principal = sourceExecutionModel.User
sourceExecution, err := transformers.FromExecutionModel(*sourceExecutionModel, transformers.DefaultExecutionTransformerOptions)
sourceExecution, err := transformers.FromExecutionModel(ctx, *sourceExecutionModel, transformers.DefaultExecutionTransformerOptions)
if err != nil {
logger.Errorf(ctx, "Failed transform parent execution model for child execution [%+v] with err: %v", workflowExecutionID, err)
return parentNodeExecutionID, sourceExecutionID, err
Expand Down Expand Up @@ -967,7 +967,7 @@ func (m *ExecutionManager) RelaunchExecution(
logger.Debugf(ctx, "Failed to get execution model for request [%+v] with err %v", request, err)
return nil, err
}
existingExecution, err := transformers.FromExecutionModel(*existingExecutionModel, transformers.DefaultExecutionTransformerOptions)
existingExecution, err := transformers.FromExecutionModel(ctx, *existingExecutionModel, transformers.DefaultExecutionTransformerOptions)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1024,7 +1024,7 @@ func (m *ExecutionManager) RecoverExecution(
logger.Debugf(ctx, "Failed to get execution model for request [%+v] with err %v", request, err)
return nil, err
}
existingExecution, err := transformers.FromExecutionModel(*existingExecutionModel, transformers.DefaultExecutionTransformerOptions)
existingExecution, err := transformers.FromExecutionModel(ctx, *existingExecutionModel, transformers.DefaultExecutionTransformerOptions)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1075,7 +1075,7 @@ func (m *ExecutionManager) emitScheduledWorkflowMetrics(
return
}
// Find the reference launch plan to get the kickoff time argument
execution, err := transformers.FromExecutionModel(*executionModel, transformers.DefaultExecutionTransformerOptions)
execution, err := transformers.FromExecutionModel(ctx, *executionModel, transformers.DefaultExecutionTransformerOptions)
if err != nil {
logger.Warningf(context.Background(),
"failed to transform execution model when emitting scheduled workflow execution stats with for "+
Expand Down Expand Up @@ -1320,7 +1320,7 @@ func (m *ExecutionManager) GetExecution(
}
namespace := common.GetNamespaceName(
m.config.NamespaceMappingConfiguration().GetNamespaceTemplate(), request.GetId().GetProject(), request.GetId().GetDomain())
execution, transformerErr := transformers.FromExecutionModel(*executionModel, &transformers.ExecutionTransformerOptions{
execution, transformerErr := transformers.FromExecutionModel(ctx, *executionModel, &transformers.ExecutionTransformerOptions{
DefaultNamespace: namespace,
})
if transformerErr != nil {
Expand Down Expand Up @@ -1365,7 +1365,7 @@ func (m *ExecutionManager) GetExecutionData(
logger.Debugf(ctx, "Failed to get execution model for request [%+v] with err: %v", request, err)
return nil, err
}
execution, err := transformers.FromExecutionModel(*executionModel, transformers.DefaultExecutionTransformerOptions)
execution, err := transformers.FromExecutionModel(ctx, *executionModel, transformers.DefaultExecutionTransformerOptions)
if err != nil {
logger.Debugf(ctx, "Failed to transform execution model [%+v] to proto object with err: %v", request.Id, err)
return nil, err
Expand Down Expand Up @@ -1465,7 +1465,7 @@ func (m *ExecutionManager) ListExecutions(
logger.Debugf(ctx, "Failed to list executions using input [%+v] with err %v", listExecutionsInput, err)
return nil, err
}
executionList, err := transformers.FromExecutionModels(output.Executions, transformers.ListExecutionTransformerOptions)
executionList, err := transformers.FromExecutionModels(ctx, output.Executions, transformers.ListExecutionTransformerOptions)
if err != nil {
logger.Errorf(ctx,
"Failed to transform execution models [%+v] with err: %v", output.Executions, err)
Expand Down Expand Up @@ -1495,7 +1495,7 @@ func (m *ExecutionManager) ListExecutions(
func (m *ExecutionManager) publishNotifications(ctx context.Context, request admin.WorkflowExecutionEventRequest,
execution models.Execution) error {
// Notifications are stored in the Spec object of an admin.Execution object.
adminExecution, err := transformers.FromExecutionModel(execution, transformers.DefaultExecutionTransformerOptions)
adminExecution, err := transformers.FromExecutionModel(ctx, execution, transformers.DefaultExecutionTransformerOptions)
if err != nil {
// This shouldn't happen because execution manager marshaled the data into models.Execution.
m.systemMetrics.TransformerError.Inc()
Expand Down
8 changes: 4 additions & 4 deletions pkg/repositories/transformers/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ func GetExecutionIdentifier(executionModel *models.Execution) core.WorkflowExecu
}
}

func FromExecutionModel(executionModel models.Execution, opts *ExecutionTransformerOptions) (*admin.Execution, error) {
func FromExecutionModel(ctx context.Context, executionModel models.Execution, opts *ExecutionTransformerOptions) (*admin.Execution, error) {
var spec admin.ExecutionSpec
var err error
if err = proto.Unmarshal(executionModel.Spec, &spec); err != nil {
Expand All @@ -333,7 +333,7 @@ func FromExecutionModel(executionModel models.Execution, opts *ExecutionTransfor
spec.Metadata.SystemMetadata = &admin.SystemMetadata{}
}
if len(spec.GetMetadata().GetSystemMetadata().Namespace) == 0 {
logger.Infof(context.TODO(), "setting execution system metadata namespace to [%s]", opts.DefaultNamespace)
logger.Infof(ctx, "setting execution system metadata namespace to [%s]", opts.DefaultNamespace)
spec.Metadata.SystemMetadata.Namespace = opts.DefaultNamespace
}
}
Expand Down Expand Up @@ -398,10 +398,10 @@ func PopulateDefaultStateChangeDetails(executionModel models.Execution) (*admin.
}, nil
}

func FromExecutionModels(executionModels []models.Execution, opts *ExecutionTransformerOptions) ([]*admin.Execution, error) {
func FromExecutionModels(ctx context.Context, executionModels []models.Execution, opts *ExecutionTransformerOptions) ([]*admin.Execution, error) {
executions := make([]*admin.Execution, len(executionModels))
for idx, executionModel := range executionModels {
execution, err := FromExecutionModel(executionModel, opts)
execution, err := FromExecutionModel(ctx, executionModel, opts)
if err != nil {
return nil, err
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/repositories/transformers/execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ func TestFromExecutionModel(t *testing.T) {
StartedAt: &startedAt,
State: &stateInt,
}
execution, err := FromExecutionModel(executionModel, DefaultExecutionTransformerOptions)
execution, err := FromExecutionModel(context.TODO(), executionModel, DefaultExecutionTransformerOptions)
assert.Nil(t, err)
assert.True(t, proto.Equal(&admin.Execution{
Id: &core.WorkflowExecutionIdentifier{
Expand Down Expand Up @@ -559,15 +559,15 @@ func TestFromExecutionModel_Aborted(t *testing.T) {
AbortCause: abortCause,
Closure: executionClosureBytes,
}
execution, err := FromExecutionModel(executionModel, DefaultExecutionTransformerOptions)
execution, err := FromExecutionModel(context.TODO(), executionModel, DefaultExecutionTransformerOptions)
assert.Nil(t, err)
assert.Equal(t, core.WorkflowExecution_ABORTED, execution.Closure.Phase)
assert.True(t, proto.Equal(&admin.AbortMetadata{
Cause: abortCause,
}, execution.Closure.GetAbortMetadata()))

executionModel.Phase = core.WorkflowExecution_RUNNING.String()
execution, err = FromExecutionModel(executionModel, DefaultExecutionTransformerOptions)
execution, err = FromExecutionModel(context.TODO(), executionModel, DefaultExecutionTransformerOptions)
assert.Nil(t, err)
assert.Empty(t, execution.Closure.GetAbortCause())
}
Expand All @@ -592,7 +592,7 @@ func TestFromExecutionModel_Error(t *testing.T) {
Phase: core.WorkflowExecution_FAILED.String(),
Closure: executionClosureBytes,
}
execution, err := FromExecutionModel(executionModel, &ExecutionTransformerOptions{
execution, err := FromExecutionModel(context.TODO(), executionModel, &ExecutionTransformerOptions{
TrimErrorMessage: true,
})
expectedExecErr := execErr
Expand All @@ -618,7 +618,7 @@ func TestFromExecutionModel_OverwriteNamespace(t *testing.T) {
Closure: executionClosureBytes,
}
overwrittenNamespace := "ns"
execution, err := FromExecutionModel(executionModel, &ExecutionTransformerOptions{
execution, err := FromExecutionModel(context.TODO(), executionModel, &ExecutionTransformerOptions{
DefaultNamespace: overwrittenNamespace,
})
assert.NoError(t, err)
Expand Down Expand Up @@ -667,7 +667,7 @@ func TestFromExecutionModels(t *testing.T) {
State: &stateInt,
},
}
executions, err := FromExecutionModels(executionModels, DefaultExecutionTransformerOptions)
executions, err := FromExecutionModels(context.TODO(), executionModels, DefaultExecutionTransformerOptions)
assert.Nil(t, err)
assert.Len(t, executions, 1)
assert.True(t, proto.Equal(&admin.Execution{
Expand Down

0 comments on commit 57dd027

Please sign in to comment.