Skip to content

Commit

Permalink
Fix bug to actually enable cluster reassignment on Queued events (fly…
Browse files Browse the repository at this point in the history
…teorg#366)

* checkpoint

Signed-off-by: Katrina Rogan <[email protected]>

* Add test

Signed-off-by: Katrina Rogan <[email protected]>
  • Loading branch information
katrogan authored Mar 9, 2022
1 parent 8eaa220 commit ddac175
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 10 deletions.
23 changes: 14 additions & 9 deletions pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1177,15 +1177,20 @@ func (m *ExecutionManager) CreateWorkflowEvent(ctx context.Context, request admi

wfExecPhase := core.WorkflowExecution_Phase(core.WorkflowExecution_Phase_value[executionModel.Phase])
// Subsequent queued events announcing a cluster reassignment are permitted.
if wfExecPhase == request.Event.Phase && request.Event.Phase != core.WorkflowExecution_QUEUED {
logger.Debugf(ctx, "This phase %s was already recorded for workflow execution %v",
wfExecPhase.String(), request.Event.ExecutionId)
return nil, errors.NewFlyteAdminErrorf(codes.AlreadyExists,
"This phase %s was already recorded for workflow execution %v",
wfExecPhase.String(), request.Event.ExecutionId)
} else if err := validation.ValidateCluster(ctx, executionModel.Cluster, request.Event.ProducerId); err != nil {
return nil, err
} else if common.IsExecutionTerminal(wfExecPhase) {
if request.Event.Phase != core.WorkflowExecution_QUEUED {
if wfExecPhase == request.Event.Phase {
logger.Debugf(ctx, "This phase %s was already recorded for workflow execution %v",
wfExecPhase.String(), request.Event.ExecutionId)
return nil, errors.NewFlyteAdminErrorf(codes.AlreadyExists,
"This phase %s was already recorded for workflow execution %v",
wfExecPhase.String(), request.Event.ExecutionId)
} else if err := validation.ValidateCluster(ctx, executionModel.Cluster, request.Event.ProducerId); err != nil {
// Only perform event cluster validation **after** an execution has moved on from QUEUED.
return nil, err
}
}

if common.IsExecutionTerminal(wfExecPhase) {
// Cannot go backwards in time from a terminal state to anything else
curPhase := wfExecPhase.String()
errorMsg := fmt.Sprintf("Invalid phase change from %s to %s for workflow execution %v", curPhase, request.Event.Phase.String(), request.Event.ExecutionId)
Expand Down
52 changes: 52 additions & 0 deletions pkg/manager/impl/execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1626,6 +1626,58 @@ func TestCreateWorkflowEvent_InvalidPhaseChange(t *testing.T) {
assert.True(t, ok)
}

func TestCreateWorkflowEvent_ClusterReassignmentOnQueued(t *testing.T) {
repository := repositoryMocks.NewMockRepository()
occurredAt := time.Now().UTC()

repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetGetCallback(
func(ctx context.Context, input interfaces.Identifier) (models.Execution, error) {
return models.Execution{
ExecutionKey: models.ExecutionKey{
Project: "project",
Domain: "domain",
Name: "name",
},
BaseModel: models.BaseModel{
ID: uint(8),
},
Spec: specBytes,
Phase: core.WorkflowExecution_UNDEFINED.String(),
Closure: closureBytes,
LaunchPlanID: uint(1),
WorkflowID: uint(2),
StartedAt: &occurredAt,
}, nil
},
)
newCluster := "C2"
updateExecutionFunc := func(
context context.Context, execution models.Execution) error {
assert.Equal(t, core.WorkflowExecution_QUEUED.String(), execution.Phase)
assert.Equal(t, newCluster, execution.Cluster)
return nil
}
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetUpdateCallback(updateExecutionFunc)

occurredAtTimestamp, _ := ptypes.TimestampProto(occurredAt)
mockDbEventWriter := &eventWriterMocks.WorkflowExecutionEventWriter{}
request := admin.WorkflowExecutionEventRequest{
RequestId: "1",
Event: &event.WorkflowExecutionEvent{
ExecutionId: &executionIdentifier,
OccurredAt: occurredAtTimestamp,
Phase: core.WorkflowExecution_QUEUED,
ProducerId: newCluster,
},
}
mockDbEventWriter.On("Write", request)
execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, &mockPublisher, mockDbEventWriter)

resp, err := execManager.CreateWorkflowEvent(context.Background(), request)
assert.Nil(t, err)
assert.NotNil(t, resp)
}

func TestCreateWorkflowEvent_InvalidEvent(t *testing.T) {
repository := repositoryMocks.NewMockRepository()
startTime := time.Now()
Expand Down
1 change: 0 additions & 1 deletion tests/attributes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ func TestUpdateClusterResourceAttributes(t *testing.T) {
fmt.Println(err)
assert.Nil(t, err)


listResp, err := client.ListMatchableAttributes(ctx, &admin.ListMatchableAttributesRequest{
ResourceType: admin.MatchableResource_CLUSTER_RESOURCE,
})
Expand Down

0 comments on commit ddac175

Please sign in to comment.