Skip to content

Commit

Permalink
Introduce aborting phase (flyteorg#314)
Browse files Browse the repository at this point in the history
  • Loading branch information
Katrina Rogan authored Jan 5, 2022
1 parent 81ac07f commit 6ec0694
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 11 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/benbjohnson/clock v1.1.0
github.com/coreos/go-oidc v2.2.1+incompatible
github.com/evanphx/json-patch v4.9.0+incompatible
github.com/flyteorg/flyteidl v0.21.15
github.com/flyteorg/flyteidl v0.21.16
github.com/flyteorg/flyteplugins v0.7.1
github.com/flyteorg/flytepropeller v0.15.13
github.com/flyteorg/flytestdlib v0.4.7
Expand Down
8 changes: 2 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -311,12 +311,8 @@ github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4
github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ=
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/flyteorg/flyteidl v0.21.4/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteidl v0.21.13 h1:JM7eEokDO+ilXKqdW3CZa/mr6lxoVJ2ztfWZQEyJxEg=
github.com/flyteorg/flyteidl v0.21.13/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteidl v0.21.14 h1:uOc3u0DFkFPag5fOQ8LMmMBBZqHy19ikxShnRlRAzZ0=
github.com/flyteorg/flyteidl v0.21.14/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteidl v0.21.15 h1:XplSOL7Vl2dUriveXS27bnLhuNyAL+DR3sFexhFXrWE=
github.com/flyteorg/flyteidl v0.21.15/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteidl v0.21.16 h1:MepQ5iNnYbYDlR5kfqS4PX6KEk6raNA9VzI08zkvAK0=
github.com/flyteorg/flyteidl v0.21.16/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteplugins v0.7.1 h1:YdCEQtdPeol7u6LkopGTIfPLAhy3KcclQa+DZFauK8w=
github.com/flyteorg/flyteplugins v0.7.1/go.mod h1:kOiuXk1ddIEVSPoHcc4kBfVQcLuyf8jw3vWJT2Was90=
github.com/flyteorg/flytepropeller v0.15.13 h1:SObqD0/oPzSt1fXRJO8g0zm9IxjzwPcdSOXmOc70v4E=
Expand Down
3 changes: 3 additions & 0 deletions pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1194,6 +1194,9 @@ func (m *ExecutionManager) CreateWorkflowEvent(ctx context.Context, request admi
return nil, errors.NewFlyteAdminErrorf(codes.FailedPrecondition,
"Cannot go from %s to %s for workflow execution %v",
wfExecPhase.String(), request.Event.Phase.String(), request.Event.ExecutionId)
} else if wfExecPhase == core.WorkflowExecution_ABORTING && !common.IsExecutionTerminal(request.Event.Phase) {
return nil, errors.NewFlyteAdminErrorf(codes.FailedPrecondition,
"Invalid phase change from aborting to %s for workflow execution %v", request.Event.Phase.String(), request.Event.ExecutionId)
}

err = transformers.UpdateExecutionModelState(ctx, executionModel, request, m.config.ApplicationConfiguration().GetRemoteDataConfig().InlineEventDataPolicy, m.storageClient)
Expand Down
58 changes: 55 additions & 3 deletions pkg/manager/impl/execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"strings"
"testing"

"google.golang.org/protobuf/types/known/timestamppb"

"github.com/flyteorg/flyteadmin/pkg/workflowengine"

"github.com/benbjohnson/clock"
Expand Down Expand Up @@ -1426,6 +1428,58 @@ func TestCreateWorkflowEvent_NoRunningToQueued(t *testing.T) {
assert.Equal(t, adminError.Code(), codes.FailedPrecondition)
}

func TestCreateWorkflowEvent_CurrentlyAborting(t *testing.T) {
repository := repositoryMocks.NewMockRepository()
executionGetFunc := func(ctx context.Context, input interfaces.Identifier) (models.Execution, error) {
return models.Execution{
ExecutionKey: models.ExecutionKey{
Project: "project",
Domain: "domain",
Name: "name",
},
Spec: specBytes,
Phase: core.WorkflowExecution_ABORTING.String(),
}, nil
}

repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetGetCallback(executionGetFunc)
updateExecutionFunc := func(context context.Context, execution models.Execution) error {
return nil
}
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetUpdateCallback(updateExecutionFunc)

req := admin.WorkflowExecutionEventRequest{
RequestId: "1",
Event: &event.WorkflowExecutionEvent{
ExecutionId: &executionIdentifier,
Phase: core.WorkflowExecution_ABORTED,
OccurredAt: timestamppb.New(time.Now()),
},
}

mockDbEventWriter := &eventWriterMocks.WorkflowExecutionEventWriter{}
mockDbEventWriter.On("Write", req)
execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, &mockPublisher, mockDbEventWriter)

resp, err := execManager.CreateWorkflowEvent(context.Background(), req)
assert.NotNil(t, resp)
assert.NoError(t, err)

req.Event.Phase = core.WorkflowExecution_QUEUED
resp, err = execManager.CreateWorkflowEvent(context.Background(), req)
assert.Nil(t, resp)
assert.NotNil(t, err)
adminError := err.(flyteAdminErrors.FlyteAdminError)
assert.Equal(t, adminError.Code(), codes.FailedPrecondition)

req.Event.Phase = core.WorkflowExecution_RUNNING
resp, err = execManager.CreateWorkflowEvent(context.Background(), req)
assert.Nil(t, resp)
assert.NotNil(t, err)
adminError = err.(flyteAdminErrors.FlyteAdminError)
assert.Equal(t, adminError.Code(), codes.FailedPrecondition)
}

