Skip to content

Commit

Permalink
TaskTemplate offloading and available to task (flyteorg#166)
Browse files Browse the repository at this point in the history
* [wip]: TaskTemplate offloading and available to task

Signed-off-by: Ketan Umare <[email protected]>

* Fixed tests

Signed-off-by: Ketan Umare <[email protected]>

* IOUtils for working with Automated task template uploading

Signed-off-by: Ketan Umare <[email protected]>

* Introduced the simple reader interface

Signed-off-by: Ketan Umare <[email protected]>

* Helper method for TaskTemplate path

Signed-off-by: Ketan Umare <[email protected]>

* updated

Signed-off-by: Ketan Umare <[email protected]>
  • Loading branch information
kumare3 authored Mar 26, 2021
1 parent 923466a commit 93b339a
Show file tree
Hide file tree
Showing 24 changed files with 608 additions and 153 deletions.
7 changes: 7 additions & 0 deletions go/tasks/pluginmachinery/core/exec_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,15 @@ import (
"github.com/flyteorg/flytestdlib/storage"
)

// An interface to access a remote/sharable location that contains the serialized TaskTemplate
type TaskTemplatePath interface {
// Returns the path
Path(ctx context.Context) (storage.DataReference, error)
}

// An interface to access the TaskInformation
type TaskReader interface {
TaskTemplatePath
// Returns the core TaskTemplate
Read(ctx context.Context) (*core.TaskTemplate, error)
}
Expand Down
54 changes: 48 additions & 6 deletions go/tasks/pluginmachinery/core/mocks/task_reader.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

55 changes: 55 additions & 0 deletions go/tasks/pluginmachinery/core/mocks/task_template_path.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

81 changes: 49 additions & 32 deletions go/tasks/pluginmachinery/core/template/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ func (e ErrorCollection) Error() string {
return sb.String()
}

// The Parameters struct is used by the Templating Engine to replace the templated parameters
type Parameters struct {
TaskExecMetadata core.TaskExecutionMetadata
Inputs io.InputReader
OutputPath io.OutputFilePaths
Task core.TaskTemplatePath
}

// Evaluates templates in each command with the equivalent value from passed args. Templates are case-insensitive
// Supported templates are:
// - {{ .InputFile }} to receive the input file path. The protocol used will depend on the underlying system
Expand All @@ -44,25 +52,23 @@ func (e ErrorCollection) Error() string {
// NOTE: I wanted to do in-place replacement, until I realized that in-place replacement will alter the definition of the
// graph. This is not desirable, as we may have to retry and in that case the replacement will not work and we want
// to create a new location for outputs
func ReplaceTemplateCommandArgs(ctx context.Context, tExecMeta core.TaskExecutionMetadata, command []string, in io.InputReader,
out io.OutputFilePaths) ([]string, error) {
func Render(ctx context.Context, inputTemplate []string, params Parameters) ([]string, error) {
if len(inputTemplate) == 0 {
return []string{}, nil
}

// TODO: Change GetGeneratedName to follow these conventions
var perRetryUniqueKey = tExecMeta.GetTaskExecutionID().GetGeneratedName()
var perRetryUniqueKey = params.TaskExecMetadata.GetTaskExecutionID().GetGeneratedName()
perRetryUniqueKey = startsWithAlpha.ReplaceAllString(perRetryUniqueKey, "a")
perRetryUniqueKey = alphaNumericOnly.ReplaceAllString(perRetryUniqueKey, "_")

logger.Debugf(ctx, "Using [%s] from [%s]", perRetryUniqueKey, tExecMeta.GetTaskExecutionID().GetGeneratedName())

if len(command) == 0 {
return []string{}, nil
}
if in == nil || out == nil {
logger.Debugf(ctx, "Using [%s] from [%s]", perRetryUniqueKey, params.TaskExecMetadata.GetTaskExecutionID().GetGeneratedName())
if params.Inputs == nil || params.OutputPath == nil {
return nil, fmt.Errorf("input reader and output path cannot be nil")
}
res := make([]string, 0, len(command))
for _, commandTemplate := range command {
updated, err := replaceTemplateCommandArgs(ctx, perRetryUniqueKey, commandTemplate, in, out)
res := make([]string, 0, len(inputTemplate))
for _, t := range inputTemplate {
updated, err := render(ctx, t, params, perRetryUniqueKey)
if err != nil {
return res, err
}
Expand All @@ -79,30 +85,28 @@ var outputRegex = regexp.MustCompile(`(?i){{\s*[\.$]OutputPrefix\s*}}`)
var inputVarRegex = regexp.MustCompile(`(?i){{\s*[\.$]Inputs\.(?P<input_name>[^}\s]+)\s*}}`)
var rawOutputDataPrefixRegex = regexp.MustCompile(`(?i){{\s*[\.$]RawOutputDataPrefix\s*}}`)
var perRetryUniqueKey = regexp.MustCompile(`(?i){{\s*[\.$]PerRetryUniqueKey\s*}}`)
var taskTemplateRegex = regexp.MustCompile(`(?i){{\s*[\.$]TaskTemplatePath\s*}}`)

func transformVarNameToStringVal(ctx context.Context, varName string, inputs *idlCore.LiteralMap) (string, error) {
inputVal, exists := inputs.Literals[varName]
if !exists {
return "", fmt.Errorf("requested input is not found [%s]", varName)
}

v, err := serializeLiteral(ctx, inputVal)
if err != nil {
return "", errors.Wrapf(err, "failed to bind a value to inputName [%s]", varName)
}
return v, nil
}
func render(ctx context.Context, inputTemplate string, params Parameters, perRetryKey string) (string, error) {

func replaceTemplateCommandArgs(ctx context.Context, perRetryKey string, commandTemplate string,
in io.InputReader, out io.OutputFilePaths) (string, error) {

val := inputFileRegex.ReplaceAllString(commandTemplate, in.GetInputPath().String())
val = outputRegex.ReplaceAllString(val, out.GetOutputPrefixPath().String())
val = inputPrefixRegex.ReplaceAllString(val, in.GetInputPrefixPath().String())
val = rawOutputDataPrefixRegex.ReplaceAllString(val, out.GetRawOutputPrefix().String())
val := inputFileRegex.ReplaceAllString(inputTemplate, params.Inputs.GetInputPath().String())
val = outputRegex.ReplaceAllString(val, params.OutputPath.GetOutputPrefixPath().String())
val = inputPrefixRegex.ReplaceAllString(val, params.Inputs.GetInputPrefixPath().String())
val = rawOutputDataPrefixRegex.ReplaceAllString(val, params.OutputPath.GetRawOutputPrefix().String())
val = perRetryUniqueKey.ReplaceAllString(val, perRetryKey)

inputs, err := in.Get(ctx)
// For Task template, we will replace only if there is a match. This is because, task template replacement
// may be expensive, as we may offload
if taskTemplateRegex.MatchString(val) {
p, err := params.Task.Path(ctx)
if err != nil {
logger.Debugf(ctx, "Failed to substitute Task Template reference - reason %s", err)
return "", err
}
val = taskTemplateRegex.ReplaceAllString(val, p.String())
}

inputs, err := params.Inputs.Get(ctx)
if err != nil {
return val, errors.Wrapf(err, "unable to read inputs")
}
Expand All @@ -129,6 +133,19 @@ func replaceTemplateCommandArgs(ctx context.Context, perRetryKey string, command
return val, nil
}

func transformVarNameToStringVal(ctx context.Context, varName string, inputs *idlCore.LiteralMap) (string, error) {
inputVal, exists := inputs.Literals[varName]
if !exists {
return "", fmt.Errorf("requested input is not found [%s]", varName)
}

v, err := serializeLiteral(ctx, inputVal)
if err != nil {
return "", errors.Wrapf(err, "failed to bind a value to inputName [%s]", varName)
}
return v, nil
}

func serializePrimitive(p *idlCore.Primitive) (string, error) {
switch o := p.Value.(type) {
case *idlCore.Primitive_Integer:
Expand Down
Loading

0 comments on commit 93b339a

Please sign in to comment.