Skip to content

Commit

Permalink
Allow multiple arguments to be substituted in one run (flyteorg#92)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ketan Umare authored May 26, 2020
1 parent 37469ac commit c0f3135
Show file tree
Hide file tree
Showing 8 changed files with 153 additions and 29 deletions.
19 changes: 19 additions & 0 deletions flyteplugins/go/tasks/pluginmachinery/utils/error_collection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package utils

import (
"fmt"
"strings"
)

type ErrorCollection struct {
Errors []error
}

func (e ErrorCollection) Error() string {
sb := strings.Builder{}
for idx, err := range e.Errors {
sb.WriteString(fmt.Sprintf("%v: %v\r\n", idx, err))
}

return sb.String()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package utils

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"
)

func TestErrorCollection(t *testing.T) {
ec := ErrorCollection{}

assert.Empty(t, ec.Error())

ec.Errors = append(ec.Errors, fmt.Errorf("error1"))
assert.NotEmpty(t, ec.Error())

ec.Errors = append(ec.Errors, fmt.Errorf("error2"))
assert.NotEmpty(t, ec.Error())

assert.Equal(t, "0: error1\r\n1: error2\r\n", ec.Error())
}
58 changes: 30 additions & 28 deletions flyteplugins/go/tasks/pluginmachinery/utils/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,47 +51,49 @@ func ReplaceTemplateCommandArgs(ctx context.Context, command []string, in io.Inp
return res, nil
}

func transformVarNameToStringVal(ctx context.Context, varName string, inputs *core.LiteralMap) (string, error) {
inputVal, exists := inputs.Literals[varName]
if !exists {
return "", fmt.Errorf("requested input is not found [%s]", varName)
}

v, err := serializeLiteral(ctx, inputVal)
if err != nil {
return "", errors.Wrapf(err, "failed to bind a value to inputName [%s]", varName)
}
return v, nil
}

func replaceTemplateCommandArgs(ctx context.Context, commandTemplate string, in io.InputReader, out io.OutputFilePaths) (string, error) {
val := inputFileRegex.ReplaceAllString(commandTemplate, in.GetInputPath().String())
val = outputRegex.ReplaceAllString(val, out.GetOutputPrefixPath().String())
val = inputPrefixRegex.ReplaceAllString(val, in.GetInputPrefixPath().String())
groupMatches := inputVarRegex.FindAllStringSubmatchIndex(val, -1)
if len(groupMatches) == 0 {
return val, nil
} else if len(groupMatches) > 1 {
return val, fmt.Errorf("only one level of inputs nesting is supported. Syntax in [%v] is invalid", commandTemplate)
} else if len(groupMatches[0]) > 4 {
return val, fmt.Errorf("longer submatches not supported. Syntax in [%v] is invalid", commandTemplate)
}
startIdx := groupMatches[0][0]
endIdx := groupMatches[0][1]
inputStartIdx := groupMatches[0][2]
inputEndIdx := groupMatches[0][3]
inputName := val[inputStartIdx:inputEndIdx]

inputs, err := in.Get(ctx)
if err != nil {
return val, errors.Wrapf(err, "unable to read inputs for [%s]", inputName)
return val, errors.Wrapf(err, "unable to read inputs")
}
if inputs == nil || inputs.Literals == nil {
return val, fmt.Errorf("no inputs provided, cannot bind input name [%s]", inputName)
}
inputVal, exists := inputs.Literals[inputName]
if !exists {
return val, fmt.Errorf("requested input is not found [%v] while processing template [%v]",
inputName, commandTemplate)
return val, nil
}

v, err := serializeLiteral(ctx, inputVal)
if err != nil {
return val, errors.Wrapf(err, "failed to bind a value to inputName [%s]", inputName)
}
if endIdx >= len(val) {
return val[:startIdx] + v, nil
}
var errs ErrorCollection
val = inputVarRegex.ReplaceAllStringFunc(val, func(s string) string {
matches := inputVarRegex.FindAllStringSubmatch(s, 1)
varName := matches[0][1]
replaced, err := transformVarNameToStringVal(ctx, varName, inputs)
if err != nil {
errs.Errors = append(errs.Errors, errors.Wrapf(err, "input template [%s]", s))
return ""
}
return replaced
})

return val[:startIdx] + v + val[endIdx:], nil
if len(errs.Errors) > 0 {
return "", errs
}

return val, nil
}

func serializePrimitive(p *core.Primitive) (string, error) {
Expand Down
72 changes: 71 additions & 1 deletion flyteplugins/go/tasks/pluginmachinery/utils/template_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,12 +260,82 @@ func TestReplaceTemplateCommandArgs(t *testing.T) {
t.Run("nil input", func(t *testing.T) {
in := dummyInputReader{inputs: &core.LiteralMap{}}

_, err := ReplaceTemplateCommandArgs(context.TODO(), []string{
actual, err := ReplaceTemplateCommandArgs(context.TODO(), []string{
"hello",
"world",
`--someArg {{ .Inputs.arr }}`,
"{{ .OutputPrefix }}",
}, in, out)
assert.NoError(t, err)
assert.Equal(t, []string{
"hello",
"world",
`--someArg {{ .Inputs.arr }}`,
"output/blah",
}, actual)
})

t.Run("multi-input", func(t *testing.T) {
in := dummyInputReader{inputs: &core.LiteralMap{
Literals: map[string]*core.Literal{
"ds": coreutils.MustMakeLiteral(time.Date(1900, 01, 01, 01, 01, 01, 000000001, time.UTC)),
"table": coreutils.MustMakeLiteral("my_table"),
"hr": coreutils.MustMakeLiteral("hr"),
"min": coreutils.MustMakeLiteral(15),
},
}}
actual, err := ReplaceTemplateCommandArgs(context.TODO(), []string{
`SELECT
COUNT(*) as total_count
FROM
hive.events.{{ .Inputs.table }}
WHERE
ds = '{{ .Inputs.ds }}' AND hr = '{{ .Inputs.hr }}' AND min = {{ .Inputs.min }}
`}, in, out)
assert.NoError(t, err)
assert.Equal(t, []string{
`SELECT
COUNT(*) as total_count
FROM
hive.events.my_table
WHERE
ds = '1900-01-01T01:01:01.000000001Z' AND hr = 'hr' AND min = 15
`}, actual)
})

t.Run("missing input", func(t *testing.T) {
in := dummyInputReader{inputs: &core.LiteralMap{
Literals: map[string]*core.Literal{
"arr": coreutils.MustMakeLiteral([]interface{}{[]interface{}{"a", "b"}, []interface{}{1, 2}}),
},
}}
_, err := ReplaceTemplateCommandArgs(context.TODO(), []string{
"hello",
"world",
`--someArg {{ .Inputs.blah }}`,
"{{ .OutputPrefix }}",
}, in, out)
assert.Error(t, err)
})

t.Run("bad template", func(t *testing.T) {
in := dummyInputReader{inputs: &core.LiteralMap{
Literals: map[string]*core.Literal{
"arr": coreutils.MustMakeLiteral([]interface{}{[]interface{}{"a", "b"}, []interface{}{1, 2}}),
},
}}
actual, err := ReplaceTemplateCommandArgs(context.TODO(), []string{
"hello",
"world",
`--someArg {{ .Inputs.blah blah }}`,
"{{ .OutputPrefix }}",
}, in, out)
assert.NoError(t, err)
assert.Equal(t, []string{
"hello",
"world",
`--someArg {{ .Inputs.blah blah }}`,
"output/blah",
}, actual)
})
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,11 @@ func TestContains(t *testing.T) {

assert.False(t, Contains(nil, "b"))
}

func TestCopyMap(t *testing.T) {
assert.Nil(t, CopyMap(nil))
m := map[string]string{
"l": "v",
}
assert.Equal(t, m, CopyMap(m))
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func TestLaunchSubTasks(t *testing.T) {
ir := &mocks3.InputReader{}
ir.OnGetInputPrefixPath().Return("/prefix/")
ir.OnGetInputPath().Return("/prefix/inputs.pb")
ir.OnGetMatch(mock.Anything).Return(nil, nil)

tCtx := &mocks.TaskExecutionContext{}
tCtx.OnTaskReader().Return(tr)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ func TestArrayJobToBatchInput(t *testing.T) {
ir := &mocks2.InputReader{}
ir.OnGetInputPath().Return("inputs.pb")
ir.OnGetInputPrefixPath().Return("/inputs/prefix")
ir.OnGetMatch(mock.Anything).Return(nil, nil)

or := &mocks2.OutputWriter{}
or.OnGetOutputPrefixPath().Return("/path/output")
Expand Down
1 change: 1 addition & 0 deletions flyteplugins/tests/end_to_end.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func RunPluginEndToEndTest(t *testing.T, executor pluginCore.Plugin, template *i
inputReader := &ioMocks.InputReader{}
inputReader.OnGetInputPrefixPath().Return(basePrefix)
inputReader.OnGetInputPath().Return(basePrefix + "/inputs.pb")
inputReader.OnGetMatch(mock.Anything).Return(inputs, nil)

outputWriter := &ioMocks.OutputWriter{}
outputWriter.OnGetRawOutputPrefix().Return("/sandbox/")
Expand Down

0 comments on commit c0f3135

Please sign in to comment.