func TestCreateWorkflowEvent_StartedRunning(t *testing.T) {
repository := repositoryMocks.NewMockRepository()
occurredAt := time.Now().UTC()
Expand Down Expand Up @@ -2195,9 +2249,7 @@ func TestTerminateExecution(t *testing.T) {
assert.Equal(t, "name", execution.Name)
assert.Equal(t, uint(1), execution.LaunchPlanID)
assert.Equal(t, uint(2), execution.WorkflowID)
assert.Equal(t, core.WorkflowExecution_QUEUED.String(), execution.Phase,
"an abort call should not update the execution status until a corresponding execution event "+
"is received")
assert.Equal(t, core.WorkflowExecution_ABORTING.String(), execution.Phase)
assert.Equal(t, execution.ExecutionCreatedAt, execution.ExecutionUpdatedAt,
"an abort call should not change ExecutionUpdatedAt until a corresponding execution event is received")
assert.Equal(t, abortCause, execution.AbortCause)
Expand Down
2 changes: 2 additions & 0 deletions pkg/repositories/transformers/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,12 +242,14 @@ func SetExecutionAborted(execution *models.Execution, cause, principal string) e
Principal: principal,
},
}
closure.Phase = core.WorkflowExecution_ABORTING
marshaledClosure, err := proto.Marshal(&closure)
if err != nil {
return errors.NewFlyteAdminErrorf(codes.Internal, "Failed to marshal execution closure: %v", err)
}
execution.Closure = marshaledClosure
execution.AbortCause = cause
execution.Phase = core.WorkflowExecution_ABORTING.String()
return nil
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/repositories/transformers/execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,8 +454,10 @@ func TestSetExecutionAborted(t *testing.T) {
}},
// The execution abort metadata is recorded but the phase is not actually updated *until* the abort event is
// propagated by flytepropeller.
Phase: core.WorkflowExecution_RUNNING,
Phase: core.WorkflowExecution_ABORTING,
}, &actualClosure))
assert.Equal(t, existingModel.AbortCause, cause)
assert.Equal(t, existingModel.Phase, core.WorkflowExecution_ABORTING.String())
}

func TestGetExecutionIdentifier(t *testing.T) {
Expand Down

0 comments on commit 6ec0694

Please sign in to comment.