Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Handle incompatible cluster events errors (#378)
Browse files Browse the repository at this point in the history
  • Loading branch information
Katrina Rogan authored Jan 4, 2022
1 parent 51c85b7 commit c016dab
Show file tree
Hide file tree
Showing 20 changed files with 305 additions and 70 deletions.
8 changes: 8 additions & 0 deletions events/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const (
TooLarge ErrorCode = "TooLarge"
EventSinkError ErrorCode = "EventSinkError"
EventAlreadyInTerminalStateError ErrorCode = "EventAlreadyInTerminalStateError"
EventIncompatibleCusterError ErrorCode = "EventIncompatibleClusterError"
)

type EventError struct {
Expand Down Expand Up @@ -67,6 +68,8 @@ func WrapError(err error) error {
case *admin.EventFailureReason_AlreadyInTerminalState:
phase := reason.AlreadyInTerminalState.GetCurrentPhase()
return wrapf(EventAlreadyInTerminalStateError, err, fmt.Sprintf("conflicting events; destination: %v", phase))
case *admin.EventFailureReason_IncompatibleCluster:
return wrapf(EventIncompatibleCusterError, err, fmt.Sprintf("conflicting execution cluster; expected: %v", reason.IncompatibleCluster.Cluster))
default:
logger.Warnf(context.Background(), "found unexpected type in details of grpc status: %v", reason)
}
Expand Down Expand Up @@ -130,3 +133,8 @@ func IsTooLarge(err error) bool {
func IsEventAlreadyInTerminalStateError(err error) bool {
return errors.Is(err, &EventError{Code: EventAlreadyInTerminalStateError})
}

// IsEventIncompatibleClusterError checks if the error is of type EventError and the ErrorCode is of type EventIncompatibleCusterError
func IsEventIncompatibleClusterError(err error) bool {
return errors.Is(err, &EventError{Code: EventIncompatibleCusterError})
}
9 changes: 9 additions & 0 deletions events/errors/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ func isUnknownError(err error) bool {

// Test that we wrap the gRPC error to our correct one
func TestWrapErrors(t *testing.T) {
incompatibleClusterErr, _ := status.New(codes.FailedPrecondition, "incompat").WithDetails(&admin.EventFailureReason{
Reason: &admin.EventFailureReason_IncompatibleCluster{
IncompatibleCluster: &admin.EventErrorIncompatibleCluster{
Cluster: "c1",
},
},
})

tests := []struct {
name string
inputError error
Expand All @@ -55,6 +63,7 @@ func TestWrapErrors(t *testing.T) {
{"uncaughtError", status.Error(codes.Unknown, "Unknown Err"), isEventError},
{"uncaughtError", fmt.Errorf("Random err"), isUnknownError},
{"errorWithReason", createTestErrorWithReason(), IsEventAlreadyInTerminalStateError},
{"incompatibleCluster", incompatibleClusterErr.Err(), IsEventIncompatibleClusterError},
}

for _, test := range tests {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/DiSiqueira/GoTree v1.0.1-0.20180907134536-53a8e837f295
github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1
github.com/fatih/color v1.10.0
github.com/flyteorg/flyteidl v0.21.11
github.com/flyteorg/flyteidl v0.21.15
github.com/flyteorg/flyteplugins v0.9.1
github.com/flyteorg/flytestdlib v0.4.7
github.com/ghodss/yaml v1.0.0
Expand Down
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,9 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv
github.com/fatih/color v1.10.0 h1:s36xzo75JdqLaaWoiEHk767eHiwo0598uUxyfiPkDsg=
github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94=
github.com/flyteorg/flyteidl v0.21.11 h1:oH9YPoR7scO9GFF/I8D0gCTOB+JP5HRK7b7cLUBRz90=
github.com/flyteorg/flyteidl v0.21.11/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteidl v0.21.15 h1:XplSOL7Vl2dUriveXS27bnLhuNyAL+DR3sFexhFXrWE=
github.com/flyteorg/flyteidl v0.21.15/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteplugins v0.9.1 h1:Z0gxSvG7LeI+COfEmuzkhz9RnJ4E5wWUcjj5qh1uKuw=
github.com/flyteorg/flyteplugins v0.9.1/go.mod h1:OEGQztPFDJG4DV7tS9lDsRRd521iUINn5dcsBf6bW5k=
github.com/flyteorg/flytestdlib v0.3.13/go.mod h1:Tz8JCECAbX6VWGwFT6cmEQ+RJpZ/6L9pswu3fzWs220=
Expand Down
2 changes: 2 additions & 0 deletions pkg/controller/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ var (
EventConfig: EventConfig{
RawOutputPolicy: RawOutputPolicyReference,
},
ClusterID: "propeller",
}
)

Expand Down Expand Up @@ -143,6 +144,7 @@ type Config struct {
ExcludeProjectLabel []string `json:"exclude-project-label" pflag:",Exclude the specified project label from the k8s FlyteWorkflow CRD label selector"`
IncludeDomainLabel []string `json:"include-domain-label" pflag:",Include the specified domain label in the k8s FlyteWorkflow CRD label selector"`
ExcludeDomainLabel []string `json:"exclude-domain-label" pflag:",Exclude the specified domain label from the k8s FlyteWorkflow CRD label selector"`
ClusterID string `json:"cluster-id" pflag:",Unique cluster id running this flytepropeller instance with which to annotate execution events"`
}

// KubeClientConfig contains the configuration used by flytepropeller to configure its internal Kubernetes Client.
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/config/config_flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions pkg/controller/config/config_flags_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,12 +402,12 @@ func New(ctx context.Context, cfg *config.Config, kubeclientset kubernetes.Inter

nodeExecutor, err := nodes.NewExecutor(ctx, cfg.NodeConfig, store, controller.enqueueWorkflowForNodeUpdates, eventSink,
launchPlanActor, launchPlanActor, cfg.MaxDatasetSizeBytes,
storage.DataReference(cfg.DefaultRawOutputPrefix), kubeClient, catalogClient, recovery.NewClient(adminClient), &cfg.EventConfig, scope)
storage.DataReference(cfg.DefaultRawOutputPrefix), kubeClient, catalogClient, recovery.NewClient(adminClient), &cfg.EventConfig, cfg.ClusterID, scope)
if err != nil {
return nil, errors.Wrapf(err, "Failed to create Controller.")
}

workflowExecutor, err := workflow.NewExecutor(ctx, store, controller.enqueueWorkflowForNodeUpdates, eventSink, controller.recorder, cfg.MetadataPrefix, nodeExecutor, &cfg.EventConfig, scope)
workflowExecutor, err := workflow.NewExecutor(ctx, store, controller.enqueueWorkflowForNodeUpdates, eventSink, controller.recorder, cfg.MetadataPrefix, nodeExecutor, &cfg.EventConfig, cfg.ClusterID, scope)
if err != nil {
return nil, err
}
Expand Down
24 changes: 22 additions & 2 deletions pkg/controller/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,8 @@ func (p *Propeller) TryMutateWorkflow(ctx context.Context, originalW *v1alpha1.F
}()
err = p.workflowExecutor.HandleFlyteWorkflow(ctx, mutableW)
}()

if err != nil {
logger.Errorf(ctx, "Error when trying to reconcile workflow. Error [%v]. Error Type[%v]. Is nill [%v]",
logger.Errorf(ctx, "Error when trying to reconcile workflow. Error [%v]. Error Type[%v]",
err, reflect.TypeOf(err))
p.metrics.SystemError.Inc(ctx)
return nil, err
Expand Down Expand Up @@ -250,6 +249,27 @@ func (p *Propeller) Handle(ctx context.Context, namespace, name string) error {
return nil
}

// Incompatible cluster means that another cluster has been designated to handle this workflow execution.
// We should early abort in this case, since any events originating from this cluster for this execution will
// be rejected.
if err != nil && eventsErr.IsEventIncompatibleClusterError(err) {
t.Stop()
logger.Errorf(ctx, "No longer designated to process workflow, failing: %s", err)

// We set the workflow status to failing to abort any active tasks in the next round.
mutableW := w.DeepCopy()
mutableW.Status.UpdatePhase(v1alpha1.WorkflowPhaseFailing, "Workflow execution cluster reassigned, aborting", &core.ExecutionError{
Kind: core.ExecutionError_SYSTEM,
Code: string(eventsErr.EventIncompatibleCusterError),
Message: fmt.Sprintf("Workflow execution cluster reassigned: %v", err),
})
if _, e := p.wfStore.Update(ctx, mutableW, workflowstore.PriorityClassCritical); e != nil {
logger.Errorf(ctx, "Failed to record an EventIncompatibleClusterError workflow as failed, reason: %s. Retrying...", e)
return e
}
return nil
}

// TODO we will need to call updatestatus when it is supported. But to preserve metadata like (label/finalizer) we will need to use update

// update the GetExecutionStatus block of the FlyteWorkflow resource. UpdateStatus will not
Expand Down
30 changes: 30 additions & 0 deletions pkg/controller/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,36 @@ func TestPropeller_Handle(t *testing.T) {
}
assert.NoError(t, p.Handle(ctx, namespace, name))

r, err := s.Get(ctx, namespace, name)
assert.NoError(t, err)
assert.Equal(t, v1alpha1.WorkflowPhaseFailing, r.GetExecutionStatus().GetPhase())
assert.Equal(t, 0, len(r.Finalizers))
assert.False(t, HasCompletedLabel(r))
})
t.Run("failOnIncompatibleClusterError", func(t *testing.T) {
assert.NoError(t, s.Create(ctx, &v1alpha1.FlyteWorkflow{
ObjectMeta: v1.ObjectMeta{
Name: name,
Namespace: namespace,
},
WorkflowSpec: &v1alpha1.WorkflowSpec{
ID: "w1",
},
Status: v1alpha1.WorkflowStatus{
Phase: v1alpha1.WorkflowPhaseRunning,
FailedAttempts: 0,
},
}))
exec.HandleCb = func(ctx context.Context, w *v1alpha1.FlyteWorkflow) error {
return workflowErrors.Wrapf(workflowErrors.EventRecordingError, "",
&eventErrors.EventError{
Code: eventErrors.EventIncompatibleCusterError,
Cause: nil,
Message: "The execution is recorded as running on a different cluster",
}, "failed to transition phase")
}
assert.NoError(t, p.Handle(ctx, namespace, name))

r, err := s.Get(ctx, namespace, name)
assert.NoError(t, err)
assert.Equal(t, v1alpha1.WorkflowPhaseFailing, r.GetExecutionStatus().GetPhase())
Expand Down
13 changes: 8 additions & 5 deletions pkg/controller/nodes/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ type nodeExecutor struct {
shardSelector ioutils.ShardSelector
recoveryClient recovery.Client
eventConfig *config.EventConfig
clusterID string
}

func (c *nodeExecutor) RecordTransitionLatency(ctx context.Context, dag executors.DAGStructure, nl executors.NodeLookup, node v1alpha1.ExecutableNode, nodeStatus v1alpha1.ExecutableNodeStatus) {
Expand Down Expand Up @@ -455,7 +456,7 @@ func (c *nodeExecutor) handleNotYetStartedNode(ctx context.Context, dag executor
logger.Infof(ctx, "Change in node state detected from [%s] -> [%s]", nodeStatus.GetPhase().String(), np.String())
nev, err := ToNodeExecutionEvent(nCtx.NodeExecutionMetadata().GetNodeExecutionID(),
p, nCtx.InputReader().GetInputPath().String(), nodeStatus, nCtx.ExecutionContext().GetEventVersion(),
nCtx.ExecutionContext().GetParentInfo(), nCtx.node)
nCtx.ExecutionContext().GetParentInfo(), nCtx.node, c.clusterID)
if err != nil {
return executors.NodeStatusUndefined, errors.Wrapf(errors.IllegalStateError, nCtx.NodeID(), err, "could not convert phase info to event")
}
Expand Down Expand Up @@ -569,7 +570,7 @@ func (c *nodeExecutor) handleQueuedOrRunningNode(ctx context.Context, nCtx *node
logger.Infof(ctx, "Change in node state detected from [%s] -> [%s], (handler phase [%s])", nodeStatus.GetPhase().String(), np.String(), p.GetPhase().String())
nev, err := ToNodeExecutionEvent(nCtx.NodeExecutionMetadata().GetNodeExecutionID(),
p, nCtx.InputReader().GetInputPath().String(), nCtx.NodeStatus(), nCtx.ExecutionContext().GetEventVersion(),
nCtx.ExecutionContext().GetParentInfo(), nCtx.node)
nCtx.ExecutionContext().GetParentInfo(), nCtx.node, c.clusterID)
if err != nil {
return executors.NodeStatusUndefined, errors.Wrapf(errors.IllegalStateError, nCtx.NodeID(), err, "could not convert phase info to event")
}
Expand Down Expand Up @@ -1053,8 +1054,9 @@ func (c *nodeExecutor) AbortHandler(ctx context.Context, execContext executors.E
Message: reason,
},
},
ProducerId: c.clusterID,
})
if err != nil {
if err != nil && !eventsErr.IsEventIncompatibleClusterError(err) {
if errors2.IsCausedBy(err, errors.IllegalStateError) {
logger.Debugf(ctx, "Failed to record abort event due to illegal state transition. Ignoring the error. Error: %v", err)
} else {
Expand Down Expand Up @@ -1105,7 +1107,7 @@ func (c *nodeExecutor) Initialize(ctx context.Context) error {
func NewExecutor(ctx context.Context, nodeConfig config.NodeConfig, store *storage.DataStore, enQWorkflow v1alpha1.EnqueueWorkflow, eventSink events.EventSink,
workflowLauncher launchplan.Executor, launchPlanReader launchplan.Reader, maxDatasetSize int64,
defaultRawOutputPrefix storage.DataReference, kubeClient executors.Client,
catalogClient catalog.Client, recoveryClient recovery.Client, eventConfig *config.EventConfig, scope promutils.Scope) (executors.Node, error) {
catalogClient catalog.Client, recoveryClient recovery.Client, eventConfig *config.EventConfig, clusterID string, scope promutils.Scope) (executors.Node, error) {

// TODO we may want to make this configurable.
shardSelector, err := ioutils.NewBase36PrefixShardSelector(ctx)
Expand Down Expand Up @@ -1151,8 +1153,9 @@ func NewExecutor(ctx context.Context, nodeConfig config.NodeConfig, store *stora
shardSelector: shardSelector,
recoveryClient: recoveryClient,
eventConfig: eventConfig,
clusterID: clusterID,
}
nodeHandlerFactory, err := NewHandlerFactory(ctx, exec, workflowLauncher, launchPlanReader, kubeClient, catalogClient, recoveryClient, eventConfig, nodeScope)
nodeHandlerFactory, err := NewHandlerFactory(ctx, exec, workflowLauncher, launchPlanReader, kubeClient, catalogClient, recoveryClient, eventConfig, clusterID, nodeScope)
exec.nodeHandlerFactory = nodeHandlerFactory
return exec, err
}
Expand Down
Loading

0 comments on commit c016dab

Please sign in to comment.