From d3ef6b70a1dc1c032acf448e6f0a9bb832b85b1e Mon Sep 17 00:00:00 2001 From: Katrina Rogan Date: Wed, 9 Mar 2022 06:28:26 -0800 Subject: [PATCH] Fix bug to actually enable cluster reassignment on Queued events (#366) * checkpoint Signed-off-by: Katrina Rogan * Add test Signed-off-by: Katrina Rogan --- .../pkg/manager/impl/execution_manager.go | 23 ++++---- .../manager/impl/execution_manager_test.go | 52 +++++++++++++++++++ flyteadmin/tests/attributes_test.go | 1 - 3 files changed, 66 insertions(+), 10 deletions(-) diff --git a/flyteadmin/pkg/manager/impl/execution_manager.go b/flyteadmin/pkg/manager/impl/execution_manager.go index 4d3cc820e8..17de26504a 100644 --- a/flyteadmin/pkg/manager/impl/execution_manager.go +++ b/flyteadmin/pkg/manager/impl/execution_manager.go @@ -1177,15 +1177,20 @@ func (m *ExecutionManager) CreateWorkflowEvent(ctx context.Context, request admi wfExecPhase := core.WorkflowExecution_Phase(core.WorkflowExecution_Phase_value[executionModel.Phase]) // Subsequent queued events announcing a cluster reassignment are permitted. - if wfExecPhase == request.Event.Phase && request.Event.Phase != core.WorkflowExecution_QUEUED { - logger.Debugf(ctx, "This phase %s was already recorded for workflow execution %v", - wfExecPhase.String(), request.Event.ExecutionId) - return nil, errors.NewFlyteAdminErrorf(codes.AlreadyExists, - "This phase %s was already recorded for workflow execution %v", - wfExecPhase.String(), request.Event.ExecutionId) - } else if err := validation.ValidateCluster(ctx, executionModel.Cluster, request.Event.ProducerId); err != nil { - return nil, err - } else if common.IsExecutionTerminal(wfExecPhase) { + if request.Event.Phase != core.WorkflowExecution_QUEUED { + if wfExecPhase == request.Event.Phase { + logger.Debugf(ctx, "This phase %s was already recorded for workflow execution %v", + wfExecPhase.String(), request.Event.ExecutionId) + return nil, errors.NewFlyteAdminErrorf(codes.AlreadyExists, + "This phase %s was already recorded for workflow execution %v", + wfExecPhase.String(), request.Event.ExecutionId) + } else if err := validation.ValidateCluster(ctx, executionModel.Cluster, request.Event.ProducerId); err != nil { + // Only perform event cluster validation **after** an execution has moved on from QUEUED. + return nil, err + } + } + + if common.IsExecutionTerminal(wfExecPhase) { // Cannot go backwards in time from a terminal state to anything else curPhase := wfExecPhase.String() errorMsg := fmt.Sprintf("Invalid phase change from %s to %s for workflow execution %v", curPhase, request.Event.Phase.String(), request.Event.ExecutionId) diff --git a/flyteadmin/pkg/manager/impl/execution_manager_test.go b/flyteadmin/pkg/manager/impl/execution_manager_test.go index b231f3fb19..d88f48c26b 100644 --- a/flyteadmin/pkg/manager/impl/execution_manager_test.go +++ b/flyteadmin/pkg/manager/impl/execution_manager_test.go @@ -1626,6 +1626,58 @@ func TestCreateWorkflowEvent_InvalidPhaseChange(t *testing.T) { assert.True(t, ok) } +func TestCreateWorkflowEvent_ClusterReassignmentOnQueued(t *testing.T) { + repository := repositoryMocks.NewMockRepository() + occurredAt := time.Now().UTC() + + repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetGetCallback( + func(ctx context.Context, input interfaces.Identifier) (models.Execution, error) { + return models.Execution{ + ExecutionKey: models.ExecutionKey{ + Project: "project", + Domain: "domain", + Name: "name", + }, + BaseModel: models.BaseModel{ + ID: uint(8), + }, + Spec: specBytes, + Phase: core.WorkflowExecution_UNDEFINED.String(), + Closure: closureBytes, + LaunchPlanID: uint(1), + WorkflowID: uint(2), + StartedAt: &occurredAt, + }, nil + }, + ) + newCluster := "C2" + updateExecutionFunc := func( + context context.Context, execution models.Execution) error { + assert.Equal(t, core.WorkflowExecution_QUEUED.String(), execution.Phase) + assert.Equal(t, newCluster, execution.Cluster) + return nil + } + repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetUpdateCallback(updateExecutionFunc) + + occurredAtTimestamp, _ := ptypes.TimestampProto(occurredAt) + mockDbEventWriter := &eventWriterMocks.WorkflowExecutionEventWriter{} + request := admin.WorkflowExecutionEventRequest{ + RequestId: "1", + Event: &event.WorkflowExecutionEvent{ + ExecutionId: &executionIdentifier, + OccurredAt: occurredAtTimestamp, + Phase: core.WorkflowExecution_QUEUED, + ProducerId: newCluster, + }, + } + mockDbEventWriter.On("Write", request) + execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, &mockPublisher, mockDbEventWriter) + + resp, err := execManager.CreateWorkflowEvent(context.Background(), request) + assert.Nil(t, err) + assert.NotNil(t, resp) +} + func TestCreateWorkflowEvent_InvalidEvent(t *testing.T) { repository := repositoryMocks.NewMockRepository() startTime := time.Now() diff --git a/flyteadmin/tests/attributes_test.go b/flyteadmin/tests/attributes_test.go index d977e0a377..f933fc84ca 100644 --- a/flyteadmin/tests/attributes_test.go +++ b/flyteadmin/tests/attributes_test.go @@ -61,7 +61,6 @@ func TestUpdateClusterResourceAttributes(t *testing.T) { fmt.Println(err) assert.Nil(t, err) - listResp, err := client.ListMatchableAttributes(ctx, &admin.ListMatchableAttributesRequest{ ResourceType: admin.MatchableResource_CLUSTER_RESOURCE, })