Skip to content

Commit

Permalink
Record aborting before making abort call (flyteorg#350)
Browse files Browse the repository at this point in the history
Signed-off-by: Katrina Rogan <[email protected]>
  • Loading branch information
katrogan authored Feb 17, 2022
1 parent 6cf09d1 commit c34dbe4
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 14 deletions.
22 changes: 11 additions & 11 deletions pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1504,6 +1504,17 @@ func (m *ExecutionManager) TerminateExecution(
return nil, err
}

err = transformers.SetExecutionAborting(&executionModel, request.Cause, getUser(ctx))
if err != nil {
logger.Debugf(ctx, "failed to add abort metadata for execution [%+v] with err: %v", request.Id, err)
return nil, err
}
err = m.db.ExecutionRepo().Update(ctx, executionModel)
if err != nil {
logger.Debugf(ctx, "failed to save abort cause for terminated execution: %+v with err: %v", request.Id, err)
return nil, err
}

err = workflowengine.GetRegistry().GetExecutor().Abort(ctx, workflowengineInterfaces.AbortData{
Namespace: common.GetNamespaceName(
m.config.NamespaceMappingConfiguration().GetNamespaceTemplate(), request.Id.Project, request.Id.Domain),
Expand All @@ -1515,17 +1526,6 @@ func (m *ExecutionManager) TerminateExecution(
m.systemMetrics.TerminateExecutionFailures.Inc()
return nil, err
}

err = transformers.SetExecutionAborted(&executionModel, request.Cause, getUser(ctx))
if err != nil {
logger.Debugf(ctx, "failed to add abort metadata for execution [%+v] with err: %v", request.Id, err)
return nil, err
}
err = m.db.ExecutionRepo().Update(ctx, executionModel)
if err != nil {
logger.Debugf(ctx, "failed to save abort cause for terminated execution: %+v with err: %v", request.Id, err)
return nil, err
}
return &admin.ExecutionTerminateResponse{}, nil
}

Expand Down
5 changes: 4 additions & 1 deletion pkg/manager/impl/execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2466,10 +2466,12 @@ func TestTerminateExecution_PropellerError(t *testing.T) {
workflowengine.GetRegistry().Register(&mockExecutor)
defer resetExecutor()

updateCalled := false
repository := repositoryMocks.NewMockRepository()
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetUpdateExecutionCallback(func(
context context.Context, execution models.Execution) error {
t.Fatal("update should not be called when propeller fails to terminate an execution")
updateCalled = true
assert.Equal(t, core.WorkflowExecution_ABORTING.String(), execution.Phase)
return nil
})
execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{})
Expand All @@ -2484,6 +2486,7 @@ func TestTerminateExecution_PropellerError(t *testing.T) {
})
assert.Nil(t, resp)
assert.EqualError(t, err, expectedError.Error())
assert.True(t, updateCalled)
}

func TestTerminateExecution_DatabaseError(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/repositories/transformers/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ func UpdateExecutionModelStateChangeDetails(executionModel *models.Execution, st

// The execution abort metadata is recorded but the phase is not actually updated *until* the abort event is propagated
// by flytepropeller. The metadata is preemptively saved at the time of the abort.
func SetExecutionAborted(execution *models.Execution, cause, principal string) error {
func SetExecutionAborting(execution *models.Execution, cause, principal string) error {
var closure admin.ExecutionClosure
err := proto.Unmarshal(execution.Closure, &closure)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/repositories/transformers/execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ func TestSetExecutionAborted(t *testing.T) {
}
cause := "a snafoo occurred"
principal := "principal"
err := SetExecutionAborted(&existingModel, cause, principal)
err := SetExecutionAborting(&existingModel, cause, principal)
assert.NoError(t, err)
var actualClosure admin.ExecutionClosure
err = proto.Unmarshal(existingModel.Closure, &actualClosure)
Expand Down

0 comments on commit c34dbe4

Please sign in to comment.