Skip to content

Commit

Permalink
Save resolved overrides alongside ExecutionSpec (#308)
Browse files Browse the repository at this point in the history
* init

Signed-off-by: troychiu <[email protected]>

* nit

Signed-off-by: troychiu <[email protected]>

* task resource

Signed-off-by: troychiu <[email protected]>

* finish all fields & add comments, tests

Signed-off-by: troychiu <[email protected]>

* nit

Signed-off-by: troychiu <[email protected]>

* nit

Signed-off-by: troychiu <[email protected]>

* make generate

Signed-off-by: troychiu <[email protected]>

* update metadata when reassign

Signed-off-by: troychiu <[email protected]>

* fix tests

Signed-off-by: troychiu <[email protected]>

* fix tests

Signed-off-by: troychiu <[email protected]>

* fix tests

Signed-off-by: troychiu <[email protected]>

* fix tests

Signed-off-by: troychiu <[email protected]>

* fix tests

Signed-off-by: troychiu <[email protected]>

* WIP

Signed-off-by: troychiu <[email protected]>

* clean up

Signed-off-by: troychiu <[email protected]>

* nit

Signed-off-by: troychiu <[email protected]>

* fix tests

Signed-off-by: troychiu <[email protected]>

* fix tests

Signed-off-by: troychiu <[email protected]>

* fix tests

Signed-off-by: troychiu <[email protected]>

* fix tests

Signed-off-by: troychiu <[email protected]>

* clean up

Signed-off-by: troychiu <[email protected]>

---------

Signed-off-by: troychiu <[email protected]>
  • Loading branch information
troychiu authored Jun 4, 2024
1 parent 81afefd commit 67cca2f
Show file tree
Hide file tree
Showing 18 changed files with 684 additions and 375 deletions.
45 changes: 42 additions & 3 deletions flyteadmin/pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -644,9 +644,13 @@ func (m *ExecutionManager) launchSingleTaskExecution(
return nil, nil, err
}

resolvedSpec := proto.Clone(request.Spec).(*admin.ExecutionSpec)
resolvedSpec = completeResolvedSpec(resolvedSpec, &executionParameters)

executionModel, err := transformers.CreateExecutionModel(transformers.CreateExecutionModelInput{
WorkflowExecutionID: workflowExecutionID,
RequestSpec: requestSpec,
ResolvedSpec: resolvedSpec,
TaskID: taskModel.ID,
WorkflowID: workflowModel.ID,
// The execution is not considered running until the propeller sends a specific event saying so.
Expand Down Expand Up @@ -1115,10 +1119,18 @@ func (m *ExecutionManager) launchExecution(
return nil, nil, err
}

var taskResources workflowengineInterfaces.TaskResources
if requestSpec.TaskResourceAttributes != nil {
taskResources = workflowengineInterfaces.TaskResources{
Defaults: util.FromAdminProtoTaskResourceSpec(ctx, requestSpec.TaskResourceAttributes.Defaults),
Limits: util.FromAdminProtoTaskResourceSpec(ctx, requestSpec.TaskResourceAttributes.Limits),
}
} else {
taskResources = util.GetTaskResources(ctx, workflow.Id, m.resourceManager, m.config.TaskResourceConfiguration())
}
// Dynamically assign task resource defaults.
platformTaskResources := util.GetTaskResources(ctx, workflow.Id, m.resourceManager, m.config.TaskResourceConfiguration())
for _, task := range workflow.Closure.CompiledWorkflow.Tasks {
m.setCompiledTaskDefaults(ctx, task, platformTaskResources)
m.setCompiledTaskDefaults(ctx, task, taskResources)
}

// Dynamically assign execution queues.
Expand Down Expand Up @@ -1165,7 +1177,7 @@ func (m *ExecutionManager) launchExecution(
Labels: labels,
Annotations: annotations,
ExecutionConfig: executionConfig,
TaskResources: &platformTaskResources,
TaskResources: &taskResources,
EventVersion: m.config.ApplicationConfiguration().GetTopLevelConfig().EventVersion,
RoleNameKey: m.config.ApplicationConfiguration().GetTopLevelConfig().RoleNameKey,
RawOutputDataConfig: rawOutputDataConfig,
Expand Down Expand Up @@ -1209,9 +1221,13 @@ func (m *ExecutionManager) launchExecution(
m.publishExecutionStart(ctx, workflowExecutionID, request.Spec.LaunchPlan, workflow.Id, artifactTrackers, usedArtifactIDs)
}

resolvedSpec := proto.Clone(request.Spec).(*admin.ExecutionSpec)
resolvedSpec = completeResolvedSpec(resolvedSpec, &executionParameters)

createExecModelInput := transformers.CreateExecutionModelInput{
WorkflowExecutionID: workflowExecutionID,
RequestSpec: requestSpec,
ResolvedSpec: resolvedSpec,
LaunchPlanID: launchPlanModel.ID,
WorkflowID: launchPlanModel.WorkflowID,
// The execution is not considered running until the propeller sends a specific event saying so.
Expand Down Expand Up @@ -1257,6 +1273,29 @@ func (m *ExecutionManager) launchExecution(
return ctx, executionModel, nil
}

// This function will store the values for an execution in the execution spec.
// Be sure to update this function when adding new fields to the execution spec.
func completeResolvedSpec(spec *admin.ExecutionSpec, executionParameters *workflowengineInterfaces.ExecutionParameters) *admin.ExecutionSpec {
// We can skip fields that are in input spec, since they are already copied when proto.clone() is called.
// Skipped: LaunchPlan, Metadata, NotificationOverrides, tags
spec.Labels = &admin.Labels{Values: executionParameters.Labels}
spec.Annotations = &admin.Annotations{Values: executionParameters.Annotations}
spec.SecurityContext = executionParameters.ExecutionConfig.SecurityContext
spec.MaxParallelism = executionParameters.ExecutionConfig.MaxParallelism
spec.RawOutputDataConfig = executionParameters.RawOutputDataConfig
spec.ClusterAssignment = executionParameters.ClusterAssignment
spec.Interruptible = executionParameters.ExecutionConfig.Interruptible
spec.OverwriteCache = executionParameters.ExecutionConfig.OverwriteCache
spec.Envs = executionParameters.ExecutionConfig.Envs
spec.ExecutionClusterLabel = executionParameters.ExecutionClusterLabel
spec.ExecutionEnvAssignments = executionParameters.ExecutionConfig.ExecutionEnvAssignments
spec.TaskResourceAttributes = &admin.TaskResourceAttributes{
Defaults: util.ToAdminProtoTaskResourceSpec(&executionParameters.TaskResources.Defaults),
Limits: util.ToAdminProtoTaskResourceSpec(&executionParameters.TaskResources.Limits),
}
return spec
}

// publishExecutionStart is an event that Admin publishes for artifact lineage.
func (m *ExecutionManager) publishExecutionStart(ctx context.Context, executionID core.WorkflowExecutionIdentifier,
launchPlanID *core.Identifier, workflowID *core.Identifier, artifactTrackers map[string]string, usedArtifactIDs []*core.ArtifactID) {
Expand Down
38 changes: 38 additions & 0 deletions flyteadmin/pkg/manager/impl/execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"reflect"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -79,6 +80,7 @@ var closure = admin.ExecutionClosure{
State: admin.ExecutionState_EXECUTION_ACTIVE,
OccurredAt: testutils.MockCreatedAtProto,
},
ResolvedSpec: getExpectedSpec(),
}
var closureBytes, _ = proto.Marshal(&closure)

Expand Down Expand Up @@ -159,6 +161,7 @@ func getLegacyClosure() *admin.ExecutionClosure {
State: admin.ExecutionState_EXECUTION_ACTIVE,
OccurredAt: testutils.MockCreatedAtProto,
},
ResolvedSpec: getExpectedSpec(),
}
}

Expand Down Expand Up @@ -2481,6 +2484,7 @@ func TestCreateWorkflowEvent_StartedRunning(t *testing.T) {
State: admin.ExecutionState_EXECUTION_ACTIVE,
OccurredAt: testutils.MockCreatedAtProto,
},
ResolvedSpec: getExpectedSpec(),
}
closureBytes, _ := proto.Marshal(&closure)
updateExecutionFunc := func(
Expand Down Expand Up @@ -6297,3 +6301,37 @@ func TestResolveParameterMapArtifacts(t *testing.T) {
assert.Equal(t, 1, len(x))
})
}

func TestCompleteResolvedSpec(t *testing.T) {
existingFields := sets.NewString(
"LaunchPlan",
"Inputs",
"Metadata",
"NotificationOverrides",
"Labels",
"Annotations",
"SecurityContext",
"AuthRole",
"QualityOfService",
"MaxParallelism",
"RawOutputDataConfig",
"ClusterAssignment",
"Interruptible",
"OverwriteCache",
"Envs",
"Tags",
"ExecutionClusterLabel",
"ExecutionEnvAssignments",
"TaskResourceAttributes",
)
specType := reflect.ValueOf(admin.ExecutionSpec{}).Type()
for i := 0; i < specType.NumField(); i++ {
fieldName := specType.Field(i).Name
if fieldName == "state" || fieldName == "sizeCache" || fieldName == "unknownFields" {
continue
}
if !existingFields.Has(fieldName) {
t.Fatalf("This is a warning test. You are adding a new field [%s] to ExecutionSpec.proto, be sure to also modify completeResolvedSpec function in execution_manager.go", fieldName)
}
}
}
24 changes: 4 additions & 20 deletions flyteadmin/pkg/manager/impl/util/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,27 +225,11 @@ func collectGlobalConfiguration(ctx context.Context, config runtimeInterfaces.Co
logger.Debug(ctx, "Collecting global configuration")
// Task resource attributes
taskResourceAttributesConfig := config.TaskResourceConfiguration()
defaultCPU := taskResourceAttributesConfig.GetDefaults().CPU
defaultGPU := taskResourceAttributesConfig.GetDefaults().GPU
defaultMemory := taskResourceAttributesConfig.GetDefaults().Memory
defaultEphemeralStorage := taskResourceAttributesConfig.GetDefaults().EphemeralStorage
limitCPU := taskResourceAttributesConfig.GetLimits().CPU
limitGPU := taskResourceAttributesConfig.GetLimits().GPU
limitMemory := taskResourceAttributesConfig.GetLimits().Memory
limitEphemeralStorage := taskResourceAttributesConfig.GetLimits().EphemeralStorage
defaults := taskResourceAttributesConfig.GetDefaults()
limits := taskResourceAttributesConfig.GetLimits()
taskResourceAttributes := admin.TaskResourceAttributes{
Defaults: &admin.TaskResourceSpec{
Cpu: defaultCPU.String(),
Gpu: defaultGPU.String(),
Memory: defaultMemory.String(),
EphemeralStorage: defaultEphemeralStorage.String(),
},
Limits: &admin.TaskResourceSpec{
Cpu: limitCPU.String(),
Gpu: limitGPU.String(),
Memory: limitMemory.String(),
EphemeralStorage: limitEphemeralStorage.String(),
},
Defaults: ToAdminProtoTaskResourceSpec(&defaults),
Limits: ToAdminProtoTaskResourceSpec(&limits),
}

// Workflow execution configuration
Expand Down
17 changes: 13 additions & 4 deletions flyteadmin/pkg/manager/impl/util/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ func GetCompleteTaskResourceRequirements(ctx context.Context, identifier *core.I
}
}

// fromAdminProtoTaskResourceSpec parses the flyteidl `TaskResourceSpec` message into a `TaskResourceSet`.
func fromAdminProtoTaskResourceSpec(ctx context.Context, spec *admin.TaskResourceSpec) runtimeInterfaces.TaskResourceSet {
// FromAdminProtoTaskResourceSpec parses the flyteidl `TaskResourceSpec` message into a `TaskResourceSet`.
func FromAdminProtoTaskResourceSpec(ctx context.Context, spec *admin.TaskResourceSpec) runtimeInterfaces.TaskResourceSet {
result := runtimeInterfaces.TaskResourceSet{}
if len(spec.Cpu) > 0 {
result.CPU = parseQuantityNoError(ctx, "project", "cpu", spec.Cpu)
Expand All @@ -77,6 +77,15 @@ func fromAdminProtoTaskResourceSpec(ctx context.Context, spec *admin.TaskResourc
return result
}

func ToAdminProtoTaskResourceSpec(taskResourceSet *runtimeInterfaces.TaskResourceSet) *admin.TaskResourceSpec {
return &admin.TaskResourceSpec{
Cpu: taskResourceSet.CPU.String(),
Memory: taskResourceSet.Memory.String(),
EphemeralStorage: taskResourceSet.EphemeralStorage.String(),
Gpu: taskResourceSet.GPU.String(),
}
}

// GetTaskResources returns the most specific default and limit task resources for the specified id. This first checks
// if there is a matchable resource(s) defined, and uses the highest priority one, otherwise it falls back to using the
// flyteadmin default configured values.
Expand Down Expand Up @@ -108,8 +117,8 @@ func GetTaskResources(ctx context.Context, id *core.Identifier, resourceManager
logger.Debugf(ctx, "Assigning task requested resources for [%+v]", id)
var taskResourceAttributes = workflowengineInterfaces.TaskResources{}
if resource != nil && resource.Attributes != nil && resource.Attributes.GetTaskResourceAttributes() != nil {
taskResourceAttributes.Defaults = fromAdminProtoTaskResourceSpec(ctx, resource.Attributes.GetTaskResourceAttributes().Defaults)
taskResourceAttributes.Limits = fromAdminProtoTaskResourceSpec(ctx, resource.Attributes.GetTaskResourceAttributes().Limits)
taskResourceAttributes.Defaults = FromAdminProtoTaskResourceSpec(ctx, resource.Attributes.GetTaskResourceAttributes().Defaults)
taskResourceAttributes.Limits = FromAdminProtoTaskResourceSpec(ctx, resource.Attributes.GetTaskResourceAttributes().Limits)
} else {
taskResourceAttributes = workflowengineInterfaces.TaskResources{
Defaults: taskResourceConfig.GetDefaults(),
Expand Down
2 changes: 1 addition & 1 deletion flyteadmin/pkg/manager/impl/util/resources_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func TestGetTaskResources(t *testing.T) {
}

func TestFromAdminProtoTaskResourceSpec(t *testing.T) {
taskResourceSet := fromAdminProtoTaskResourceSpec(context.TODO(), &admin.TaskResourceSpec{
taskResourceSet := FromAdminProtoTaskResourceSpec(context.TODO(), &admin.TaskResourceSpec{
Cpu: "1",
Memory: "100",
EphemeralStorage: "300",
Expand Down
18 changes: 18 additions & 0 deletions flyteadmin/pkg/repositories/transformers/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ var clusterReassignablePhases = sets.NewString(core.WorkflowExecution_UNDEFINED.
type CreateExecutionModelInput struct {
WorkflowExecutionID core.WorkflowExecutionIdentifier
RequestSpec *admin.ExecutionSpec
ResolvedSpec *admin.ExecutionSpec
LaunchPlanID uint
WorkflowID uint
TaskID uint
Expand Down Expand Up @@ -70,10 +71,15 @@ func CreateExecutionModel(input CreateExecutionModelInput) (*models.Execution, e
Namespace: input.Namespace,
}
requestSpec.SecurityContext = input.SecurityContext
if input.RequestSpec.Metadata == nil {
input.RequestSpec.Metadata = &admin.ExecutionMetadata{}
}
input.RequestSpec.Metadata.SystemMetadata = requestSpec.Metadata.SystemMetadata
spec, err := proto.Marshal(requestSpec)
if err != nil {
return nil, flyteErrs.NewFlyteAdminErrorf(codes.Internal, "Failed to serialize execution spec: %v", err)
}

createdAt := timestamppb.New(input.CreatedAt)
closure := admin.ExecutionClosure{
CreatedAt: createdAt,
Expand All @@ -85,6 +91,7 @@ func CreateExecutionModel(input CreateExecutionModelInput) (*models.Execution, e
Principal: requestSpec.Metadata.Principal,
OccurredAt: createdAt,
},
ResolvedSpec: input.ResolvedSpec,
}
if input.Error != nil {
closure.Phase = core.WorkflowExecution_FAILED
Expand Down Expand Up @@ -220,6 +227,17 @@ func UpdateExecutionModelState(
if err := reassignCluster(ctx, request.Event.ProducerId, request.Event.ExecutionId, execution); err != nil {
return err
}
// also update the resolved spec
if executionClosure.ResolvedSpec == nil {
executionClosure.ResolvedSpec = &admin.ExecutionSpec{}
}
if executionClosure.ResolvedSpec.Metadata == nil {
executionClosure.ResolvedSpec.Metadata = &admin.ExecutionMetadata{}
}
if executionClosure.ResolvedSpec.Metadata.SystemMetadata == nil {
executionClosure.ResolvedSpec.Metadata.SystemMetadata = &admin.SystemMetadata{}
}
executionClosure.ResolvedSpec.Metadata.SystemMetadata.ExecutionCluster = request.Event.ProducerId
} else if execution.Cluster != request.Event.ProducerId {
errorMsg := fmt.Sprintf("Cannot accept events for running/terminated execution [%v] from cluster [%s],"+
"expected events to originate from [%s]",
Expand Down
10 changes: 10 additions & 0 deletions flyteadmin/pkg/repositories/transformers/execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func TestCreateExecutionModel(t *testing.T) {
Org: testOrg,
},
RequestSpec: execRequest.Spec,
ResolvedSpec: execRequest.Spec,
LaunchPlanID: lpID,
WorkflowID: wfID,
CreatedAt: createdAt,
Expand Down Expand Up @@ -129,6 +130,7 @@ func TestCreateExecutionModel(t *testing.T) {
OccurredAt: expectedCreatedAt,
Principal: principal,
},
ResolvedSpec: execRequest.Spec,
})
assert.Equal(t, expectedClosure, execution.Closure)
})
Expand All @@ -142,6 +144,7 @@ func TestCreateExecutionModel(t *testing.T) {
Org: testOrg,
},
RequestSpec: execRequest.Spec,
ResolvedSpec: execRequest.Spec,
LaunchPlanID: lpID,
WorkflowID: wfID,
CreatedAt: createdAt,
Expand Down Expand Up @@ -197,6 +200,7 @@ func TestCreateExecutionModel(t *testing.T) {
OccurredAt: expectedCreatedAt,
Principal: principal,
},
ResolvedSpec: execRequest.Spec,
})
assert.Equal(t, string(expectedClosure), string(execution.Closure))
})
Expand All @@ -210,6 +214,7 @@ func TestCreateExecutionModel(t *testing.T) {
Org: testOrg,
},
RequestSpec: execRequest.Spec,
ResolvedSpec: execRequest.Spec,
LaunchPlanID: lpID,
WorkflowID: wfID,
CreatedAt: createdAt,
Expand Down Expand Up @@ -265,6 +270,7 @@ func TestCreateExecutionModel(t *testing.T) {
OccurredAt: expectedCreatedAt,
Principal: principal,
},
ResolvedSpec: execRequest.Spec,
})
assert.Equal(t, expectedClosure, execution.Closure)
})
Expand All @@ -278,6 +284,7 @@ func TestCreateExecutionModel(t *testing.T) {
Org: testOrg,
},
RequestSpec: execRequest.Spec,
ResolvedSpec: execRequest.Spec,
LaunchPlanID: lpID,
WorkflowID: wfID,
CreatedAt: createdAt,
Expand Down Expand Up @@ -333,6 +340,7 @@ func TestCreateExecutionModel(t *testing.T) {
OccurredAt: expectedCreatedAt,
Principal: principal,
},
ResolvedSpec: execRequest.Spec,
})
assert.Equal(t, expectedClosure, execution.Closure)
})
Expand Down Expand Up @@ -723,6 +731,7 @@ func TestFromExecutionModel(t *testing.T) {
State: admin.ExecutionState_EXECUTION_ACTIVE,
OccurredAt: createdAtProto,
},
ResolvedSpec: spec,
}
closureBytes, _ := proto.Marshal(&closure)
stateInt := int32(admin.ExecutionState_EXECUTION_ACTIVE)
Expand Down Expand Up @@ -887,6 +896,7 @@ func TestFromExecutionModels(t *testing.T) {
State: admin.ExecutionState_EXECUTION_ACTIVE,
OccurredAt: createdAtProto,
},
ResolvedSpec: spec,
}
closureBytes, _ := proto.Marshal(&closure)
stateInt := int32(admin.ExecutionState_EXECUTION_ACTIVE)
Expand Down
Loading

0 comments on commit 67cca2f

Please sign in to comment.