Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Allow multiple arguments to be substituted in one run #92

Merged
merged 4 commits into from
May 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions go/tasks/pluginmachinery/utils/error_collection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package utils
EngHabu marked this conversation as resolved.
Show resolved Hide resolved

import (
"fmt"
"strings"
)

type ErrorCollection struct {
EngHabu marked this conversation as resolved.
Show resolved Hide resolved
Errors []error
}

func (e ErrorCollection) Error() string {
sb := strings.Builder{}
for idx, err := range e.Errors {
sb.WriteString(fmt.Sprintf("%v: %v\r\n", idx, err))
}

return sb.String()
}
22 changes: 22 additions & 0 deletions go/tasks/pluginmachinery/utils/error_collection_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package utils

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"
)

func TestErrorCollection(t *testing.T) {
ec := ErrorCollection{}

assert.Empty(t, ec.Error())

ec.Errors = append(ec.Errors, fmt.Errorf("error1"))
assert.NotEmpty(t, ec.Error())

ec.Errors = append(ec.Errors, fmt.Errorf("error2"))
assert.NotEmpty(t, ec.Error())

assert.Equal(t, "0: error1\r\n1: error2\r\n", ec.Error())
}
58 changes: 30 additions & 28 deletions go/tasks/pluginmachinery/utils/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,47 +51,49 @@ func ReplaceTemplateCommandArgs(ctx context.Context, command []string, in io.Inp
return res, nil
}

func transformVarNameToStringVal(ctx context.Context, varName string, inputs *core.LiteralMap) (string, error) {
inputVal, exists := inputs.Literals[varName]
if !exists {
return "", fmt.Errorf("requested input is not found [%s]", varName)
}

v, err := serializeLiteral(ctx, inputVal)
if err != nil {
return "", errors.Wrapf(err, "failed to bind a value to inputName [%s]", varName)
}
return v, nil
}

func replaceTemplateCommandArgs(ctx context.Context, commandTemplate string, in io.InputReader, out io.OutputFilePaths) (string, error) {
val := inputFileRegex.ReplaceAllString(commandTemplate, in.GetInputPath().String())
val = outputRegex.ReplaceAllString(val, out.GetOutputPrefixPath().String())
val = inputPrefixRegex.ReplaceAllString(val, in.GetInputPrefixPath().String())
groupMatches := inputVarRegex.FindAllStringSubmatchIndex(val, -1)
if len(groupMatches) == 0 {
return val, nil
} else if len(groupMatches) > 1 {
return val, fmt.Errorf("only one level of inputs nesting is supported. Syntax in [%v] is invalid", commandTemplate)
} else if len(groupMatches[0]) > 4 {
return val, fmt.Errorf("longer submatches not supported. Syntax in [%v] is invalid", commandTemplate)
}
startIdx := groupMatches[0][0]
endIdx := groupMatches[0][1]
inputStartIdx := groupMatches[0][2]
inputEndIdx := groupMatches[0][3]
inputName := val[inputStartIdx:inputEndIdx]

inputs, err := in.Get(ctx)
if err != nil {
return val, errors.Wrapf(err, "unable to read inputs for [%s]", inputName)
return val, errors.Wrapf(err, "unable to read inputs")
}
if inputs == nil || inputs.Literals == nil {
return val, fmt.Errorf("no inputs provided, cannot bind input name [%s]", inputName)
}
inputVal, exists := inputs.Literals[inputName]
if !exists {
return val, fmt.Errorf("requested input is not found [%v] while processing template [%v]",
inputName, commandTemplate)
return val, nil
}

v, err := serializeLiteral(ctx, inputVal)
if err != nil {
return val, errors.Wrapf(err, "failed to bind a value to inputName [%s]", inputName)
}
if endIdx >= len(val) {
return val[:startIdx] + v, nil
}
var errs ErrorCollection
val = inputVarRegex.ReplaceAllStringFunc(val, func(s string) string {
matches := inputVarRegex.FindAllStringSubmatch(s, 1)
EngHabu marked this conversation as resolved.
Show resolved Hide resolved
varName := matches[0][1]
replaced, err := transformVarNameToStringVal(ctx, varName, inputs)
if err != nil {
errs.Errors = append(errs.Errors, errors.Wrapf(err, "input template [%s]", s))
return ""
}
return replaced
})

return val[:startIdx] + v + val[endIdx:], nil
if len(errs.Errors) > 0 {
return "", errs
}

return val, nil
}

