From 405bd6f40bca375143f065006bd1a2baef5d953d Mon Sep 17 00:00:00 2001 From: Dan Rammer Date: Tue, 25 Jul 2023 20:01:56 -0500 Subject: [PATCH] fixing max parallelism (#594) * fixing max parallelism Signed-off-by: Daniel Rammer * Run subset of functional tests Signed-off-by: eduardo apolinario * Comment basics.deck.wf test out Signed-off-by: eduardo apolinario * Use flytetools@master in e2e tests and bring decks test back Signed-off-by: eduardo apolinario --------- Signed-off-by: Daniel Rammer Signed-off-by: eduardo apolinario Co-authored-by: eduardo apolinario --- flytepropeller/pkg/controller/nodes/task/handler.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/flytepropeller/pkg/controller/nodes/task/handler.go b/flytepropeller/pkg/controller/nodes/task/handler.go index d7300818b4..e0faf8f85d 100644 --- a/flytepropeller/pkg/controller/nodes/task/handler.go +++ b/flytepropeller/pkg/controller/nodes/task/handler.go @@ -668,6 +668,9 @@ func (t Handler) Handle(ctx context.Context, nCtx handler.NodeExecutionContext) return handler.UnknownTransition, errors.Wrapf(errors.RuntimeExecutionError, nCtx.NodeID(), err, "failed during plugin execution") } if pluginTrns.IsPreviouslyObserved() { + if !pluginTrns.pInfo.Phase().IsTerminal() { + logger.Infof(ctx, "Parallelism now set to [%d].", nCtx.ExecutionContext().IncrementParallelism()) + } logger.Debugf(ctx, "No state change for Task, previously observed same transition. Short circuiting.") return pluginTrns.FinalTransition(ctx) } @@ -755,8 +758,7 @@ func (t Handler) Handle(ctx context.Context, nCtx handler.NodeExecutionContext) } if !pluginTrns.pInfo.Phase().IsTerminal() { - eCtx := nCtx.ExecutionContext() - logger.Infof(ctx, "Parallelism now set to [%d].", eCtx.IncrementParallelism()) + logger.Infof(ctx, "Parallelism now set to [%d].", nCtx.ExecutionContext().IncrementParallelism()) } return pluginTrns.FinalTransition(ctx) }