Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Implement recovery endpoint (#220)
Browse files Browse the repository at this point in the history
  • Loading branch information
Katrina Rogan authored Jul 27, 2021
1 parent 8152f28 commit 0a4ac6a
Show file tree
Hide file tree
Showing 12 changed files with 424 additions and 17 deletions.
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ require (
github.com/coreos/go-oidc v2.2.1+incompatible
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/evanphx/json-patch v4.9.0+incompatible
github.com/flyteorg/flyteidl v0.19.5
github.com/flyteorg/flyteplugins v0.5.56
github.com/flyteorg/flytepropeller v0.12.9
github.com/flyteorg/flytestdlib v0.3.22
github.com/flyteorg/flyteidl v0.19.14
github.com/flyteorg/flyteplugins v0.5.59
github.com/flyteorg/flytepropeller v0.13.3
github.com/flyteorg/flytestdlib v0.3.27
github.com/ghodss/yaml v1.0.0
github.com/gofrs/uuid v4.0.0+incompatible // indirect
github.com/gogo/protobuf v1.3.2
Expand Down
16 changes: 8 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -304,16 +304,16 @@ github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4
github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ=
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/flyteorg/flyteidl v0.19.2/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteidl v0.19.5 h1:qNhNK6mhCTuOms7zJmBtog6bLQJhBj+iScf1IlHdqeg=
github.com/flyteorg/flyteidl v0.19.5/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteplugins v0.5.56 h1:LF/dwMFJDSMEmOp8hd9rU4Et4oyn0K+LgMzcHOu/xrw=
github.com/flyteorg/flyteplugins v0.5.56/go.mod h1:Jp5WheQMI08luZmgcmcgyjtzakKH0tPws/t35DzpKUA=
github.com/flyteorg/flytepropeller v0.12.9 h1:ocxVxJlB8t7nP1fesJ20+4VCDM7oLF1ahqXC+E3sw2c=
github.com/flyteorg/flytepropeller v0.12.9/go.mod h1:DxQI+r+Yg6EAajDBmfKJqOjDBwiM4cJgfPSyWjiz2l0=
github.com/flyteorg/flyteidl v0.19.14 h1:OLg2eT9uYllcfMMjEZJoXQ+2WXcrNbUxD+yaCrz2AlI=
github.com/flyteorg/flyteidl v0.19.14/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteplugins v0.5.59 h1:Uw1xlrlx5rSTpdTMwJTo7mbqHI7X7p7CFVm3473iRjo=
github.com/flyteorg/flyteplugins v0.5.59/go.mod h1:nesnW7pJhXEysFQg9TnSp36ao33ie0oA/TI4sYPaeyw=
github.com/flyteorg/flytepropeller v0.13.3 h1:nnO4d9w6UbgLCF9kn0M6LTkYpS/F5jEoEF22YcRmLYI=
github.com/flyteorg/flytepropeller v0.13.3/go.mod h1:c+OOw8L7h1/IaxoiRZ1Hmhenlc1dxIT23yzhFETRgXI=
github.com/flyteorg/flytestdlib v0.3.13/go.mod h1:Tz8JCECAbX6VWGwFT6cmEQ+RJpZ/6L9pswu3fzWs220=
github.com/flyteorg/flytestdlib v0.3.17/go.mod h1:VlbQuHTE+z2N5qusfwi+6WEkeJoqr8Q0E4NtBAsdwkU=
github.com/flyteorg/flytestdlib v0.3.22 h1:nJEPaCdxzXBaeg2p4fdo3I3Ua09NedFRaUwuLafLEdw=
github.com/flyteorg/flytestdlib v0.3.22/go.mod h1:1XG0DwYTUm34Yrffm1Qy9Tdr/pWQypEqTq5dUxw3/cM=
github.com/flyteorg/flytestdlib v0.3.27 h1:d3OI5qb5u8CkSs2HMTuM62K5GuTrf6FJKq8CHW6Ymbs=
github.com/flyteorg/flytestdlib v0.3.27/go.mod h1:7cDWkY3v7xsoesFcDdu6DSW5Q2U2W5KlHUbUHSwBG1Q=
github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/fogleman/gg v1.3.0/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible h1:TcekIExNqud5crz4xD2pavyTgWiPvpYe4Xau31I0PRk=
Expand Down
55 changes: 55 additions & 0 deletions pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -753,6 +753,10 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
if overrides != nil {
executeWorkflowInputs.TaskPluginOverrides = overrides
}
if request.Spec.Metadata != nil && request.Spec.Metadata.ReferenceExecution != nil &&
request.Spec.Metadata.Mode == admin.ExecutionMetadata_RECOVERED {
executeWorkflowInputs.RecoveryExecution = request.Spec.Metadata.ReferenceExecution
}

execInfo, err := m.workflowExecutor.ExecuteWorkflow(ctx, executeWorkflowInputs)
if err != nil {
Expand Down Expand Up @@ -901,6 +905,57 @@ func (m *ExecutionManager) RelaunchExecution(
}, nil
}

// RecoverExecution launches a new execution in RECOVERED mode that references the execution identified by
// request.Id.
//
// The spec of the referenced execution is loaded and reused, together with its user inputs when the stored model
// records a UserInputsURI. The new execution is created in the project and domain of request.Id under request.Name,
// and its model records the referenced execution's database ID as SourceExecutionID. Any error from the lookup,
// the model transformation, the inputs read, the launch or the model creation is returned to the caller as-is.
func (m *ExecutionManager) RecoverExecution(
	ctx context.Context, request admin.ExecutionRecoverRequest, requestedAt time.Time) (
	*admin.ExecutionCreateResponse, error) {
	sourceModel, err := util.GetExecutionModel(ctx, m.db, *request.Id)
	if err != nil {
		logger.Debugf(ctx, "Failed to get execution model for request [%+v] with err %v", request, err)
		return nil, err
	}
	sourceExecution, err := transformers.FromExecutionModel(*sourceModel)
	if err != nil {
		return nil, err
	}

	// The recovered execution starts from the source spec; make sure there is metadata to annotate.
	spec := sourceExecution.Spec
	if spec.Metadata == nil {
		spec.Metadata = &admin.ExecutionMetadata{}
	}

	// Inputs stay nil unless the source model recorded where its user inputs were stored.
	var inputs *core.LiteralMap
	if len(sourceModel.UserInputsURI) != 0 {
		inputs = &core.LiteralMap{}
		err = m.storageClient.ReadProtobuf(ctx, sourceModel.UserInputsURI, inputs)
		if err != nil {
			return nil, err
		}
	}

	// Mark the spec as a recovery of the source execution, carrying over the caller-supplied parent node if any.
	if request.Metadata != nil {
		spec.Metadata.ParentNodeExecution = request.Metadata.ParentNodeExecution
	}
	spec.Metadata.Mode = admin.ExecutionMetadata_RECOVERED
	spec.Metadata.ReferenceExecution = sourceExecution.Id

	var recoveredModel *models.Execution
	ctx, recoveredModel, err = m.launchExecutionAndPrepareModel(ctx, admin.ExecutionCreateRequest{
		Project: request.Id.Project,
		Domain:  request.Id.Domain,
		Name:    request.Name,
		Spec:    spec,
		Inputs:  inputs,
	}, requestedAt)
	if err != nil {
		return nil, err
	}

	// Link the new model back to the database row of the execution it recovers.
	recoveredModel.SourceExecutionID = sourceModel.ID
	recoveredID, err := m.createExecutionModel(ctx, recoveredModel)
	if err != nil {
		return nil, err
	}
	logger.Infof(ctx, "Successfully recovered [%+v] as [%+v]", request.Id, recoveredID)

	response := &admin.ExecutionCreateResponse{
		Id: recoveredID,
	}
	return response, nil
}

func (m *ExecutionManager) emitScheduledWorkflowMetrics(
ctx context.Context, executionModel *models.Execution, runningEventTimeProto *timestamp.Timestamp) {
if executionModel == nil || runningEventTimeProto == nil {
Expand Down
223 changes: 223 additions & 0 deletions pkg/manager/impl/execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -950,6 +950,229 @@ func TestRelaunchExecution_CreateFailure(t *testing.T) {
assert.EqualError(t, err, expectedErr.Error())
}

// TestRecoverExecution checks the happy path of RecoverExecution: the persisted model is named after the request,
// is in RECOVERED mode, references the source execution, and the response carries the new identifier.
func TestRecoverExecution(t *testing.T) {
	// Arrange: a manager wired to mocks.
	repository := getMockRepositoryForExecTest()
	setDefaultLpCallbackForExecTest(repository)
	execManager := NewExecutionManager(
		repository,
		getMockExecutionsConfigProvider(),
		getMockStorageForExecTest(context.Background()),
		workflowengineMocks.NewMockExecutor(),
		mockScope.NewTestScope(),
		mockScope.NewTestScope(),
		&mockPublisher,
		mockExecutionRemoteURL,
		nil, nil, nil,
		&eventWriterMocks.WorkflowExecutionEventWriter{})

	// The execution being recovered already exists in the repository with a SUCCEEDED closure.
	startTime := time.Now()
	startTimeProto, _ := ptypes.TimestampProto(startTime)
	closureBytes, _ := proto.Marshal(&admin.ExecutionClosure{
		Phase:     core.WorkflowExecution_SUCCEEDED,
		StartedAt: startTimeProto,
	})
	execRepo := repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo)
	execRepo.SetGetCallback(makeExecutionGetFunc(t, closureBytes, &startTime))

	// Capture and inspect the model handed to the repository on create.
	createCalled := false
	execRepo.SetCreateCallback(func(ctx context.Context, input models.Execution) error {
		createCalled = true
		assert.Equal(t, "recovered", input.Name)
		assert.Equal(t, "domain", input.Domain)
		assert.Equal(t, "project", input.Project)
		assert.Equal(t, uint(8), input.SourceExecutionID)

		var persistedSpec admin.ExecutionSpec
		unmarshalErr := proto.Unmarshal(input.Spec, &persistedSpec)
		assert.Nil(t, unmarshalErr)
		assert.Equal(t, admin.ExecutionMetadata_RECOVERED, persistedSpec.Metadata.Mode)
		assert.Equal(t, int32(admin.ExecutionMetadata_RECOVERED), input.Mode)
		return nil
	})

	// Act.
	recoverRequest := admin.ExecutionRecoverRequest{
		Id: &core.WorkflowExecutionIdentifier{
			Project: "project",
			Domain:  "domain",
			Name:    "name",
		},
		Name: "recovered",
	}
	response, err := execManager.RecoverExecution(context.Background(), recoverRequest, requestedAt)

	// Assert.
	assert.Nil(t, err)

	wantResponse := &admin.ExecutionCreateResponse{
		Id: &core.WorkflowExecutionIdentifier{
			Project: "project",
			Domain:  "domain",
			Name:    "recovered",
		},
	}
	assert.True(t, createCalled)
	assert.True(t, proto.Equal(wantResponse, response))
}

// TestRecoverExecution_RecoveredChildNode checks that a recovery request carrying a parent node execution in its
// metadata persists a model linked both to that parent node's database ID and to the referenced source execution.
func TestRecoverExecution_RecoveredChildNode(t *testing.T) {
	repository := getMockRepositoryForExecTest()
	setDefaultLpCallbackForExecTest(repository)
	execManager := NewExecutionManager(
		repository,
		getMockExecutionsConfigProvider(),
		getMockStorageForExecTest(context.Background()),
		workflowengineMocks.NewMockExecutor(),
		mockScope.NewTestScope(),
		mockScope.NewTestScope(),
		&mockPublisher,
		mockExecutionRemoteURL,
		nil, nil, nil,
		&eventWriterMocks.WorkflowExecutionEventWriter{})

	startTime := time.Now()
	startTimeProto, _ := ptypes.TimestampProto(startTime)
	closureBytes, _ := proto.Marshal(&admin.ExecutionClosure{
		Phase:     core.WorkflowExecution_SUCCEEDED,
		StartedAt: startTimeProto,
	})

	// "name" is the execution being recovered; "orig" is the execution that owns the parent node.
	referencedExecutionID := uint(123)
	ignoredExecutionID := uint(456)
	execRepo := repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo)
	execRepo.SetGetCallback(func(ctx context.Context, input interfaces.Identifier) (models.Execution, error) {
		if input.Name == "name" {
			return models.Execution{
				Spec:    specBytes,
				Closure: closureBytes,
				BaseModel: models.BaseModel{
					ID: referencedExecutionID,
				},
			}, nil
		}
		if input.Name == "orig" {
			return models.Execution{
				BaseModel: models.BaseModel{
					ID: ignoredExecutionID,
				},
			}, nil
		}
		return models.Execution{}, flyteAdminErrors.NewFlyteAdminErrorf(codes.InvalidArgument, "unexpected get for execution %s", input.Name)
	})

	// Capture and inspect the model handed to the repository on create.
	parentNodeDatabaseID := uint(12345)
	createCalled := false
	execRepo.SetCreateCallback(func(ctx context.Context, input models.Execution) error {
		createCalled = true
		assert.Equal(t, "recovered", input.Name)
		assert.Equal(t, "domain", input.Domain)
		assert.Equal(t, "project", input.Project)

		var persistedSpec admin.ExecutionSpec
		unmarshalErr := proto.Unmarshal(input.Spec, &persistedSpec)
		assert.Nil(t, unmarshalErr)
		assert.Equal(t, admin.ExecutionMetadata_RECOVERED, persistedSpec.Metadata.Mode)
		assert.Equal(t, int32(admin.ExecutionMetadata_RECOVERED), input.Mode)
		assert.Equal(t, parentNodeDatabaseID, input.ParentNodeExecutionID)
		assert.Equal(t, referencedExecutionID, input.SourceExecutionID)

		return nil
	})

	// The parent node lookup must be made with exactly the identifier supplied in the request metadata.
	parentNodeExecution := core.NodeExecutionIdentifier{
		ExecutionId: &core.WorkflowExecutionIdentifier{
			Project: "p",
			Domain:  "d",
			Name:    "orig",
		},
		NodeId: "parent",
	}
	nodeExecRepo := repository.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo)
	nodeExecRepo.SetGetCallback(func(ctx context.Context, input interfaces.NodeExecutionResource) (models.NodeExecution, error) {
		assert.True(t, proto.Equal(&parentNodeExecution, &input.NodeExecutionIdentifier))

		return models.NodeExecution{
			BaseModel: models.BaseModel{
				ID: parentNodeDatabaseID,
			},
		}, nil
	})

	// Act.
	response, err := execManager.RecoverExecution(context.Background(), admin.ExecutionRecoverRequest{
		Id: &core.WorkflowExecutionIdentifier{
			Project: "project",
			Domain:  "domain",
			Name:    "name",
		},
		Name: "recovered",
		Metadata: &admin.ExecutionMetadata{
			ParentNodeExecution: &parentNodeExecution,
		},
	}, requestedAt)

	// Assert.
	assert.Nil(t, err)

	wantResponse := &admin.ExecutionCreateResponse{
		Id: &core.WorkflowExecutionIdentifier{
			Project: "project",
			Domain:  "domain",
			Name:    "recovered",
		},
	}
	assert.True(t, createCalled)
	assert.True(t, proto.Equal(wantResponse, response))
}

// TestRecoverExecution_GetExistingFailure checks that an error from looking up the execution to recover is
// returned to the caller and that no new execution is created.
func TestRecoverExecution_GetExistingFailure(t *testing.T) {
	// Arrange: a manager wired to mocks.
	repository := getMockRepositoryForExecTest()
	setDefaultLpCallbackForExecTest(repository)
	execManager := NewExecutionManager(
		repository,
		getMockExecutionsConfigProvider(),
		getMockStorageForExecTest(context.Background()),
		workflowengineMocks.NewMockExecutor(),
		mockScope.NewTestScope(),
		mockScope.NewTestScope(),
		&mockPublisher,
		mockExecutionRemoteURL,
		nil, nil, nil,
		&eventWriterMocks.WorkflowExecutionEventWriter{})

	// Every execution lookup fails with a known error.
	expectedErr := errors.New("expected error")
	execRepo := repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo)
	execRepo.SetGetCallback(func(ctx context.Context, input interfaces.Identifier) (models.Execution, error) {
		return models.Execution{}, expectedErr
	})

	// Record whether the manager attempts to persist anything.
	createCalled := false
	execRepo.SetCreateCallback(func(ctx context.Context, input models.Execution) error {
		createCalled = true
		return nil
	})

	// Act.
	recoverRequest := admin.ExecutionRecoverRequest{
		Id: &core.WorkflowExecutionIdentifier{
			Project: "project",
			Domain:  "domain",
			Name:    "name",
		},
		Name: "recovered",
	}
	_, err := execManager.RecoverExecution(context.Background(), recoverRequest, requestedAt)

	// Assert.
	assert.EqualError(t, err, expectedErr.Error())
	assert.False(t, createCalled)
}

