Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

TaskTemplate offloading and available to task #166

Merged
merged 7 commits into from
Mar 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions go/tasks/pluginmachinery/core/exec_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,15 @@ import (
"github.com/flyteorg/flytestdlib/storage"
)

// An interface to access a remote/sharable location that contains the serialized TaskTemplate
type TaskTemplatePath interface {
// Returns the path
Path(ctx context.Context) (storage.DataReference, error)
}

// An interface to access the TaskInformation
type TaskReader interface {
TaskTemplatePath
// Returns the core TaskTemplate
Read(ctx context.Context) (*core.TaskTemplate, error)
}
Expand Down
54 changes: 48 additions & 6 deletions go/tasks/pluginmachinery/core/mocks/task_reader.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

55 changes: 55 additions & 0 deletions go/tasks/pluginmachinery/core/mocks/task_template_path.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

81 changes: 49 additions & 32 deletions go/tasks/pluginmachinery/core/template/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ func (e ErrorCollection) Error() string {
return sb.String()
}

// The Parameters struct is used by the Templating Engine to replace the templated parameters
type Parameters struct {
TaskExecMetadata core.TaskExecutionMetadata
Inputs io.InputReader
OutputPath io.OutputFilePaths
Task core.TaskTemplatePath
}

// Evaluates templates in each command with the equivalent value from passed args. Templates are case-insensitive
// Supported templates are:
// - {{ .InputFile }} to receive the input file path. The protocol used will depend on the underlying system
Expand All @@ -44,25 +52,23 @@ func (e ErrorCollection) Error() string {
// NOTE: I wanted to do in-place replacement, until I realized that in-place replacement will alter the definition of the
// graph. This is not desirable, as we may have to retry and in that case the replacement will not work and we want
// to create a new location for outputs
func ReplaceTemplateCommandArgs(ctx context.Context, tExecMeta core.TaskExecutionMetadata, command []string, in io.InputReader,
out io.OutputFilePaths) ([]string, error) {
func Render(ctx context.Context, inputTemplate []string, params Parameters) ([]string, error) {
if len(inputTemplate) == 0 {
return []string{}, nil
}

// TODO: Change GetGeneratedName to follow these conventions
var perRetryUniqueKey = tExecMeta.GetTaskExecutionID().GetGeneratedName()
var perRetryUniqueKey = params.TaskExecMetadata.GetTaskExecutionID().GetGeneratedName()
perRetryUniqueKey = startsWithAlpha.ReplaceAllString(perRetryUniqueKey, "a")
perRetryUniqueKey = alphaNumericOnly.ReplaceAllString(perRetryUniqueKey, "_")

logger.Debugf(ctx, "Using [%s] from [%s]", perRetryUniqueKey, tExecMeta.GetTaskExecutionID().GetGeneratedName())

if len(command) == 0 {
return []string{}, nil
}
if in == nil || out == nil {
logger.Debugf(ctx, "Using [%s] from [%s]", perRetryUniqueKey, params.TaskExecMetadata.GetTaskExecutionID().GetGeneratedName())
if params.Inputs == nil || params.OutputPath == nil {
return nil, fmt.Errorf("input reader and output path cannot be nil")
}
res := make([]string, 0, len(command))
for _, commandTemplate := range command {
updated, err := replaceTemplateCommandArgs(ctx, perRetryUniqueKey, commandTemplate, in, out)
res := make([]string, 0, len(inputTemplate))
for _, t := range inputTemplate {
updated, err := render(ctx, t, params, perRetryUniqueKey)
if err != nil {
return res, err
}
Expand All @@ -79,30 +85,28 @@ var outputRegex = regexp.MustCompile(`(?i){{\s*[\.$]OutputPrefix\s*}}`)
var inputVarRegex = regexp.MustCompile(`(?i){{\s*[\.$]Inputs\.(?P<input_name>[^}\s]+)\s*}}`)
var rawOutputDataPrefixRegex = regexp.MustCompile(`(?i){{\s*[\.$]RawOutputDataPrefix\s*}}`)
var perRetryUniqueKey = regexp.MustCompile(`(?i){{\s*[\.$]PerRetryUniqueKey\s*}}`)
var taskTemplateRegex = regexp.MustCompile(`(?i){{\s*[\.$]TaskTemplatePath\s*}}`)

func transformVarNameToStringVal(ctx context.Context, varName string, inputs *idlCore.LiteralMap) (string, error) {
inputVal, exists := inputs.Literals[varName]
if !exists {
return "", fmt.Errorf("requested input is not found [%s]", varName)
}

v, err := serializeLiteral(ctx, inputVal)
if err != nil {
return "", errors.Wrapf(err, "failed to bind a value to inputName [%s]", varName)
}
return v, nil
}
func render(ctx context.Context, inputTemplate string, params Parameters, perRetryKey string) (string, error) {

func replaceTemplateCommandArgs(ctx context.Context, perRetryKey string, commandTemplate string,
in io.InputReader, out io.OutputFilePaths) (string, error) {

val := inputFileRegex.ReplaceAllString(commandTemplate, in.GetInputPath().String())
val = outputRegex.ReplaceAllString(val, out.GetOutputPrefixPath().String())
val = inputPrefixRegex.ReplaceAllString(val, in.GetInputPrefixPath().String())
val = rawOutputDataPrefixRegex.ReplaceAllString(val, out.GetRawOutputPrefix().String())
val := inputFileRegex.ReplaceAllString(inputTemplate, params.Inputs.GetInputPath().String())
val = outputRegex.ReplaceAllString(val, params.OutputPath.GetOutputPrefixPath().String())
val = inputPrefixRegex.ReplaceAllString(val, params.Inputs.GetInputPrefixPath().String())
val = rawOutputDataPrefixRegex.ReplaceAllString(val, params.OutputPath.GetRawOutputPrefix().String())
val = perRetryUniqueKey.ReplaceAllString(val, perRetryKey)

inputs, err := in.Get(ctx)
// For Task template, we will replace only if there is a match. This is because, task template replacement
// may be expensive, as we may offload
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't we always offload? r.store.WriteProtobuf? or is store sometimes local?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, we offload only when ttt params.Task.Path is invoked! which is only if the taskTemplateRegex match happens.
Again this is done so that its transparent to the templating method?

if taskTemplateRegex.MatchString(val) {
p, err := params.Task.Path(ctx)
if err != nil {
logger.Debugf(ctx, "Failed to substitute Task Template reference - reason %s", err)
return "", err
}
val = taskTemplateRegex.ReplaceAllString(val, p.String())
}

inputs, err := params.Inputs.Get(ctx)
if err != nil {
return val, errors.Wrapf(err, "unable to read inputs")
}
Expand All @@ -129,6 +133,19 @@ func replaceTemplateCommandArgs(ctx context.Context, perRetryKey string, command
return val, nil
}

func transformVarNameToStringVal(ctx context.Context, varName string, inputs *idlCore.LiteralMap) (string, error) {
inputVal, exists := inputs.Literals[varName]
if !exists {
return "", fmt.Errorf("requested input is not found [%s]", varName)
}

v, err := serializeLiteral(ctx, inputVal)
if err != nil {
return "", errors.Wrapf(err, "failed to bind a value to inputName [%s]", varName)
}
return v, nil
}

func serializePrimitive(p *idlCore.Primitive) (string, error) {
switch o := p.Value.(type) {
case *idlCore.Primitive_Integer:
Expand Down
Loading