Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Detect subNode phase updates to reduce evaluation frequency of ArrayNode #4535

Merged
merged 5 commits into from
Dec 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions flytepropeller/pkg/controller/nodes/array/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu
arrayNodeState := nCtx.NodeStateReader().GetArrayNodeState()
currentArrayNodePhase := arrayNodeState.Phase

taskPhaseVersion := arrayNodeState.TaskPhaseVersion
incrementTaskPhaseVersion := false
eventRecorder := newArrayEventRecorder(nCtx.EventsRecorder())

switch currentArrayNodePhase {
Expand Down Expand Up @@ -246,6 +246,7 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu
messageCollector := errorcollector.NewErrorMessageCollector()
for i, nodePhaseUint64 := range arrayNodeState.SubNodePhases.GetItems() {
nodePhase := v1alpha1.NodePhase(nodePhaseUint64)
taskPhase := int(arrayNodeState.SubNodeTaskPhases.GetItem(i))

// do not process nodes in terminal state
if isTerminalNodePhase(nodePhase) {
Expand Down Expand Up @@ -283,6 +284,11 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu
}
arrayNodeState.SubNodeRetryAttempts.SetItem(i, uint64(subNodeStatus.GetAttempts()))
arrayNodeState.SubNodeSystemFailures.SetItem(i, uint64(subNodeStatus.GetSystemFailures()))

// mark the task phase version for increment if the subNode phase or task phase changed
if subNodeStatus.GetPhase() != nodePhase || subNodeStatus.GetTaskNodeStatus().GetPhase() != taskPhase {
incrementTaskPhaseVersion = true
}
}

// process phases of subNodes to determine overall `ArrayNode` phase
Expand Down Expand Up @@ -429,17 +435,15 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu
taskPhase = idlcore.TaskExecution_FAILED
}

// need to increment taskPhaseVersion if arrayNodeState.Phase does not change, otherwise
// reset to 0. by incrementing this always we report an event and ensure processing
// every time the ArrayNode is evaluated. if this overhead becomes too large, we will need
// to revisit and only increment when any subNode state changes.
// if the ArrayNode phase has changed, reset the taskPhaseVersion to 0; otherwise increment it
// only when a change in subNode state was detected, leaving it untouched if nothing changed.
if currentArrayNodePhase != arrayNodeState.Phase {
arrayNodeState.TaskPhaseVersion = 0
} else {
arrayNodeState.TaskPhaseVersion = taskPhaseVersion + 1
} else if incrementTaskPhaseVersion {
arrayNodeState.TaskPhaseVersion = arrayNodeState.TaskPhaseVersion + 1
}

if err := eventRecorder.finalize(ctx, nCtx, taskPhase, taskPhaseVersion, a.eventConfig); err != nil {
if err := eventRecorder.finalize(ctx, nCtx, taskPhase, arrayNodeState.TaskPhaseVersion, a.eventConfig); err != nil {
logger.Errorf(ctx, "ArrayNode event recording failed: [%s]", err.Error())
return handler.UnknownTransition, err
}
Expand Down
2 changes: 1 addition & 1 deletion flytepropeller/pkg/controller/nodes/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -753,7 +753,7 @@ func (c *nodeExecutor) preExecute(ctx context.Context, dag executors.DAGStructur
return handler.PhaseInfoFailure(core.ExecutionError_SYSTEM, "BindingResolutionFailure", err.Error(), nil), nil
}

if nodeInputs != nil {
if nodeInputs != nil && len(nodeInputs.Literals) > 0 {
inputsFile := v1alpha1.GetInputsFile(dataDir)
if err := c.store.WriteProtobuf(ctx, inputsFile, storage.Options{}, nodeInputs); err != nil {
c.metrics.InputsWriteFailure.Inc(ctx)
Expand Down
3 changes: 3 additions & 0 deletions flytestdlib/storage/cached_rawstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ type cachedRawStore struct {

// Head gets metadata about the reference. This should generally be a lightweight operation.
func (s *cachedRawStore) Head(ctx context.Context, reference DataReference) (Metadata, error) {
ctx, span := otelutils.NewSpan(ctx, otelutils.BlobstoreClientTracer, "flytestdlib.storage.cachedRawStore/Head")
defer span.End()

key := []byte(reference)
if oRaw, err := s.cache.Get(key); err == nil {
s.metrics.CacheHit.Inc()
Expand Down
Loading