// TestRecoverExecution_GetExistingInputsFailure checks that RecoverExecution surfaces an error when every
// protobuf read from the storage client fails.
func TestRecoverExecution_GetExistingInputsFailure(t *testing.T) {
	// Arrange: mocks, with a storage client whose protobuf reads always fail.
	repository := getMockRepositoryForExecTest()
	setDefaultLpCallbackForExecTest(repository)

	expectedErr := errors.New("foo")
	mockStorage := commonMocks.GetMockStorageClient()
	failingStore := mockStorage.ComposedProtobufStore.(*commonMocks.TestDataStore)
	failingStore.ReadProtobufCb = func(
		ctx context.Context, reference storage.DataReference, msg proto.Message) error {
		return expectedErr
	}
	execManager := NewExecutionManager(
		repository,
		getMockExecutionsConfigProvider(),
		mockStorage,
		workflowengineMocks.NewMockExecutor(),
		mockScope.NewTestScope(),
		mockScope.NewTestScope(),
		&mockPublisher,
		mockExecutionRemoteURL,
		nil, nil, nil,
		&eventWriterMocks.WorkflowExecutionEventWriter{})

	// The execution being recovered already exists in the repository with a SUCCEEDED closure.
	startTime := time.Now()
	startTimeProto, _ := ptypes.TimestampProto(startTime)
	closureBytes, _ := proto.Marshal(&admin.ExecutionClosure{
		Phase:     core.WorkflowExecution_SUCCEEDED,
		StartedAt: startTimeProto,
	})
	execRepo := repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo)
	execRepo.SetGetCallback(makeExecutionGetFunc(t, closureBytes, &startTime))

	// Act.
	recoverRequest := admin.ExecutionRecoverRequest{
		Id: &core.WorkflowExecutionIdentifier{
			Project: "project",
			Domain:  "domain",
			Name:    "name",
		},
		Name: "recovered",
	}
	_, err := execManager.RecoverExecution(context.Background(), recoverRequest, requestedAt)

	// Assert.
	assert.EqualError(t, err, "Unable to read WorkflowClosure from location s3://flyte/metadata/admin/remote closure id : foo")
}

