diff --git a/pkg/rpc/adminservice/base.go b/pkg/rpc/adminservice/base.go index 05eafbd129..3762338688 100644 --- a/pkg/rpc/adminservice/base.go +++ b/pkg/rpc/adminservice/base.go @@ -89,7 +89,7 @@ func NewAdminServer(kubeConfig, master string) *AdminService { applicationConfiguration.RoleNameKey, execCluster, adminScope.NewSubScope("executor").NewSubScope("flytepropeller"), - configuration.NamespaceMappingConfiguration()) + configuration.NamespaceMappingConfiguration(), applicationConfiguration.EventVersion) logger.Info(context.Background(), "Successfully created a workflow executor engine") dataStorageClient, err := storage.NewDataStore(storeConfig, adminScope.NewSubScope("storage")) if err != nil { diff --git a/pkg/runtime/interfaces/application_configuration.go b/pkg/runtime/interfaces/application_configuration.go index 649b22e0dd..d00878522b 100644 --- a/pkg/runtime/interfaces/application_configuration.go +++ b/pkg/runtime/interfaces/application_configuration.go @@ -48,6 +48,8 @@ type ApplicationConfig struct { // This defines the nested path on the configured external storage provider where workflow closures are remotely // offloaded. MetadataStoragePrefix []string `json:"metadataStoragePrefix"` + // Event version to be used for Flyte workflows + EventVersion int `json:"eventVersion"` } // This section holds common config for AWS diff --git a/pkg/workflowengine/impl/propeller_executor.go b/pkg/workflowengine/impl/propeller_executor.go index f952293114..4d41ae4c8a 100644 --- a/pkg/workflowengine/impl/propeller_executor.go +++ b/pkg/workflowengine/impl/propeller_executor.go @@ -44,6 +44,7 @@ type FlytePropeller struct { roleNameKey string metrics propellerMetrics config runtimeInterfaces.NamespaceMappingConfiguration + eventVersion v1alpha1.EventVersion } type FlyteWorkflowBuilder struct{} @@ -138,6 +139,10 @@ func (c *FlytePropeller) ExecuteWorkflow(ctx context.Context, input interfaces.E flyteWf.Labels = labels annotations := addMapValues(input.Annotations, flyteWf.Annotations) flyteWf.Annotations = annotations + if flyteWf.WorkflowMeta == nil { + flyteWf.WorkflowMeta = &v1alpha1.WorkflowMeta{} + } + flyteWf.WorkflowMeta.EventVersion = c.eventVersion addExecutionOverrides(input.TaskPluginOverrides, flyteWf) if input.Reference.Spec.RawOutputDataConfig != nil { @@ -301,7 +306,7 @@ func newPropellerMetrics(scope promutils.Scope) propellerMetrics { } func NewFlytePropeller(roleNameKey string, executionCluster interfaces2.ClusterInterface, - scope promutils.Scope, configuration runtimeInterfaces.NamespaceMappingConfiguration) interfaces.Executor { + scope promutils.Scope, configuration runtimeInterfaces.NamespaceMappingConfiguration, eventVersion int) interfaces.Executor { return &FlytePropeller{ executionCluster: executionCluster, @@ -309,5 +314,6 @@ func NewFlytePropeller(roleNameKey string, executionCluster interfaces2.ClusterI roleNameKey: roleNameKey, metrics: newPropellerMetrics(scope), config: configuration, + eventVersion: v1alpha1.EventVersion(eventVersion), } }