Skip to content

Commit

Permalink
Cannot abort an already terminated execution (flyteorg#454)
Browse files Browse the repository at this point in the history
Signed-off-by: Katrina Rogan <[email protected]>
  • Loading branch information
katrogan authored Jul 6, 2022
1 parent a96084f commit 2a216f5
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 0 deletions.
3 changes: 3 additions & 0 deletions pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1650,6 +1650,9 @@ func (m *ExecutionManager) TerminateExecution(
logger.Infof(ctx, "couldn't find execution [%+v] to save termination cause", request.Id)
return nil, err
}
if common.IsExecutionTerminal(core.WorkflowExecution_Phase(core.WorkflowExecution_Phase_value[executionModel.Phase])) {
return nil, errors.NewFlyteAdminError(codes.PermissionDenied, "Cannot abort an already terminate workflow execution")
}

err = transformers.SetExecutionAborting(&executionModel, request.Cause, getUser(ctx))
if err != nil {
Expand Down
32 changes: 32 additions & 0 deletions pkg/manager/impl/execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2912,6 +2912,38 @@ func TestTerminateExecution_DatabaseError(t *testing.T) {
assert.EqualError(t, err, expectedError.Error())
}

func TestTerminateExecution_AlreadyTerminated(t *testing.T) {
var expectedError = errors.New("expected error")

mockExecutor := workflowengineMocks.WorkflowExecutor{}
mockExecutor.OnAbortMatch(mock.Anything, mock.Anything).Return(expectedError)
mockExecutor.OnID().Return("customMockExecutor")
r := plugins.NewRegistry()
r.RegisterDefault(plugins.PluginIDWorkflowExecutor, &mockExecutor)

repository := repositoryMocks.NewMockRepository()
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetGetCallback(
func(ctx context.Context, input interfaces.Identifier) (models.Execution, error) {
return models.Execution{
Phase: core.WorkflowExecution_SUCCEEDED.String(),
}, nil
})
execManager := NewExecutionManager(repository, r, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{})
resp, err := execManager.TerminateExecution(context.Background(), admin.ExecutionTerminateRequest{
Id: &core.WorkflowExecutionIdentifier{
Project: "project",
Domain: "domain",
Name: "name",
},
Cause: "abort cause",
})

assert.Nil(t, resp)
s, ok := status.FromError(err)
assert.True(t, ok)
assert.Equal(t, codes.PermissionDenied, s.Code())
}

func TestGetExecutionData(t *testing.T) {
repository := repositoryMocks.NewMockRepository()
startedAt := time.Date(2018, 8, 30, 0, 0, 0, 0, time.UTC)
Expand Down

0 comments on commit 2a216f5

Please sign in to comment.