diff --git a/pkg/controller/nodes/subworkflow/launchplan.go b/pkg/controller/nodes/subworkflow/launchplan.go index ec972d9fde..5b764059db 100644 --- a/pkg/controller/nodes/subworkflow/launchplan.go +++ b/pkg/controller/nodes/subworkflow/launchplan.go @@ -73,6 +73,8 @@ func (l *launchPlanHandler) StartLaunchPlan(ctx context.Context, nCtx handler.No RawOutputDataConfig: nCtx.ExecutionContext().GetRawOutputDataConfig().RawOutputDataConfig, Labels: nCtx.ExecutionContext().GetLabels(), Annotations: nCtx.ExecutionContext().GetAnnotations(), + Interruptible: nCtx.ExecutionContext().GetExecutionConfig().Interruptible, + OverwriteCache: nCtx.ExecutionContext().GetExecutionConfig().OverwriteCache, } if nCtx.ExecutionContext().GetExecutionConfig().RecoveryExecution.WorkflowExecutionIdentifier != nil { diff --git a/pkg/controller/nodes/subworkflow/launchplan/admin.go b/pkg/controller/nodes/subworkflow/launchplan/admin.go index 0329f3aefe..60499f230b 100644 --- a/pkg/controller/nodes/subworkflow/launchplan/admin.go +++ b/pkg/controller/nodes/subworkflow/launchplan/admin.go @@ -6,23 +6,25 @@ import ( "fmt" "time" - evtErr "github.com/flyteorg/flytepropeller/events/errors" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/service" "github.com/flyteorg/flytestdlib/cache" - "golang.org/x/time/rate" - "k8s.io/client-go/util/workqueue" - stdErr "github.com/flyteorg/flytestdlib/errors" - "github.com/flyteorg/flytestdlib/logger" - "github.com/flyteorg/flytestdlib/promutils" - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/service" + evtErr "github.com/flyteorg/flytepropeller/events/errors" + + "github.com/golang/protobuf/ptypes/wrappers" + + "golang.org/x/time/rate" + "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + + "k8s.io/client-go/util/workqueue" ) var isRecovery = true @@ -93,6 +95,13 @@ func (a *adminLaunchPlanExecutor) Launch(ctx context.Context, launchCtx LaunchCo } } + var interruptible *wrappers.BoolValue + if launchCtx.Interruptible != nil { + interruptible = &wrappers.BoolValue{ + Value: *launchCtx.Interruptible, + } + } + req := &admin.ExecutionCreateRequest{ Project: executionID.Project, Domain: executionID.Domain, @@ -111,6 +120,8 @@ func (a *adminLaunchPlanExecutor) Launch(ctx context.Context, launchCtx LaunchCo SecurityContext: &launchCtx.SecurityContext, MaxParallelism: int32(launchCtx.MaxParallelism), RawOutputDataConfig: launchCtx.RawOutputDataConfig, + Interruptible: interruptible, + OverwriteCache: launchCtx.OverwriteCache, }, } diff --git a/pkg/controller/nodes/subworkflow/launchplan/launchplan.go b/pkg/controller/nodes/subworkflow/launchplan/launchplan.go index 7d31b6f92b..7d6588ca3b 100644 --- a/pkg/controller/nodes/subworkflow/launchplan/launchplan.go +++ b/pkg/controller/nodes/subworkflow/launchplan/launchplan.go @@ -28,6 +28,8 @@ type LaunchContext struct { RawOutputDataConfig *admin.RawOutputDataConfig Annotations map[string]string Labels map[string]string + Interruptible *bool + OverwriteCache bool } // Executor interface to be implemented by the remote system that can allow workflow launching capabilities