Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
TaskTemplate offloading and available to task (#166)
Browse files Browse the repository at this point in the history
* [wip]: TaskTemplate offloading and available to task

Signed-off-by: Ketan Umare <[email protected]>

* Fixed tests

Signed-off-by: Ketan Umare <[email protected]>

* IOUtils for working with Automated task template uploading

Signed-off-by: Ketan Umare <[email protected]>

* Introduced the simple reader interface

Signed-off-by: Ketan Umare <[email protected]>

* Helper method for TaskTemplate path

Signed-off-by: Ketan Umare <[email protected]>

* updated

Signed-off-by: Ketan Umare <[email protected]>
  • Loading branch information
kumare3 authored Mar 26, 2021
1 parent 98bfa7a commit f406cbb
Show file tree
Hide file tree
Showing 24 changed files with 608 additions and 153 deletions.
7 changes: 7 additions & 0 deletions flyteplugins/go/tasks/pluginmachinery/core/exec_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,15 @@ import (
"github.com/flyteorg/flytestdlib/storage"
)

// An interface to access a remote/sharable location that contains the serialized TaskTemplate
type TaskTemplatePath interface {
// Returns the path
Path(ctx context.Context) (storage.DataReference, error)
}

// An interface to access the TaskInformation
type TaskReader interface {
TaskTemplatePath
// Returns the core TaskTemplate
Read(ctx context.Context) (*core.TaskTemplate, error)
}
Expand Down
54 changes: 48 additions & 6 deletions flyteplugins/go/tasks/pluginmachinery/core/mocks/task_reader.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

81 changes: 49 additions & 32 deletions flyteplugins/go/tasks/pluginmachinery/core/template/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ func (e ErrorCollection) Error() string {
return sb.String()
}

// The Parameters struct is used by the Templating Engine to replace the templated parameters
type Parameters struct {
TaskExecMetadata core.TaskExecutionMetadata
Inputs io.InputReader
OutputPath io.OutputFilePaths
Task core.TaskTemplatePath
}

// Evaluates templates in each command with the equivalent value from passed args. Templates are case-insensitive
// Supported templates are:
// - {{ .InputFile }} to receive the input file path. The protocol used will depend on the underlying system
Expand All @@ -44,25 +52,23 @@ func (e ErrorCollection) Error() string {
// NOTE: I wanted to do in-place replacement, until I realized that in-place replacement will alter the definition of the
// graph. This is not desirable, as we may have to retry and in that case the replacement will not work and we want
// to create a new location for outputs
func ReplaceTemplateCommandArgs(ctx context.Context, tExecMeta core.TaskExecutionMetadata, command []string, in io.InputReader,
out io.OutputFilePaths) ([]string, error) {
func Render(ctx context.Context, inputTemplate []string, params Parameters) ([]string, error) {
if len(inputTemplate) == 0 {
return []string{}, nil
}

// TODO: Change GetGeneratedName to follow these conventions
var perRetryUniqueKey = tExecMeta.GetTaskExecutionID().GetGeneratedName()
var perRetryUniqueKey = params.TaskExecMetadata.GetTaskExecutionID().GetGeneratedName()
perRetryUniqueKey = startsWithAlpha.ReplaceAllString(perRetryUniqueKey, "a")
perRetryUniqueKey = alphaNumericOnly.ReplaceAllString(perRetryUniqueKey, "_")

logger.Debugf(ctx, "Using [%s] from [%s]", perRetryUniqueKey, tExecMeta.GetTaskExecutionID().GetGeneratedName())

if len(command) == 0 {
return []string{}, nil
}
if in == nil || out == nil {
logger.Debugf(ctx, "Using [%s] from [%s]", perRetryUniqueKey, params.TaskExecMetadata.GetTaskExecutionID().GetGeneratedName())
if params.Inputs == nil || params.OutputPath == nil {
return nil, fmt.Errorf("input reader and output path cannot be nil")
}
res := make([]string, 0, len(command))
for _, commandTemplate := range command {
updated, err := replaceTemplateCommandArgs(ctx, perRetryUniqueKey, commandTemplate, in, out)
res := make([]string, 0, len(inputTemplate))
for _, t := range inputTemplate {
updated, err := render(ctx, t, params, perRetryUniqueKey)
if err != nil {
return res, err
}
Expand All @@ -79,30 +85,28 @@ var outputRegex = regexp.MustCompile(`(?i){{\s*[\.$]OutputPrefix\s*}}`)
var inputVarRegex = regexp.MustCompile(`(?i){{\s*[\.$]Inputs\.(?P<input_name>[^}\s]+)\s*}}`)
var rawOutputDataPrefixRegex = regexp.MustCompile(`(?i){{\s*[\.$]RawOutputDataPrefix\s*}}`)
var perRetryUniqueKey = regexp.MustCompile(`(?i){{\s*[\.$]PerRetryUniqueKey\s*}}`)
var taskTemplateRegex = regexp.MustCompile(`(?i){{\s*[\.$]TaskTemplatePath\s*}}`)

func transformVarNameToStringVal(ctx context.Context, varName string, inputs *idlCore.LiteralMap) (string, error) {
inputVal, exists := inputs.Literals[varName]
if !exists {
return "", fmt.Errorf("requested input is not found [%s]", varName)
}

v, err := serializeLiteral(ctx, inputVal)
if err != nil {
return "", errors.Wrapf(err, "failed to bind a value to inputName [%s]", varName)
}
return v, nil
}
func render(ctx context.Context, inputTemplate string, params Parameters, perRetryKey string) (string, error) {

func replaceTemplateCommandArgs(ctx context.Context, perRetryKey string, commandTemplate string,
in io.InputReader, out io.OutputFilePaths) (string, error) {

val := inputFileRegex.ReplaceAllString(commandTemplate, in.GetInputPath().String())
val = outputRegex.ReplaceAllString(val, out.GetOutputPrefixPath().String())
val = inputPrefixRegex.ReplaceAllString(val, in.GetInputPrefixPath().String())
val = rawOutputDataPrefixRegex.ReplaceAllString(val, out.GetRawOutputPrefix().String())
val := inputFileRegex.ReplaceAllString(inputTemplate, params.Inputs.GetInputPath().String())
val = outputRegex.ReplaceAllString(val, params.OutputPath.GetOutputPrefixPath().String())
val = inputPrefixRegex.ReplaceAllString(val, params.Inputs.GetInputPrefixPath().String())
val = rawOutputDataPrefixRegex.ReplaceAllString(val, params.OutputPath.GetRawOutputPrefix().String())
val = perRetryUniqueKey.ReplaceAllString(val, perRetryKey)

inputs, err := in.Get(ctx)
// For Task template, we will replace only if there is a match. This is because, task template replacement
// may be expensive, as we may offload
if taskTemplateRegex.MatchString(val) {
p, err := params.Task.Path(ctx)
if err != nil {
logger.Debugf(ctx, "Failed to substitute Task Template reference - reason %s", err)
return "", err
}
val = taskTemplateRegex.ReplaceAllString(val, p.String())
}

inputs, err := params.Inputs.Get(ctx)
if err != nil {
return val, errors.Wrapf(err, "unable to read inputs")
}
Expand All @@ -129,6 +133,19 @@ func replaceTemplateCommandArgs(ctx context.Context, perRetryKey string, command
return val, nil
}

func transformVarNameToStringVal(ctx context.Context, varName string, inputs *idlCore.LiteralMap) (string, error) {
inputVal, exists := inputs.Literals[varName]
if !exists {
return "", fmt.Errorf("requested input is not found [%s]", varName)
}

v, err := serializeLiteral(ctx, inputVal)
if err != nil {
return "", errors.Wrapf(err, "failed to bind a value to inputName [%s]", varName)
}
return v, nil
}

func serializePrimitive(p *idlCore.Primitive) (string, error) {
switch o := p.Value.(type) {
case *idlCore.Primitive_Integer:
Expand Down
Loading

0 comments on commit f406cbb

Please sign in to comment.