diff --git a/go.mod b/go.mod index 6b67f89a5..8a7cad375 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/DiSiqueira/GoTree v1.0.1-0.20180907134536-53a8e837f295 github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1 github.com/fatih/color v1.13.0 - github.com/flyteorg/flyteidl v1.3.7 + github.com/flyteorg/flyteidl v1.3.9 github.com/flyteorg/flyteplugins v1.0.37 github.com/flyteorg/flytestdlib v1.0.15 github.com/ghodss/yaml v1.0.0 diff --git a/go.sum b/go.sum index 9fee2761b..1a28b47ac 100644 --- a/go.sum +++ b/go.sum @@ -260,8 +260,8 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= -github.com/flyteorg/flyteidl v1.3.7 h1:MA7kOqMr/TmPlYPvJZwfsl+CYneuDOJ+kEKx2DocLhE= -github.com/flyteorg/flyteidl v1.3.7/go.mod h1:Pkt2skI1LiHs/2ZoekBnyPhuGOFMiuul6HHcKGZBsbM= +github.com/flyteorg/flyteidl v1.3.9 h1:MHUa89yKwCz58mQC2OxTzYjr0d3fA14qKG462v+RAyk= +github.com/flyteorg/flyteidl v1.3.9/go.mod h1:Pkt2skI1LiHs/2ZoekBnyPhuGOFMiuul6HHcKGZBsbM= github.com/flyteorg/flyteplugins v1.0.37 h1:TRgsZaGn5JhHBOsfLVU1kNg+TPFiuxbItC1wFA4nmgU= github.com/flyteorg/flyteplugins v1.0.37/go.mod h1:qyUPqVspLcLGJpKxVwHDWf+kBpOGuItOxCaF6zAmDio= github.com/flyteorg/flytestdlib v1.0.15 h1:kv9jDQmytbE84caY+pkZN8trJU2ouSAmESzpTEhfTt0= diff --git a/pkg/controller/nodes/dynamic/dynamic_workflow.go b/pkg/controller/nodes/dynamic/dynamic_workflow.go index 2c7803b5a..ef1fc51c3 100644 --- a/pkg/controller/nodes/dynamic/dynamic_workflow.go +++ b/pkg/controller/nodes/dynamic/dynamic_workflow.go @@ -30,6 +30,7 @@ type dynamicWorkflowContext struct { subWorkflowClosure *core.CompiledWorkflowClosure nodeLookup executors.NodeLookup isDynamic bool + dynamicJobSpecURI string } const dynamicWfNameTemplate = "dynamic_%s" @@ -183,6 +184,7 @@ func (d dynamicNodeTaskNodeHandler) buildContextualDynamicWorkflow(ctx context.C subWorkflowClosure: workflowCacheContents.CompiledWorkflow, execContext: executors.NewExecutionContext(nCtx.ExecutionContext(), compiledWf, compiledWf, newParentInfo, nCtx.ExecutionContext()), nodeLookup: executors.NewNodeLookup(compiledWf, dynamicNodeStatus), + dynamicJobSpecURI: string(f.GetLoc()), }, nil } } @@ -215,6 +217,7 @@ func (d dynamicNodeTaskNodeHandler) buildContextualDynamicWorkflow(ctx context.C subWorkflowClosure: closure, execContext: executors.NewExecutionContext(nCtx.ExecutionContext(), dynamicWf, dynamicWf, newParentInfo, nCtx.ExecutionContext()), nodeLookup: executors.NewNodeLookup(dynamicWf, dynamicNodeStatus), + dynamicJobSpecURI: string(f.GetLoc()), }, nil } diff --git a/pkg/controller/nodes/dynamic/handler.go b/pkg/controller/nodes/dynamic/handler.go index 3324c539b..1d35fb502 100644 --- a/pkg/controller/nodes/dynamic/handler.go +++ b/pkg/controller/nodes/dynamic/handler.go @@ -111,8 +111,9 @@ func (d dynamicNodeTaskNodeHandler) produceDynamicWorkflow(ctx context.Context, taskNodeInfoMetadata := &event.TaskNodeMetadata{} if dCtx.subWorkflowClosure != nil && dCtx.subWorkflowClosure.Primary != nil && dCtx.subWorkflowClosure.Primary.Template != nil { taskNodeInfoMetadata.DynamicWorkflow = &event.DynamicWorkflowNodeMetadata{ - Id: dCtx.subWorkflowClosure.Primary.Template.Id, - CompiledWorkflow: dCtx.subWorkflowClosure, + Id: dCtx.subWorkflowClosure.Primary.Template.Id, + CompiledWorkflow: dCtx.subWorkflowClosure, + DynamicJobSpecUri: dCtx.dynamicJobSpecURI, } } diff --git a/pkg/controller/nodes/executor.go b/pkg/controller/nodes/executor.go index 0b8bd52bf..7e42e0ee3 100644 --- a/pkg/controller/nodes/executor.go +++ b/pkg/controller/nodes/executor.go @@ -21,40 +21,39 @@ import ( "fmt" "time" - "github.com/flyteorg/flytepropeller/pkg/controller/nodes/recovery" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" - - "github.com/flyteorg/flytepropeller/pkg/controller/nodes/common" - - "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/ioutils" - errors2 "github.com/flyteorg/flytestdlib/errors" - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/service" + "github.com/flyteorg/flytepropeller/events" eventsErr "github.com/flyteorg/flytepropeller/events/errors" + "github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" + "github.com/flyteorg/flytepropeller/pkg/controller/config" + "github.com/flyteorg/flytepropeller/pkg/controller/executors" + "github.com/flyteorg/flytepropeller/pkg/controller/nodes/common" + "github.com/flyteorg/flytepropeller/pkg/controller/nodes/errors" + "github.com/flyteorg/flytepropeller/pkg/controller/nodes/handler" + "github.com/flyteorg/flytepropeller/pkg/controller/nodes/recovery" + "github.com/flyteorg/flytepropeller/pkg/controller/nodes/subworkflow/launchplan" + "github.com/flyteorg/flytepropeller/pkg/controller/nodes/task" + + "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/catalog" + "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/ioutils" + "github.com/flyteorg/flytestdlib/contextutils" + errors2 "github.com/flyteorg/flytestdlib/errors" "github.com/flyteorg/flytestdlib/logger" "github.com/flyteorg/flytestdlib/promutils" "github.com/flyteorg/flytestdlib/promutils/labeled" "github.com/flyteorg/flytestdlib/storage" + "github.com/golang/protobuf/ptypes" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/catalog" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - - "github.com/flyteorg/flytepropeller/pkg/controller/config" - - "github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" - "github.com/flyteorg/flytepropeller/pkg/controller/executors" - "github.com/flyteorg/flytepropeller/pkg/controller/nodes/errors" - "github.com/flyteorg/flytepropeller/pkg/controller/nodes/handler" - "github.com/flyteorg/flytepropeller/pkg/controller/nodes/subworkflow/launchplan" ) type nodeMetrics struct { @@ -156,6 +155,35 @@ func (c *nodeExecutor) IdempotentRecordEvent(ctx context.Context, nodeEvent *eve return err } +func (c *nodeExecutor) recoverInputs(ctx context.Context, nCtx handler.NodeExecutionContext, + recovered *admin.NodeExecution, recoveredData *admin.NodeExecutionGetDataResponse) (*core.LiteralMap, error) { + + nodeInputs := recoveredData.FullInputs + if nodeInputs != nil { + if err := c.store.WriteProtobuf(ctx, nCtx.InputReader().GetInputPath(), storage.Options{}, nodeInputs); err != nil { + c.metrics.InputsWriteFailure.Inc(ctx) + logger.Errorf(ctx, "Failed to move recovered inputs for Node. Error [%v]. InputsFile [%s]", err, nCtx.InputReader().GetInputPath()) + return nil, errors.Wrapf(errors.StorageError, nCtx.NodeID(), err, "Failed to store inputs for Node. InputsFile [%s]", nCtx.InputReader().GetInputPath()) + } + } else if len(recovered.InputUri) > 0 { + // If the inputs are too large they won't be returned inline in the RecoverData call. We must fetch them before copying them. + nodeInputs = &core.LiteralMap{} + if recoveredData.FullInputs == nil { + if err := c.store.ReadProtobuf(ctx, storage.DataReference(recovered.InputUri), nodeInputs); err != nil { + return nil, errors.Wrapf(errors.InputsNotFoundError, nCtx.NodeID(), err, "failed to read data from dataDir [%v].", recovered.InputUri) + } + } + + if err := c.store.WriteProtobuf(ctx, nCtx.InputReader().GetInputPath(), storage.Options{}, nodeInputs); err != nil { + c.metrics.InputsWriteFailure.Inc(ctx) + logger.Errorf(ctx, "Failed to move recovered inputs for Node. Error [%v]. InputsFile [%s]", err, nCtx.InputReader().GetInputPath()) + return nil, errors.Wrapf(errors.StorageError, nCtx.NodeID(), err, "Failed to store inputs for Node. InputsFile [%s]", nCtx.InputReader().GetInputPath()) + } + } + + return nodeInputs, nil +} + func (c *nodeExecutor) attemptRecovery(ctx context.Context, nCtx handler.NodeExecutionContext) (handler.PhaseInfo, error) { fullyQualifiedNodeID := nCtx.NodeExecutionMetadata().GetNodeExecutionID().NodeId if nCtx.ExecutionContext().GetEventVersion() != v1alpha1.EventVersion0 { @@ -205,6 +233,44 @@ func (c *nodeExecutor) attemptRecovery(ctx context.Context, nCtx handler.NodeExe nCtx.NodeExecutionMetadata().GetNodeExecutionID(), err) } } + + // if this node is a dynamic task we attempt to recover the compiled workflow from instances where the parent + // task succeeded but the dynamic task did not complete. this is important to ensure correctness since node ids + // within the compiled closure may not be generated deterministically. + if recovered.Metadata != nil && recovered.Metadata.IsDynamic && len(recovered.Closure.DynamicJobSpecUri) > 0 { + // recover node inputs + recoveredData, err := c.recoveryClient.RecoverNodeExecutionData(ctx, + nCtx.ExecutionContext().GetExecutionConfig().RecoveryExecution.WorkflowExecutionIdentifier, fullyQualifiedNodeID) + if err != nil || recoveredData == nil { + return handler.PhaseInfoUndefined, nil + } + + if _, err := c.recoverInputs(ctx, nCtx, recovered, recoveredData); err != nil { + return handler.PhaseInfoUndefined, err + } + + // copy previous DynamicJobSpec file + f, err := task.NewRemoteFutureFileReader(ctx, nCtx.NodeStatus().GetOutputDir(), nCtx.DataStore()) + if err != nil { + return handler.PhaseInfoUndefined, err + } + + dynamicJobSpecReference := storage.DataReference(recovered.Closure.DynamicJobSpecUri) + if err := nCtx.DataStore().CopyRaw(ctx, dynamicJobSpecReference, f.GetLoc(), storage.Options{}); err != nil { + return handler.PhaseInfoUndefined, errors.Wrapf(errors.StorageError, nCtx.NodeID(), err, + "failed to store dynamic job spec for node. source file [%s] destination file [%s]", dynamicJobSpecReference, f.GetLoc()) + } + + // transition node phase to 'Running' and dynamic task phase to 'DynamicNodePhaseParentFinalized' + state := nCtx.NodeStateReader().GetDynamicNodeState() + state.Phase = v1alpha1.DynamicNodePhaseParentFinalized + if err := nCtx.NodeStateWriter().PutDynamicNodeState(state); err != nil { + return handler.PhaseInfoUndefined, errors.Wrapf(errors.UnknownError, nCtx.NodeID(), err, "failed to store dynamic node state") + } + + return handler.PhaseInfoRunning(&handler.ExecutionInfo{}), nil + } + logger.Debugf(ctx, "Node [%+v] phase [%v] is not recoverable", nCtx.NodeExecutionMetadata().GetNodeExecutionID(), recovered.Closure.Phase) return handler.PhaseInfoUndefined, nil } @@ -222,31 +288,13 @@ func (c *nodeExecutor) attemptRecovery(ctx context.Context, nCtx handler.NodeExe logger.Warnf(ctx, "call to attemptRecovery node [%+v] data returned no error but also no data", nCtx.NodeExecutionMetadata().GetNodeExecutionID()) return handler.PhaseInfoUndefined, nil } - nodeInputs := recoveredData.GetFullInputs() - // Copy inputs to this node's expected location - if nodeInputs != nil { - if err = c.store.WriteProtobuf(ctx, nCtx.InputReader().GetInputPath(), storage.Options{}, recoveredData.FullInputs); err != nil { - c.metrics.InputsWriteFailure.Inc(ctx) - logger.Errorf(ctx, "Failed to move recovered inputs for Node. Error [%v]. InputsFile [%s]", err, nCtx.InputReader().GetInputPath()) - return handler.PhaseInfoUndefined, errors.Wrapf( - errors.StorageError, nCtx.NodeID(), err, "Failed to store inputs for Node. InputsFile [%s]", nCtx.InputReader().GetInputPath()) - } - } else if len(recovered.InputUri) > 0 { - // If the inputs are too large they won't be returned inline in the RecoverData call. We must fetch them before copying them. - nodeInputs = &core.LiteralMap{} - if recoveredData.FullInputs == nil { - if err := c.store.ReadProtobuf(ctx, storage.DataReference(recovered.InputUri), nodeInputs); err != nil { - return handler.PhaseInfoUndefined, errors.Wrapf(errors.InputsNotFoundError, nCtx.NodeID(), err, "failed to read data from dataDir [%v].", recovered.InputUri) - } - } - if err := c.store.WriteProtobuf(ctx, nCtx.InputReader().GetInputPath(), storage.Options{}, nodeInputs); err != nil { - c.metrics.InputsWriteFailure.Inc(ctx) - logger.Errorf(ctx, "Failed to move recovered inputs for Node. Error [%v]. InputsFile [%s]", err, nCtx.InputReader().GetInputPath()) - return handler.PhaseInfoUndefined, errors.Wrapf( - errors.StorageError, nCtx.NodeID(), err, "Failed to store inputs for Node. InputsFile [%s]", nCtx.InputReader().GetInputPath()) - } + // Copy inputs to this node's expected location + nodeInputs, err := c.recoverInputs(ctx, nCtx, recovered, recoveredData) + if err != nil { + return handler.PhaseInfoUndefined, err } + // Similarly, copy outputs' reference so := storage.Options{} var outputs = &core.LiteralMap{} @@ -334,7 +382,7 @@ func (c *nodeExecutor) preExecute(ctx context.Context, dag executors.DAGStructur if !node.IsStartNode() { if nCtx.ExecutionContext().GetExecutionConfig().RecoveryExecution.WorkflowExecutionIdentifier != nil { phaseInfo, err := c.attemptRecovery(ctx, nCtx) - if err != nil || phaseInfo.GetPhase() == handler.EPhaseRecovered { + if err != nil || phaseInfo.GetPhase() != handler.EPhaseUndefined { return phaseInfo, err } } @@ -676,7 +724,7 @@ func (c *nodeExecutor) handleRetryableFailure(ctx context.Context, nCtx *nodeExe // NOTE: It is important to increment attempts only after abort has been called. Increment attempt mutates the state // Attempt is used throughout the system to determine the idempotent resource version. nodeStatus.IncrementAttempts() - nodeStatus.UpdatePhase(v1alpha1.NodePhaseRunning, v1.Now(), "retrying", nil) + nodeStatus.UpdatePhase(v1alpha1.NodePhaseRunning, metav1.Now(), "retrying", nil) // We are going to retry in the next round, so we should clear all current state nodeStatus.ClearSubNodeStatus() nodeStatus.ClearTaskStatus() @@ -710,7 +758,7 @@ func (c *nodeExecutor) handleNode(ctx context.Context, dag executors.DAGStructur if err := c.abort(ctx, h, nCtx, "node failing"); err != nil { return executors.NodeStatusUndefined, err } - nodeStatus.UpdatePhase(v1alpha1.NodePhaseFailed, v1.Now(), nodeStatus.GetMessage(), nodeStatus.GetExecutionError()) + nodeStatus.UpdatePhase(v1alpha1.NodePhaseFailed, metav1.Now(), nodeStatus.GetMessage(), nodeStatus.GetExecutionError()) c.metrics.FailureDuration.Observe(ctx, nodeStatus.GetStartedAt().Time, nodeStatus.GetStoppedAt().Time) if nCtx.md.IsInterruptible() { c.metrics.InterruptibleNodesTerminated.Inc(ctx) @@ -725,7 +773,7 @@ func (c *nodeExecutor) handleNode(ctx context.Context, dag executors.DAGStructur } nodeStatus.ClearSubNodeStatus() - nodeStatus.UpdatePhase(v1alpha1.NodePhaseTimedOut, v1.Now(), nodeStatus.GetMessage(), nodeStatus.GetExecutionError()) + nodeStatus.UpdatePhase(v1alpha1.NodePhaseTimedOut, metav1.Now(), nodeStatus.GetMessage(), nodeStatus.GetExecutionError()) c.metrics.TimedOutFailure.Inc(ctx) if nCtx.md.IsInterruptible() { c.metrics.InterruptibleNodesTerminated.Inc(ctx) @@ -738,7 +786,7 @@ func (c *nodeExecutor) handleNode(ctx context.Context, dag executors.DAGStructur if err := c.finalize(ctx, h, nCtx); err != nil { return executors.NodeStatusUndefined, err } - t := v1.Now() + t := metav1.Now() started := nodeStatus.GetStartedAt() if started == nil { diff --git a/pkg/controller/nodes/executor_test.go b/pkg/controller/nodes/executor_test.go index 35e6c5ce6..a707b4dfc 100644 --- a/pkg/controller/nodes/executor_test.go +++ b/pkg/controller/nodes/executor_test.go @@ -2151,6 +2151,97 @@ func TestRecover(t *testing.T) { assert.NoError(t, err) assert.Equal(t, phaseInfo.GetPhase(), handler.EPhaseRecovered) }) + t.Run("recover partially completed dynamic task", func(t *testing.T) { + srcDynamicJobSpecURI := "src/foo/bar" + dstDynamicJobSpecURI := "dst/foo/bar" + + // initialize node execution context + nCtx := &nodeHandlerMocks.NodeExecutionContext{} + nCtx.OnExecutionContext().Return(execContext) + nCtx.OnNodeExecutionMetadata().Return(nm) + nCtx.OnInputReader().Return(ir) + nCtx.OnNodeStatus().Return(ns) + + mockPBStore := &storageMocks.ComposedProtobufStore{} + mockPBStore.On("CopyRaw", mock.Anything, storage.DataReference(srcDynamicJobSpecURI), storage.DataReference(dstDynamicJobSpecURI), mock.Anything).Return(nil) + mockPBStore.On("WriteProtobuf", mock.Anything, mock.MatchedBy(func(reference storage.DataReference) bool { + return reference.String() == inputsPath || reference.String() == outputsPath + }), mock.Anything, + mock.Anything).Return(nil) + mockReferenceConstructor := storageMocks.ReferenceConstructor{} + mockReferenceConstructor.On("ConstructReference", mock.MatchedBy(func(ctx context.Context) bool { return true }), storage.DataReference("out"), "futures.pb").Return( + storage.DataReference(dstDynamicJobSpecURI), nil) + mockReferenceConstructor.On("ConstructReference", mock.MatchedBy(func(ctx context.Context) bool { return true }), storage.DataReference("out"), "futures_compiled.pb").Return( + storage.DataReference("out/futures_compiled.pb"), nil) + mockReferenceConstructor.On("ConstructReference", mock.MatchedBy(func(ctx context.Context) bool { return true }), storage.DataReference("out"), "dynamic_compiled.pb").Return( + storage.DataReference("out/dynamic_compiled.pb"), nil) + storageClient := &storage.DataStore{ + ComposedProtobufStore: mockPBStore, + ReferenceConstructor: &mockReferenceConstructor, + } + + nCtx.OnDataStore().Return(storageClient) + + reader := &nodeHandlerMocks.NodeStateReader{} + reader.OnGetDynamicNodeState().Return(handler.DynamicNodeState{}) + nCtx.OnNodeStateReader().Return(reader) + + writer := &nodeHandlerMocks.NodeStateWriter{} + writer.OnPutDynamicNodeStateMatch(mock.Anything).Run(func(args mock.Arguments) { + state := args.Get(0).(handler.DynamicNodeState) + assert.Equal(t, v1alpha1.DynamicNodePhaseParentFinalized, state.Phase) + }).Return(nil) + nCtx.OnNodeStateWriter().Return(writer) + + // initialize node executor + recoveryClient := &recoveryMocks.Client{} + recoveryClient.On("RecoverNodeExecution", mock.Anything, recoveryID, nodeID).Return( + &admin.NodeExecution{ + Closure: &admin.NodeExecutionClosure{ + Phase: core.NodeExecution_FAILED, + DynamicJobSpecUri: srcDynamicJobSpecURI, + }, + Metadata: &admin.NodeExecutionMetaData{ + IsDynamic: true, + }, + }, nil) + + dynamicWorkflow := &admin.DynamicWorkflowNodeMetadata{ + Id: &core.Identifier{ + ResourceType: core.ResourceType_WORKFLOW, + Project: "p", + Domain: "d", + Name: "n", + Version: "abc123", + }, + CompiledWorkflow: &core.CompiledWorkflowClosure{ + Primary: &core.CompiledWorkflow{ + Template: &core.WorkflowTemplate{ + Metadata: &core.WorkflowMetadata{ + OnFailure: core.WorkflowMetadata_FAIL_AFTER_EXECUTABLE_NODES_COMPLETE, + }, + }, + }, + }, + } + + recoveryClient.On("RecoverNodeExecutionData", mock.Anything, recoveryID, nodeID).Return( + &admin.NodeExecutionGetDataResponse{ + FullInputs: fullInputs, + FullOutputs: fullOutputs, + DynamicWorkflow: dynamicWorkflow, + }, nil) + + executor := nodeExecutor{ + recoveryClient: recoveryClient, + store: storageClient, + eventConfig: eventConfig, + } + + phaseInfo, err := executor.attemptRecovery(context.TODO(), nCtx) + assert.NoError(t, err) + assert.Equal(t, phaseInfo.GetPhase(), handler.EPhaseRunning) + }) t.Run("recover cached, dynamic task node successfully", func(t *testing.T) { recoveryClient := &recoveryMocks.Client{} recoveryClient.On("RecoverNodeExecution", mock.Anything, recoveryID, nodeID).Return( diff --git a/pkg/controller/nodes/task/future_file_reader.go b/pkg/controller/nodes/task/future_file_reader.go index b6ce7588f..c05fdea09 100644 --- a/pkg/controller/nodes/task/future_file_reader.go +++ b/pkg/controller/nodes/task/future_file_reader.go @@ -26,6 +26,10 @@ type FutureFileReader struct { store *storage.DataStore } +func (f FutureFileReader) GetLoc() storage.DataReference { + return f.loc +} + func (f FutureFileReader) Exists(ctx context.Context) (bool, error) { metadata, err := f.store.Head(ctx, f.loc) // If no futures file produced, then declare success and return.