From e28ade0af1b62c5ef2d9f754170f915b387d4d1e Mon Sep 17 00:00:00 2001 From: Surinder Singh Date: Fri, 17 Jan 2020 16:35:48 -0800 Subject: [PATCH] Fixing the output dir in workflow event sent to Admin (#58) * test * . * new rm * increment attempt after finalize, just before marking it running again * test * updating dependencies --- flytepropeller/Gopkg.lock | 64 +++++++++---------- flytepropeller/Gopkg.toml | 2 +- .../pkg/controller/nodes/branch/handler.go | 2 +- .../pkg/controller/nodes/executor.go | 3 +- .../pkg/controller/nodes/executor_test.go | 2 +- .../nodes/subworkflow/subworkflow.go | 4 +- .../pkg/controller/workflow/executor.go | 4 +- 7 files changed, 40 insertions(+), 41 deletions(-) diff --git a/flytepropeller/Gopkg.lock b/flytepropeller/Gopkg.lock index 77b78163f..d057ac62a 100644 --- a/flytepropeller/Gopkg.lock +++ b/flytepropeller/Gopkg.lock @@ -18,7 +18,7 @@ version = "v10.2.1-beta" [[projects]] - digest = "1:b82e05494fb7e33ff9e4421171be18159a8afb614e3794572ddfe2db28a05d5f" + digest = "1:efeadf058f690cda94453caa88c469012573ac0859d0c12b0c2fb62ad57b9a89" name = "github.com/Azure/go-autorest" packages = [ "autorest", @@ -29,8 +29,8 @@ "tracing", ] pruneopts = "" - revision = "21d4b01533b1005be0d020da67a6d3f8ebdf0141" - version = "v13.3.1" + revision = "20a15b4e99645bc30a1cf8de518a6e8ac331c864" + version = "v13.3.2" [[projects]] digest = "1:558b53577dc0c9fde49b08405d706b202bcac3064320e9be53a75fc866280ee3" @@ -53,7 +53,7 @@ version = "v0.1.3" [[projects]] - digest = "1:7b3f2414994a962e15a976f4b904868a7d9eafc344fcf3ea6f721fc0f7f9b30b" + digest = "1:b0eb88b5988cf45c0c3b26adf32e09ba8eb7f762682a066b485c1ffce2ecbaa5" name = "github.com/aws/aws-sdk-go" packages = [ "aws", @@ -103,8 +103,8 @@ "service/sts/stsiface", ] pruneopts = "" - revision = "d38197695f0b8b79b5a5e09405918b2d27710c87" - version = "v1.28.1" + revision = "5abff723756e51686a6a4377e7c5b1d74a9e18a3" + version = "v1.28.5" [[projects]] branch = "master" @@ -219,12 +219,12 @@ version = "v6.15.6" [[projects]] - digest = "1:7b1d9ea7c8b57902ab828504c0745032881a79e83fe64ddb4b162c6284a00965" + digest = "1:4c7b06357903bfac94cd85e3590653ece5cde123b8367f7c8a442e8ccd5ebc19" name = "github.com/go-test/deep" packages = ["."] pruneopts = "" - revision = "6071681f918c8db002a213b1c6bd2681d92ba996" - version = "v1.0.4" + revision = "3c58d730f064836fd875a220cfefdef7b2b8075a" + version = "v1.0.5" [[projects]] digest = "1:d69d2ba23955582a64e367ff2b0808cdbd048458c178cea48f11ab8c40bd7aea" @@ -364,15 +364,15 @@ version = "v1.12.1" [[projects]] - digest = "1:7f6f07500a0b7d3766b00fa466040b97f2f5b5f3eef2ecabfe516e703b05119a" + digest = "1:e9ba8bd7f740264f703a41911c9523fb06aaf439dd899cedb263aadddc4be90e" name = "github.com/hashicorp/golang-lru" packages = [ ".", "simplelru", ] pruneopts = "" - revision = "7f827b33c0f158ec5dfbba01bb0b14a4541fd81d" - version = "v0.5.3" + revision = "14eae340515388ca95aa8e7b86f0de668e981f54" + version = "v0.5.4" [[projects]] digest = "1:d14365c51dd1d34d5c79833ec91413bfbb166be978724f15701e17080dc06dec" @@ -461,7 +461,7 @@ version = "v0.16.6" [[projects]] - digest = "1:d09a0aaa0c3530401bc44244aa8ef992033bfaedbe72e181a69b3e634b2d9b72" + digest = "1:50f0ddc91c5c16fd299cca51e1b451a35a82536e9fd6f2acb29104990bfccfd7" name = "github.com/lyft/flyteplugins" packages = [ "go/tasks/aws", @@ -499,8 +499,8 @@ "tests", ] pruneopts = "" - revision = "c8d215cfc629b3812216f7478b04910f13dfd0aa" - version = "v0.2.7" + revision = "f021ac4dc0d422acd2d25a53dc91e92935680ba1" + version = "v0.2.8" [[projects]] digest = "1:6c8d5d4b7189d903f7a2bcca20d5e2d721493f51406a16dcf27caf09315f522c" @@ -735,12 +735,12 @@ version = "v1.0.5" [[projects]] - digest = "1:ede5f300103cb012aafde77c692dc853c4b590bb412d3c7965a11748a5c37635" + digest = "1:0d8f28607c22fc1428d9765589b86763a115391e3b8892f7dbd9798bc69a2273" name = "github.com/spf13/viper" packages = ["."] pruneopts = "" - revision = "eabbc68a3ecd5cf8c11a2f84dbda5e7a38493b2f" - version = "v1.6.1" + revision = "4525543ce4fe90f7970f5e2cdc300b8ffc8c0582" + version = "v1.6.2" [[projects]] digest = "1:711eebe744c0151a9d09af2315f0bb729b2ec7637ef4c410fa90a18ef74b65b6" @@ -796,7 +796,7 @@ [[projects]] branch = "master" - digest = "1:47ec6ad813222524cbcd6c5c23b9a6a1a3f45b502fe5088fd59e6ba6a1436db3" + digest = "1:8dc5306c5097afa86c85335c9e981a22c164aab641ff749f88d2eecf9dbfdb93" name = "golang.org/x/crypto" packages = [ "ed25519", @@ -805,11 +805,11 @@ "ssh/terminal", ] pruneopts = "" - revision = "61a87790db17894570dfb32dbaa0a4af9ce60cb4" + revision = "530e935923ad688be97c15eeb8e5ee42ebf2b54a" [[projects]] branch = "master" - digest = "1:cab37ea831bb9be343b41a7673e64b6bb8399bc0f051180c04ce561573ed2c89" + digest = "1:bce1fb1dafa615413d845819aa75ba69d0979cdc2ac3b840e1c19c802a737916" name = "golang.org/x/net" packages = [ "context", @@ -822,7 +822,7 @@ "trace", ] pruneopts = "" - revision = "c0dbc17a35534bf2e581d7a942408dc936316da4" + revision = "6afb5195e5aab057fda82e27171243402346b0ad" [[projects]] branch = "master" @@ -841,14 +841,14 @@ [[projects]] branch = "master" - digest = "1:aacc6b6fe34f21feb81b6c0c993b36d40da25ba0a63c034d0f726abcfd4edd25" + digest = "1:e5853512d9a8ada4a406455a756247f4b40eda2029061b2c130ebda211e6ee8b" name = "golang.org/x/sys" packages = [ "unix", "windows", ] pruneopts = "" - revision = "86b910548bc16777f40503131aa424ae0a092199" + revision = "59e60aa80a0c64fa4b088976ee16ad7f04252c25" [[projects]] digest = "1:740b51a55815493a8d0f2b1e0d0ae48fe48953bf7eaf3fcc4198823bf67768c0" @@ -885,7 +885,7 @@ [[projects]] branch = "master" - digest = "1:fc464b20530014989c36e032b4bc966da29bd17b5995ae6a81f40c570e4c5319" + digest = "1:9336e850385dc8a345c1bd4f0d329a4e951bcf42ad3538ae6269ed20dc630ec7" name = "golang.org/x/tools" packages = [ "go/ast/astutil", @@ -897,7 +897,7 @@ "internal/semver", ] pruneopts = "" - revision = "86d412b4c6eacdfb118ca50fe39b3b006b76de02" + revision = "0cba7a3a9ee9d9a5917ed56b9668e7ad35eaa571" [[projects]] branch = "master" @@ -920,7 +920,7 @@ [[projects]] branch = "master" - digest = "1:de05d36353393ca26763ab9fbb531d9623db90f38053928d06806f7faccf5456" + digest = "1:5ec0fb9e8103f47dae8b62032e8397a14b1eac01af10b7db5d7c7cee392fad9f" name = "google.golang.org/api" packages = [ "googleapi", @@ -934,7 +934,7 @@ "transport/http/internal/propagation", ] pruneopts = "" - revision = "64f53cc407027fc5c680faf0e33aae2a6bbdf18f" + revision = "b4cd77d6a56cb166fb09be9d2264a9f42264fffd" [[projects]] digest = "1:c4404231035fad619a12f82ae3f0f8f9edc1cc7f34e7edad7a28ccac5336cc96" @@ -957,7 +957,7 @@ [[projects]] branch = "master" - digest = "1:8c4a215ffeaa77092a6252ff1470df26037e40fa746fb9249eee990e2935a7a5" + digest = "1:d15b46ad7d21bb64bb6f9f057e002dd5c32c6dd10bdd704ebb0b6f61e72550b9" name = "google.golang.org/genproto" packages = [ "googleapis/api/annotations", @@ -966,7 +966,7 @@ "protobuf/field_mask", ] pruneopts = "" - revision = "e1de0a7b01eb2fc11d735e4bfb79d2e53ec9edb3" + revision = "32f20d992d240fbca6ef7dec6c05d1f024314e02" [[projects]] digest = "1:7af390490e636a6adc9c76b37a3c823195fbf375a02c4d9506b4dd49d5d2409a" @@ -1278,7 +1278,7 @@ [[projects]] branch = "master" - digest = "1:74a079023eabdf0a56d40a1f151bf8e576b0f3a0d65f00cddc09269bf3f11093" + digest = "1:289de11c47a95fa9dfed15450247e64b41a2a45b3e5969055352823410f1cc39" name = "k8s.io/gengo" packages = [ "args", @@ -1291,7 +1291,7 @@ "types", ] pruneopts = "" - revision = "d8ecbaa43afd0cfa7a3370d5642b814840456eca" + revision = "36b2048a9120ffd30f41305c4a914b0fce7d0cf7" [[projects]] digest = "1:7ce71844fcaaabcbe09a392902edb5790ddca3a7070ae8d20830dc6dbe2751af" diff --git a/flytepropeller/Gopkg.toml b/flytepropeller/Gopkg.toml index 8757c5a82..4462acca4 100644 --- a/flytepropeller/Gopkg.toml +++ b/flytepropeller/Gopkg.toml @@ -57,7 +57,7 @@ required = [ [[constraint]] name = "github.com/lyft/flyteplugins" - version = "^0.2.5" + version = "^0.2.8" [[override]] name = "github.com/lyft/flytestdlib" diff --git a/flytepropeller/pkg/controller/nodes/branch/handler.go b/flytepropeller/pkg/controller/nodes/branch/handler.go index 09c2c0dbc..31816d576 100644 --- a/flytepropeller/pkg/controller/nodes/branch/handler.go +++ b/flytepropeller/pkg/controller/nodes/branch/handler.go @@ -113,7 +113,7 @@ func (b *branchHandler) recurseDownstream(ctx context.Context, nCtx handler.Node nodeStatus.SetDataDir(childNodeStatus.GetDataDir()) nodeStatus.SetOutputDir(childNodeStatus.GetOutputDir()) phase := handler.PhaseInfoSuccess(&handler.ExecutionInfo{ - OutputInfo: &handler.OutputInfo{OutputURI: v1alpha1.GetOutputsFile(childNodeStatus.GetDataDir())}, + OutputInfo: &handler.OutputInfo{OutputURI: v1alpha1.GetOutputsFile(childNodeStatus.GetOutputDir())}, }) return handler.DoTransition(handler.TransitionTypeEphemeral, phase), nil } diff --git a/flytepropeller/pkg/controller/nodes/executor.go b/flytepropeller/pkg/controller/nodes/executor.go index 73c069db3..571414783 100644 --- a/flytepropeller/pkg/controller/nodes/executor.go +++ b/flytepropeller/pkg/controller/nodes/executor.go @@ -217,7 +217,6 @@ func (c *nodeExecutor) execute(ctx context.Context, h handler.Node, nCtx *execCo ), nil } - nodeStatus.IncrementAttempts() // Retrying to clearing all status nCtx.nsm.clearNodeStatus() } @@ -360,7 +359,7 @@ func (c *nodeExecutor) handleNode(ctx context.Context, w v1alpha1.ExecutableWork if err := c.finalize(ctx, h, nCtx); err != nil { return executors.NodeStatusUndefined, err } - + nodeStatus.IncrementAttempts() nodeStatus.UpdatePhase(v1alpha1.NodePhaseRunning, v1.Now(), "retrying") // We are going to retry in the next round, so we should clear all current state nodeStatus.ClearSubNodeStatus() diff --git a/flytepropeller/pkg/controller/nodes/executor_test.go b/flytepropeller/pkg/controller/nodes/executor_test.go index 9f0cadc64..41812d86e 100644 --- a/flytepropeller/pkg/controller/nodes/executor_test.go +++ b/flytepropeller/pkg/controller/nodes/executor_test.go @@ -742,7 +742,7 @@ func TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) { {"(retryablefailure->running", v1alpha1.NodePhaseRetryableFailure, v1alpha1.NodePhaseRunning, executors.NodePhasePending, func() (handler.Transition, error) { return handler.UnknownTransition, fmt.Errorf("should not be invoked") - }, false, false, core.NodeExecution_RUNNING, 0}, + }, false, false, core.NodeExecution_RUNNING, 1}, {"running->failing", v1alpha1.NodePhaseRunning, v1alpha1.NodePhaseFailing, executors.NodePhasePending, func() (handler.Transition, error) { return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailure("code", "reason", nil)), nil diff --git a/flytepropeller/pkg/controller/nodes/subworkflow/subworkflow.go b/flytepropeller/pkg/controller/nodes/subworkflow/subworkflow.go index 680b49df0..c22fa0420 100644 --- a/flytepropeller/pkg/controller/nodes/subworkflow/subworkflow.go +++ b/flytepropeller/pkg/controller/nodes/subworkflow/subworkflow.go @@ -59,7 +59,7 @@ func (s *subworkflowHandler) DoInlineSubWorkflow(ctx context.Context, nCtx handl } // TODO optimization, we could just point the outputInfo to the path of the subworkflows output - destinationPath := v1alpha1.GetOutputsFile(parentNodeStatus.GetDataDir()) + destinationPath := v1alpha1.GetOutputsFile(parentNodeStatus.GetOutputDir()) if err := store.CopyRaw(ctx, sourcePath, destinationPath, storage.Options{}); err != nil { errMsg := fmt.Sprintf("Failed to copy subworkflow outputs from [%v] to [%v]", sourcePath, destinationPath) return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailure(errors.SubWorkflowExecutionFailed, errMsg, nil)), nil @@ -131,7 +131,7 @@ func (s *subworkflowHandler) StartSubWorkflow(ctx context.Context, nCtx handler. return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoUndefined), err } - outputDir, err := nCtx.DataStore().ConstructReference(ctx, nodeStatus.GetDataDir(), "0") + outputDir, err := nCtx.DataStore().ConstructReference(ctx, dataDir, "0") if err != nil { err = errors2.Wrapf(err, "Failed to create metadata store key. Error [%v]", err) return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoUndefined), err diff --git a/flytepropeller/pkg/controller/workflow/executor.go b/flytepropeller/pkg/controller/workflow/executor.go index b07deb2c9..59a03077d 100644 --- a/flytepropeller/pkg/controller/workflow/executor.go +++ b/flytepropeller/pkg/controller/workflow/executor.go @@ -175,8 +175,8 @@ func (c *workflowExecutor) handleSucceedingWorkflow(ctx context.Context, w *v1al logger.Infof(ctx, "Workflow completed successfully") endNodeStatus := w.GetNodeExecutionStatus(ctx, v1alpha1.EndNodeID) if endNodeStatus.GetPhase() == v1alpha1.NodePhaseSucceeded { - if endNodeStatus.GetDataDir() != "" { - w.Status.SetOutputReference(v1alpha1.GetOutputsFile(endNodeStatus.GetDataDir())) + if endNodeStatus.GetOutputDir() != "" { + w.Status.SetOutputReference(v1alpha1.GetOutputsFile(endNodeStatus.GetOutputDir())) } } return StatusSuccess