func TestCreateWorkflowEvent(t *testing.T) {
repository := repositoryMocks.NewMockRepository()
startTime := time.Now()
Expand Down
5 changes: 5 additions & 0 deletions pkg/manager/interfaces/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ type ExecutionInterface interface {
*admin.ExecutionCreateResponse, error)
RelaunchExecution(ctx context.Context, request admin.ExecutionRelaunchRequest, requestedAt time.Time) (
*admin.ExecutionCreateResponse, error)
// RecoverExecution recreates a previously-run workflow execution. The new execution points to the original
// execution so that propeller only starts executing from the last known failure point; propeller can recover
// individual workflow execution nodes which previously succeeded based on the recovery (original) workflow
// execution id.
RecoverExecution(ctx context.Context, request admin.ExecutionRecoverRequest, requestedAt time.Time) (
*admin.ExecutionCreateResponse, error)
CreateWorkflowEvent(ctx context.Context, request admin.WorkflowExecutionEventRequest) (
*admin.WorkflowExecutionEventResponse, error)
GetExecution(ctx context.Context, request admin.WorkflowExecutionGetRequest) (*admin.Execution, error)
Expand Down
11 changes: 11 additions & 0 deletions pkg/manager/mocks/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ type CreateExecutionFunc func(
type RelaunchExecutionFunc func(
ctx context.Context, request admin.ExecutionRelaunchRequest, requestedAt time.Time) (
*admin.ExecutionCreateResponse, error)
// RecoverExecutionFunc is the callback signature that MockExecutionManager.RecoverExecution delegates to when its
// RecoverExecutionFunc field is set.
type RecoverExecutionFunc func(ctx context.Context, request admin.ExecutionRecoverRequest, requestedAt time.Time) (
*admin.ExecutionCreateResponse, error)
type CreateExecutionEventFunc func(ctx context.Context, request admin.WorkflowExecutionEventRequest) (
*admin.WorkflowExecutionEventResponse, error)
type GetExecutionFunc func(ctx context.Context, request admin.WorkflowExecutionGetRequest) (*admin.Execution, error)
Expand All @@ -25,6 +27,7 @@ type TerminateExecutionFunc func(
type MockExecutionManager struct {
createExecutionFunc CreateExecutionFunc
relaunchExecutionFunc RelaunchExecutionFunc
RecoverExecutionFunc RecoverExecutionFunc
createExecutionEventFunc CreateExecutionEventFunc
getExecutionFunc GetExecutionFunc
getExecutionDataFunc GetExecutionDataFunc
Expand Down Expand Up @@ -62,6 +65,14 @@ func (m *MockExecutionManager) SetCreateEventCallback(createEventFunc CreateExec
m.createExecutionEventFunc = createEventFunc
}

// RecoverExecution forwards the call to RecoverExecutionFunc when one has been assigned. With no callback set it
// returns an empty ExecutionCreateResponse and a nil error.
func (m *MockExecutionManager) RecoverExecution(ctx context.Context, request admin.ExecutionRecoverRequest, requestedAt time.Time) (
	*admin.ExecutionCreateResponse, error) {
	if m.RecoverExecutionFunc == nil {
		// No override configured: behave as a successful no-op.
		return &admin.ExecutionCreateResponse{}, nil
	}
	return m.RecoverExecutionFunc(ctx, request, requestedAt)
}

func (m *MockExecutionManager) CreateWorkflowEvent(
ctx context.Context,
request admin.WorkflowExecutionEventRequest) (*admin.WorkflowExecutionEventResponse, error) {
Expand Down
Loading

0 comments on commit 0a4ac6a

Please sign in to comment.