diff --git a/go.mod b/go.mod index 4af71d83bb..c1010ca168 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/benbjohnson/clock v1.1.0 github.com/coreos/go-oidc v2.2.1+incompatible github.com/evanphx/json-patch v4.9.0+incompatible - github.com/flyteorg/flyteidl v0.21.15 + github.com/flyteorg/flyteidl v0.21.16 github.com/flyteorg/flyteplugins v0.7.1 github.com/flyteorg/flytepropeller v0.15.13 github.com/flyteorg/flytestdlib v0.4.7 diff --git a/go.sum b/go.sum index 87ee22d441..039b020a75 100644 --- a/go.sum +++ b/go.sum @@ -311,12 +311,8 @@ github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4 github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ= github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/flyteorg/flyteidl v0.21.4/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U= -github.com/flyteorg/flyteidl v0.21.13 h1:JM7eEokDO+ilXKqdW3CZa/mr6lxoVJ2ztfWZQEyJxEg= -github.com/flyteorg/flyteidl v0.21.13/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U= -github.com/flyteorg/flyteidl v0.21.14 h1:uOc3u0DFkFPag5fOQ8LMmMBBZqHy19ikxShnRlRAzZ0= -github.com/flyteorg/flyteidl v0.21.14/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U= -github.com/flyteorg/flyteidl v0.21.15 h1:XplSOL7Vl2dUriveXS27bnLhuNyAL+DR3sFexhFXrWE= -github.com/flyteorg/flyteidl v0.21.15/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U= +github.com/flyteorg/flyteidl v0.21.16 h1:MepQ5iNnYbYDlR5kfqS4PX6KEk6raNA9VzI08zkvAK0= +github.com/flyteorg/flyteidl v0.21.16/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U= github.com/flyteorg/flyteplugins v0.7.1 h1:YdCEQtdPeol7u6LkopGTIfPLAhy3KcclQa+DZFauK8w= github.com/flyteorg/flyteplugins v0.7.1/go.mod h1:kOiuXk1ddIEVSPoHcc4kBfVQcLuyf8jw3vWJT2Was90= github.com/flyteorg/flytepropeller v0.15.13 h1:SObqD0/oPzSt1fXRJO8g0zm9IxjzwPcdSOXmOc70v4E= diff --git a/pkg/manager/impl/execution_manager.go b/pkg/manager/impl/execution_manager.go index e8bfbf33dd..f3095386e2 100644 --- a/pkg/manager/impl/execution_manager.go +++ b/pkg/manager/impl/execution_manager.go @@ -1194,6 +1194,9 @@ func (m *ExecutionManager) CreateWorkflowEvent(ctx context.Context, request admi return nil, errors.NewFlyteAdminErrorf(codes.FailedPrecondition, "Cannot go from %s to %s for workflow execution %v", wfExecPhase.String(), request.Event.Phase.String(), request.Event.ExecutionId) + } else if wfExecPhase == core.WorkflowExecution_ABORTING && !common.IsExecutionTerminal(request.Event.Phase) { + return nil, errors.NewFlyteAdminErrorf(codes.FailedPrecondition, + "Invalid phase change from aborting to %s for workflow execution %v", request.Event.Phase.String(), request.Event.ExecutionId) } err = transformers.UpdateExecutionModelState(ctx, executionModel, request, m.config.ApplicationConfiguration().GetRemoteDataConfig().InlineEventDataPolicy, m.storageClient) diff --git a/pkg/manager/impl/execution_manager_test.go b/pkg/manager/impl/execution_manager_test.go index 01e13705ec..23c37026f0 100644 --- a/pkg/manager/impl/execution_manager_test.go +++ b/pkg/manager/impl/execution_manager_test.go @@ -6,6 +6,8 @@ import ( "strings" "testing" + "google.golang.org/protobuf/types/known/timestamppb" + "github.com/flyteorg/flyteadmin/pkg/workflowengine" "github.com/benbjohnson/clock" @@ -1426,6 +1428,58 @@ func TestCreateWorkflowEvent_NoRunningToQueued(t *testing.T) { assert.Equal(t, adminError.Code(), codes.FailedPrecondition) } +func TestCreateWorkflowEvent_CurrentlyAborting(t *testing.T) { + repository := repositoryMocks.NewMockRepository() + executionGetFunc := func(ctx context.Context, input interfaces.Identifier) (models.Execution, error) { + return models.Execution{ + ExecutionKey: models.ExecutionKey{ + Project: "project", + Domain: "domain", + Name: "name", + }, + Spec: specBytes, + Phase: core.WorkflowExecution_ABORTING.String(), + }, nil + } + + repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetGetCallback(executionGetFunc) + updateExecutionFunc := func(context context.Context, execution models.Execution) error { + return nil + } + repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetUpdateCallback(updateExecutionFunc) + + req := admin.WorkflowExecutionEventRequest{ + RequestId: "1", + Event: &event.WorkflowExecutionEvent{ + ExecutionId: &executionIdentifier, + Phase: core.WorkflowExecution_ABORTED, + OccurredAt: timestamppb.New(time.Now()), + }, + } + + mockDbEventWriter := &eventWriterMocks.WorkflowExecutionEventWriter{} + mockDbEventWriter.On("Write", req) + execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, &mockPublisher, mockDbEventWriter) + + resp, err := execManager.CreateWorkflowEvent(context.Background(), req) + assert.NotNil(t, resp) + assert.NoError(t, err) + + req.Event.Phase = core.WorkflowExecution_QUEUED + resp, err = execManager.CreateWorkflowEvent(context.Background(), req) + assert.Nil(t, resp) + assert.NotNil(t, err) + adminError := err.(flyteAdminErrors.FlyteAdminError) + assert.Equal(t, adminError.Code(), codes.FailedPrecondition) + + req.Event.Phase = core.WorkflowExecution_RUNNING + resp, err = execManager.CreateWorkflowEvent(context.Background(), req) + assert.Nil(t, resp) + assert.NotNil(t, err) + adminError = err.(flyteAdminErrors.FlyteAdminError) + assert.Equal(t, adminError.Code(), codes.FailedPrecondition) +} + func TestCreateWorkflowEvent_StartedRunning(t *testing.T) { repository := repositoryMocks.NewMockRepository() occurredAt := time.Now().UTC() @@ -2195,9 +2249,7 @@ func TestTerminateExecution(t *testing.T) { assert.Equal(t, "name", execution.Name) assert.Equal(t, uint(1), execution.LaunchPlanID) assert.Equal(t, uint(2), execution.WorkflowID) - assert.Equal(t, core.WorkflowExecution_QUEUED.String(), execution.Phase, - "an abort call should not update the execution status until a corresponding execution event "+ - "is received") + assert.Equal(t, core.WorkflowExecution_ABORTING.String(), execution.Phase) assert.Equal(t, execution.ExecutionCreatedAt, execution.ExecutionUpdatedAt, "an abort call should not change ExecutionUpdatedAt until a corresponding execution event is received") assert.Equal(t, abortCause, execution.AbortCause) diff --git a/pkg/repositories/transformers/execution.go b/pkg/repositories/transformers/execution.go index d1b12126d0..cc3142d8ca 100644 --- a/pkg/repositories/transformers/execution.go +++ b/pkg/repositories/transformers/execution.go @@ -242,12 +242,14 @@ func SetExecutionAborted(execution *models.Execution, cause, principal string) e Principal: principal, }, } + closure.Phase = core.WorkflowExecution_ABORTING marshaledClosure, err := proto.Marshal(&closure) if err != nil { return errors.NewFlyteAdminErrorf(codes.Internal, "Failed to marshal execution closure: %v", err) } execution.Closure = marshaledClosure execution.AbortCause = cause + execution.Phase = core.WorkflowExecution_ABORTING.String() return nil } diff --git a/pkg/repositories/transformers/execution_test.go b/pkg/repositories/transformers/execution_test.go index bbca4b7f85..4a934c0eb9 100644 --- a/pkg/repositories/transformers/execution_test.go +++ b/pkg/repositories/transformers/execution_test.go @@ -454,8 +454,10 @@ func TestSetExecutionAborted(t *testing.T) { }}, // The execution abort metadata is recorded but the phase is not actually updated *until* the abort event is // propagated by flytepropeller. - Phase: core.WorkflowExecution_RUNNING, + Phase: core.WorkflowExecution_ABORTING, }, &actualClosure)) + assert.Equal(t, existingModel.AbortCause, cause) + assert.Equal(t, existingModel.Phase, core.WorkflowExecution_ABORTING.String()) } func TestGetExecutionIdentifier(t *testing.T) {