From 4d8f800f508554534f91b9dcc474d4c6b6a1f874 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 1 Nov 2022 15:10:45 +0800 Subject: [PATCH] Persist flyte deck during workflow recovery (#495) Signed-off-by: Kevin Su Signed-off-by: Kevin Su Co-authored-by: Dan Rammer --- pkg/controller/nodes/executor.go | 2 +- pkg/controller/nodes/executor_test.go | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/pkg/controller/nodes/executor.go b/pkg/controller/nodes/executor.go index 0182f51efa..58025c0c43 100644 --- a/pkg/controller/nodes/executor.go +++ b/pkg/controller/nodes/executor.go @@ -263,7 +263,7 @@ func (c *nodeExecutor) attemptRecovery(ctx context.Context, nCtx handler.NodeExe OutputURI: outputFile, } - deckFile := v1alpha1.GetDeckFile(nCtx.NodeStatus().GetOutputDir()) + deckFile := storage.DataReference(recovered.Closure.GetDeckUri()) metadata, err := nCtx.DataStore().Head(ctx, deckFile) if err != nil { logger.Errorf(ctx, "Failed to check the existence of deck file. Error: %v", err) diff --git a/pkg/controller/nodes/executor_test.go b/pkg/controller/nodes/executor_test.go index 504ba3ad09..e1e6db59de 100644 --- a/pkg/controller/nodes/executor_test.go +++ b/pkg/controller/nodes/executor_test.go @@ -2099,6 +2099,7 @@ func TestRecover(t *testing.T) { OutputResult: &admin.NodeExecutionClosure_OutputUri{ OutputUri: "outputuri.pb", }, + DeckUri: deckPath, }, }, nil) @@ -2154,6 +2155,7 @@ func TestRecover(t *testing.T) { CacheStatus: core.CatalogCacheStatus_CACHE_HIT, }, }, + DeckUri: deckPath, }, }, nil) @@ -2237,6 +2239,7 @@ func TestRecover(t *testing.T) { }, }, }, + DeckUri: deckPath, }, }, nil) @@ -2319,6 +2322,7 @@ func TestRecover(t *testing.T) { OutputResult: &admin.NodeExecutionClosure_OutputUri{ OutputUri: "outputuri.pb", }, + DeckUri: deckPath, }, }, nil) @@ -2364,6 +2368,7 @@ func TestRecover(t *testing.T) { OutputResult: &admin.NodeExecutionClosure_OutputUri{ OutputUri: "outputuri.pb", }, + DeckUri: deckPath, }, }, nil)