Skip to content

Commit

Permalink
Enable EventVersion support in Admin (flyteorg#135)
Browse files Browse the repository at this point in the history
  • Loading branch information
anandswaminathan authored Nov 3, 2020
1 parent 6b5f59d commit 3104864
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 2 deletions.
2 changes: 1 addition & 1 deletion flyteadmin/pkg/rpc/adminservice/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func NewAdminServer(kubeConfig, master string) *AdminService {
applicationConfiguration.RoleNameKey,
execCluster,
adminScope.NewSubScope("executor").NewSubScope("flytepropeller"),
configuration.NamespaceMappingConfiguration())
configuration.NamespaceMappingConfiguration(), applicationConfiguration.EventVersion)
logger.Info(context.Background(), "Successfully created a workflow executor engine")
dataStorageClient, err := storage.NewDataStore(storeConfig, adminScope.NewSubScope("storage"))
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ type ApplicationConfig struct {
// This defines the nested path on the configured external storage provider where workflow closures are remotely
// offloaded.
MetadataStoragePrefix []string `json:"metadataStoragePrefix"`
// Event version to be used for Flyte workflows
EventVersion int `json:"eventVersion"`
}

// This section holds common config for AWS
Expand Down
8 changes: 7 additions & 1 deletion flyteadmin/pkg/workflowengine/impl/propeller_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type FlytePropeller struct {
roleNameKey string
metrics propellerMetrics
config runtimeInterfaces.NamespaceMappingConfiguration
eventVersion v1alpha1.EventVersion
}

type FlyteWorkflowBuilder struct{}
Expand Down Expand Up @@ -138,6 +139,10 @@ func (c *FlytePropeller) ExecuteWorkflow(ctx context.Context, input interfaces.E
flyteWf.Labels = labels
annotations := addMapValues(input.Annotations, flyteWf.Annotations)
flyteWf.Annotations = annotations
if flyteWf.WorkflowMeta == nil {
flyteWf.WorkflowMeta = &v1alpha1.WorkflowMeta{}
}
flyteWf.WorkflowMeta.EventVersion = c.eventVersion
addExecutionOverrides(input.TaskPluginOverrides, flyteWf)

if input.Reference.Spec.RawOutputDataConfig != nil {
Expand Down Expand Up @@ -301,13 +306,14 @@ func newPropellerMetrics(scope promutils.Scope) propellerMetrics {
}

func NewFlytePropeller(roleNameKey string, executionCluster interfaces2.ClusterInterface,
scope promutils.Scope, configuration runtimeInterfaces.NamespaceMappingConfiguration) interfaces.Executor {
scope promutils.Scope, configuration runtimeInterfaces.NamespaceMappingConfiguration, eventVersion int) interfaces.Executor {

return &FlytePropeller{
executionCluster: executionCluster,
builder: &FlyteWorkflowBuilder{},
roleNameKey: roleNameKey,
metrics: newPropellerMetrics(scope),
config: configuration,
eventVersion: v1alpha1.EventVersion(eventVersion),
}
}

0 comments on commit 3104864

Please sign in to comment.