Skip to content

Commit

Permalink
Registerable k8s workflow executor (flyteorg#284)
Browse files Browse the repository at this point in the history
  • Loading branch information
Katrina Rogan authored Nov 19, 2021
1 parent 528cc61 commit 370365d
Show file tree
Hide file tree
Showing 21 changed files with 1,549 additions and 1,297 deletions.
203 changes: 105 additions & 98 deletions flyteadmin/pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"strconv"
"time"

"github.com/flyteorg/flyteadmin/pkg/workflowengine"

"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s"

"github.com/flyteorg/flyteadmin/auth"
Expand Down Expand Up @@ -55,19 +57,20 @@ const childContainerQueueKey = "child_queue"
type projectDomainScopedStopWatchMap = map[string]map[string]*promutils.StopWatch

// executionSystemMetrics groups the metrics scope together with the Prometheus
// gauge, counters and summaries that the execution manager updates.
type executionSystemMetrics struct {
Scope promutils.Scope
ActiveExecutions prometheus.Gauge
ExecutionsCreated prometheus.Counter
ExecutionsTerminated prometheus.Counter
ExecutionEventsCreated prometheus.Counter
PropellerFailures prometheus.Counter
PublishNotificationError prometheus.Counter
TransformerError prometheus.Counter
UnexpectedDataError prometheus.Counter
SpecSizeBytes prometheus.Summary
ClosureSizeBytes prometheus.Summary
AcceptanceDelay prometheus.Summary
PublishEventError prometheus.Counter
Scope promutils.Scope
ActiveExecutions prometheus.Gauge
ExecutionsCreated prometheus.Counter
ExecutionsTerminated prometheus.Counter
ExecutionEventsCreated prometheus.Counter
// PropellerFailures is incremented when the workflow executor's Execute call returns an error.
PropellerFailures prometheus.Counter
PublishNotificationError prometheus.Counter
TransformerError prometheus.Counter
UnexpectedDataError prometheus.Counter
SpecSizeBytes prometheus.Summary
ClosureSizeBytes prometheus.Summary
// AcceptanceDelay records the delay in seconds between when an execution was
// requested to be created and when it actually was.
AcceptanceDelay prometheus.Summary
// PublishEventError is the overall count of publish event errors when invoking publish().
PublishEventError prometheus.Counter
// TerminateExecutionFailures is incremented when aborting an execution through
// the workflow executor returns an error.
TerminateExecutionFailures prometheus.Counter
}

type executionUserMetrics struct {
Expand All @@ -82,7 +85,6 @@ type ExecutionManager struct {
db repositories.RepositoryInterface
config runtimeInterfaces.Configuration
storageClient *storage.DataStore
workflowExecutor workflowengineInterfaces.Executor
queueAllocator executions.QueueAllocator
_clock clock.Clock
systemMetrics executionSystemMetrics
Expand Down Expand Up @@ -140,40 +142,23 @@ func validateMapSize(maxEntries int, candidate map[string]string, candidateName
return nil
}

// Labels and annotations defined in the execution spec are preferred over those defined in the
// reference launch plan spec.
func (m *ExecutionManager) addLabelsAndAnnotations(requestSpec *admin.ExecutionSpec,
partiallyPopulatedInputs *workflowengineInterfaces.ExecuteWorkflowInput) error {

var labels map[string]string
if requestSpec.Labels != nil && requestSpec.Labels.Values != nil {
labels = requestSpec.Labels.Values
} else if partiallyPopulatedInputs.Reference.Spec.Labels != nil &&
partiallyPopulatedInputs.Reference.Spec.Labels.Values != nil {
labels = partiallyPopulatedInputs.Reference.Spec.Labels.Values
}
// mapWithValues is satisfied by any type exposing a GetValues accessor that
// returns a string-to-string map. resolveStringMap takes both of its value
// sources through this interface.
type mapWithValues interface {
GetValues() map[string]string
}

var annotations map[string]string
if requestSpec.Annotations != nil && requestSpec.Annotations.Values != nil {
annotations = requestSpec.Annotations.Values
} else if partiallyPopulatedInputs.Reference.Spec.Annotations != nil &&
partiallyPopulatedInputs.Reference.Spec.Annotations.Values != nil {
annotations = partiallyPopulatedInputs.Reference.Spec.Annotations.Values
// resolveStringMap selects a string map from two candidate sources: the values
// of preferredValues when they are non-nil, otherwise the values of
// defaultValues when those are non-nil, otherwise an empty map. The selected
// map is the one returned by GetValues (it is not copied). It is then checked
// with validateMapSize against maxEntries, with valueName passed as the
// candidate name. Returns (nil, err) when that validation fails.
func resolveStringMap(preferredValues, defaultValues mapWithValues, valueName string, maxEntries int) (map[string]string, error) {
// Start from an empty, non-nil map so the result is still a usable map when
// neither source supplies values.
var response = make(map[string]string)
// NOTE(review): the call sites pass concrete pointer values (requestSpec.GetLabels(),
// launchPlan.Spec.Labels, ...). A nil pointer stored in the interface still compares
// != nil, so this check relies on GetValues() tolerating a nil receiver — confirm.
if preferredValues != nil && preferredValues.GetValues() != nil {
response = preferredValues.GetValues()
} else if defaultValues != nil && defaultValues.GetValues() != nil {
response = defaultValues.GetValues()
}

err := validateMapSize(m.config.RegistrationValidationConfiguration().GetMaxLabelEntries(), labels, "Labels")
if err != nil {
return err
}
err = validateMapSize(
m.config.RegistrationValidationConfiguration().GetMaxAnnotationEntries(), annotations, "Annotations")
err := validateMapSize(maxEntries, response, valueName)
if err != nil {
return err
return nil, err
}

partiallyPopulatedInputs.Labels = labels
partiallyPopulatedInputs.Annotations = annotations
return nil
return response, nil
}

func (m *ExecutionManager) addPluginOverrides(ctx context.Context, executionID *core.WorkflowExecutionIdentifier,
Expand Down Expand Up @@ -531,6 +516,8 @@ func (m *ExecutionManager) launchSingleTaskExecution(
Name: name,
}
ctx = getExecutionContext(ctx, &workflowExecutionID)
namespace := common.GetNamespaceName(
m.config.NamespaceMappingConfiguration().GetNamespaceTemplate(), workflowExecutionID.Project, workflowExecutionID.Domain)

requestSpec := request.Spec
if requestSpec.Metadata == nil {
Expand Down Expand Up @@ -563,51 +550,58 @@ func (m *ExecutionManager) launchSingleTaskExecution(
if err != nil {
return nil, nil, err
}
qualityOfService, err := m.qualityOfServiceAllocator.GetQualityOfService(ctx, executions.GetQualityOfServiceInput{
Workflow: &workflow,
LaunchPlan: launchPlan,
ExecutionCreateRequest: &request,
})
if err != nil {
logger.Errorf(ctx, "Failed to get quality of service for [%+v] with error: %v", workflowExecutionID, err)
return nil, nil, err
}
executionConfig, err := m.getExecutionConfig(ctx, &request, nil)
if err != nil {
return nil, nil, err
}
executeTaskInputs := workflowengineInterfaces.ExecuteTaskInput{
ExecutionID: &workflowExecutionID,
WfClosure: *workflow.Closure.CompiledWorkflow,
Inputs: request.Inputs,
ReferenceName: taskIdentifier.Name,
AcceptedAt: requestedAt,
Auth: requestSpec.AuthRole,
QueueingBudget: qualityOfService.QueuingBudget,
ExecutionConfig: executionConfig,
TaskResources: &platformTaskResources,
}

var labels map[string]string
if requestSpec.Labels != nil {
executeTaskInputs.Labels = requestSpec.Labels.Values
labels = requestSpec.Labels.Values
}
executeTaskInputs.Labels, err = m.addProjectLabels(ctx, request.Project, executeTaskInputs.Labels)
labels, err = m.addProjectLabels(ctx, request.Project, labels)
if err != nil {
return nil, nil, err
}

var annotations map[string]string
if requestSpec.Annotations != nil {
executeTaskInputs.Annotations = requestSpec.Annotations.Values
annotations = requestSpec.Annotations.Values
}

executionParameters := workflowengineInterfaces.ExecutionParameters{
Inputs: request.Inputs,
AcceptedAt: requestedAt,
Labels: labels,
Annotations: annotations,
ExecutionConfig: executionConfig,
Auth: resolvePermissions(&request, launchPlan),
TaskResources: &platformTaskResources,
EventVersion: m.config.ApplicationConfiguration().GetTopLevelConfig().EventVersion,
RoleNameKey: m.config.ApplicationConfiguration().GetTopLevelConfig().RoleNameKey,
RawOutputDataConfig: launchPlan.Spec.RawOutputDataConfig,
}

overrides, err := m.addPluginOverrides(ctx, &workflowExecutionID, workflowExecutionID.Name, "")
if err != nil {
return nil, nil, err
}
if overrides != nil {
executeTaskInputs.TaskPluginOverrides = overrides
executionParameters.TaskPluginOverrides = overrides
}
if request.Spec.Metadata != nil && request.Spec.Metadata.ReferenceExecution != nil &&
request.Spec.Metadata.Mode == admin.ExecutionMetadata_RECOVERED {
executionParameters.RecoveryExecution = request.Spec.Metadata.ReferenceExecution
}

execInfo, err := workflowengine.GetRegistry().GetExecutor().Execute(ctx, workflowengineInterfaces.ExecutionData{
Namespace: namespace,
ExecutionID: &workflowExecutionID,
ReferenceWorkflowName: workflow.Id.Name,
ReferenceLaunchPlanName: launchPlan.Id.Name,
WorkflowClosure: workflow.Closure.CompiledWorkflow,
ExecutionParameters: executionParameters,
})

execInfo, err := m.workflowExecutor.ExecuteTask(ctx, executeTaskInputs)
if err != nil {
m.systemMetrics.PropellerFailures.Inc()
logger.Infof(ctx, "Failed to execute workflow %+v with execution id %+v and inputs %+v with err %v",
Expand Down Expand Up @@ -759,54 +753,62 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
return nil, nil, err
}

qualityOfService, err := m.qualityOfServiceAllocator.GetQualityOfService(ctx, executions.GetQualityOfServiceInput{
Workflow: workflow,
LaunchPlan: launchPlan,
ExecutionCreateRequest: &request,
})
if err != nil {
logger.Errorf(ctx, "Failed to get quality of service for [%+v] with error: %v", workflowExecutionID, err)
return nil, nil, err
}
executionConfig, err := m.getExecutionConfig(ctx, &request, launchPlan)
if err != nil {
return nil, nil, err
}

// TODO: Reduce CRD size and use offloaded input URI to blob store instead.
executeWorkflowInputs := workflowengineInterfaces.ExecuteWorkflowInput{
ExecutionID: &workflowExecutionID,
WfClosure: *workflow.Closure.CompiledWorkflow,
Inputs: executionInputs,
Reference: *launchPlan,
AcceptedAt: requestedAt,
QueueingBudget: qualityOfService.QueuingBudget,
ExecutionConfig: executionConfig,
Auth: resolvePermissions(&request, launchPlan),
TaskResources: &platformTaskResources,
namespace := common.GetNamespaceName(
m.config.NamespaceMappingConfiguration().GetNamespaceTemplate(), workflowExecutionID.Project, workflowExecutionID.Domain)

labels, err := resolveStringMap(requestSpec.GetLabels(), launchPlan.Spec.Labels, "labels", m.config.RegistrationValidationConfiguration().GetMaxLabelEntries())
if err != nil {
return nil, nil, err
}
err = m.addLabelsAndAnnotations(request.Spec, &executeWorkflowInputs)
labels, err = m.addProjectLabels(ctx, request.Project, labels)
if err != nil {
return nil, nil, err
}
executeWorkflowInputs.Labels, err = m.addProjectLabels(ctx, request.Project, executeWorkflowInputs.Labels)
annotations, err := resolveStringMap(requestSpec.GetAnnotations(), launchPlan.Spec.Annotations, "annotations", m.config.RegistrationValidationConfiguration().GetMaxAnnotationEntries())
if err != nil {
return nil, nil, err
}

executionParameters := workflowengineInterfaces.ExecutionParameters{
Inputs: executionInputs,
AcceptedAt: requestedAt,
Labels: labels,
Annotations: annotations,
ExecutionConfig: executionConfig,
Auth: resolvePermissions(&request, launchPlan),
TaskResources: &platformTaskResources,
EventVersion: m.config.ApplicationConfiguration().GetTopLevelConfig().EventVersion,
RoleNameKey: m.config.ApplicationConfiguration().GetTopLevelConfig().RoleNameKey,
RawOutputDataConfig: launchPlan.Spec.RawOutputDataConfig,
}

overrides, err := m.addPluginOverrides(ctx, &workflowExecutionID, launchPlan.GetSpec().WorkflowId.Name, launchPlan.Id.Name)
if err != nil {
return nil, nil, err
}
if overrides != nil {
executeWorkflowInputs.TaskPluginOverrides = overrides
executionParameters.TaskPluginOverrides = overrides
}

if request.Spec.Metadata != nil && request.Spec.Metadata.ReferenceExecution != nil &&
request.Spec.Metadata.Mode == admin.ExecutionMetadata_RECOVERED {
executeWorkflowInputs.RecoveryExecution = request.Spec.Metadata.ReferenceExecution
executionParameters.RecoveryExecution = request.Spec.Metadata.ReferenceExecution
}

execInfo, err := m.workflowExecutor.ExecuteWorkflow(ctx, executeWorkflowInputs)
execInfo, err := workflowengine.GetRegistry().GetExecutor().Execute(ctx, workflowengineInterfaces.ExecutionData{
Namespace: namespace,
ExecutionID: &workflowExecutionID,
ReferenceWorkflowName: workflow.Id.Name,
ReferenceLaunchPlanName: launchPlan.Id.Name,
WorkflowClosure: workflow.Closure.CompiledWorkflow,
ExecutionParameters: executionParameters,
})

if err != nil {
m.systemMetrics.PropellerFailures.Inc()
logger.Infof(ctx, "Failed to execute workflow %+v with execution id %+v and inputs %+v with err %v",
Expand Down Expand Up @@ -1437,11 +1439,15 @@ func (m *ExecutionManager) TerminateExecution(
return nil, err
}

err = m.workflowExecutor.TerminateWorkflowExecution(ctx, workflowengineInterfaces.TerminateWorkflowInput{
err = workflowengine.GetRegistry().GetExecutor().Abort(ctx, workflowengineInterfaces.AbortData{
Namespace: common.GetNamespaceName(
m.config.NamespaceMappingConfiguration().GetNamespaceTemplate(), request.Id.Project, request.Id.Domain),

ExecutionID: request.Id,
Cluster: executionModel.Cluster,
})
if err != nil {
m.systemMetrics.TerminateExecutionFailures.Inc()
return nil, err
}

Expand Down Expand Up @@ -1483,12 +1489,14 @@ func newExecutionSystemMetrics(scope promutils.Scope) executionSystemMetrics {
"delay in seconds from when an execution was requested to be created and when it actually was"),
PublishEventError: scope.MustNewCounter("publish_event_error",
"overall count of publish event errors when invoking publish()"),
TerminateExecutionFailures: scope.MustNewCounter("execution_termination_failure",
"count of failed workflow executions terminations"),
}
}

func NewExecutionManager(db repositories.RepositoryInterface, config runtimeInterfaces.Configuration,
storageClient *storage.DataStore, workflowExecutor workflowengineInterfaces.Executor, systemScope promutils.Scope,
userScope promutils.Scope, publisher notificationInterfaces.Publisher, urlData dataInterfaces.RemoteURLInterface,
storageClient *storage.DataStore, systemScope promutils.Scope, userScope promutils.Scope,
publisher notificationInterfaces.Publisher, urlData dataInterfaces.RemoteURLInterface,
workflowManager interfaces.WorkflowInterface, namedEntityManager interfaces.NamedEntityInterface,
eventPublisher notificationInterfaces.Publisher, eventWriter eventWriter.WorkflowExecutionEventWriter) interfaces.ExecutionInterface {
queueAllocator := executions.NewQueueAllocator(config, db)
Expand All @@ -1509,7 +1517,6 @@ func NewExecutionManager(db repositories.RepositoryInterface, config runtimeInte
db: db,
config: config,
storageClient: storageClient,
workflowExecutor: workflowExecutor,
queueAllocator: queueAllocator,
_clock: clock.New(),
systemMetrics: systemMetrics,
Expand Down
Loading

0 comments on commit 370365d

Please sign in to comment.