func serializePrimitive(p *core.Primitive) (string, error) {
Expand Down
72 changes: 71 additions & 1 deletion go/tasks/pluginmachinery/utils/template_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,12 +260,82 @@ func TestReplaceTemplateCommandArgs(t *testing.T) {
t.Run("nil input", func(t *testing.T) {
in := dummyInputReader{inputs: &core.LiteralMap{}}

_, err := ReplaceTemplateCommandArgs(context.TODO(), []string{
actual, err := ReplaceTemplateCommandArgs(context.TODO(), []string{
"hello",
"world",
`--someArg {{ .Inputs.arr }}`,
"{{ .OutputPrefix }}",
}, in, out)
assert.NoError(t, err)
assert.Equal(t, []string{
"hello",
"world",
`--someArg {{ .Inputs.arr }}`,
"output/blah",
}, actual)
})

t.Run("multi-input", func(t *testing.T) {
in := dummyInputReader{inputs: &core.LiteralMap{
Literals: map[string]*core.Literal{
"ds": coreutils.MustMakeLiteral(time.Date(1900, 01, 01, 01, 01, 01, 000000001, time.UTC)),
"table": coreutils.MustMakeLiteral("my_table"),
"hr": coreutils.MustMakeLiteral("hr"),
"min": coreutils.MustMakeLiteral(15),
},
}}
actual, err := ReplaceTemplateCommandArgs(context.TODO(), []string{
`SELECT
COUNT(*) as total_count
FROM
hive.events.{{ .Inputs.table }}
WHERE
ds = '{{ .Inputs.ds }}' AND hr = '{{ .Inputs.hr }}' AND min = {{ .Inputs.min }}
`}, in, out)
assert.NoError(t, err)
assert.Equal(t, []string{
`SELECT
COUNT(*) as total_count
FROM
hive.events.my_table
WHERE
ds = '1900-01-01T01:01:01.000000001Z' AND hr = 'hr' AND min = 15
`}, actual)
})

t.Run("missing input", func(t *testing.T) {
in := dummyInputReader{inputs: &core.LiteralMap{
Literals: map[string]*core.Literal{
"arr": coreutils.MustMakeLiteral([]interface{}{[]interface{}{"a", "b"}, []interface{}{1, 2}}),
},
}}
_, err := ReplaceTemplateCommandArgs(context.TODO(), []string{
"hello",
"world",
`--someArg {{ .Inputs.blah }}`,
"{{ .OutputPrefix }}",
}, in, out)
assert.Error(t, err)
})

t.Run("bad template", func(t *testing.T) {
in := dummyInputReader{inputs: &core.LiteralMap{
Literals: map[string]*core.Literal{
"arr": coreutils.MustMakeLiteral([]interface{}{[]interface{}{"a", "b"}, []interface{}{1, 2}}),
},
}}
actual, err := ReplaceTemplateCommandArgs(context.TODO(), []string{
"hello",
"world",
`--someArg {{ .Inputs.blah blah }}`,
"{{ .OutputPrefix }}",
}, in, out)
assert.NoError(t, err)
assert.Equal(t, []string{
"hello",
"world",
`--someArg {{ .Inputs.blah blah }}`,
"output/blah",
}, actual)
})
}
8 changes: 8 additions & 0 deletions go/tasks/pluginmachinery/utils/transformers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,11 @@ func TestContains(t *testing.T) {

assert.False(t, Contains(nil, "b"))
}

func TestCopyMap(t *testing.T) {
assert.Nil(t, CopyMap(nil))
m := map[string]string{
"l": "v",
}
assert.Equal(t, m, CopyMap(m))
}
1 change: 1 addition & 0 deletions go/tasks/plugins/array/awsbatch/launcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func TestLaunchSubTasks(t *testing.T) {
ir := &mocks3.InputReader{}
ir.OnGetInputPrefixPath().Return("/prefix/")
ir.OnGetInputPath().Return("/prefix/inputs.pb")
ir.OnGetMatch(mock.Anything).Return(nil, nil)

tCtx := &mocks.TaskExecutionContext{}
tCtx.OnTaskReader().Return(tr)
Expand Down
1 change: 1 addition & 0 deletions go/tasks/plugins/array/awsbatch/transformer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ func TestArrayJobToBatchInput(t *testing.T) {
ir := &mocks2.InputReader{}
ir.OnGetInputPath().Return("inputs.pb")
ir.OnGetInputPrefixPath().Return("/inputs/prefix")
ir.OnGetMatch(mock.Anything).Return(nil, nil)

or := &mocks2.OutputWriter{}
or.OnGetOutputPrefixPath().Return("/path/output")
Expand Down
1 change: 1 addition & 0 deletions tests/end_to_end.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func RunPluginEndToEndTest(t *testing.T, executor pluginCore.Plugin, template *i
inputReader := &ioMocks.InputReader{}
inputReader.OnGetInputPrefixPath().Return(basePrefix)
inputReader.OnGetInputPath().Return(basePrefix + "/inputs.pb")
inputReader.OnGetMatch(mock.Anything).Return(inputs, nil)

outputWriter := &ioMocks.OutputWriter{}
outputWriter.OnGetRawOutputPrefix().Return("/sandbox/")
Expand Down