Skip to content

Commit

Permalink
Raw output prefix (flyteorg#115)
Browse files Browse the repository at this point in the history
  • Loading branch information
wild-endeavor authored Aug 26, 2020
1 parent dc4e8b5 commit 3ebab24
Show file tree
Hide file tree
Showing 12 changed files with 66 additions and 3 deletions.
2 changes: 2 additions & 0 deletions go/tasks/pluginmachinery/flytek8s/pod_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func TestToK8sPodIterruptible(t *testing.T) {

op := &pluginsIOMock.OutputFilePaths{}
op.On("GetOutputPrefixPath").Return(storage.DataReference(""))
op.On("GetRawOutputPrefix").Return(storage.DataReference(""))

x := dummyTaskExecutionMetadata(&v1.ResourceRequirements{
Limits: v1.ResourceList{
Expand Down Expand Up @@ -139,6 +140,7 @@ func TestToK8sPod(t *testing.T) {

op := &pluginsIOMock.OutputFilePaths{}
op.On("GetOutputPrefixPath").Return(storage.DataReference(""))
op.On("GetRawOutputPrefix").Return(storage.DataReference(""))

t.Run("WithGPU", func(t *testing.T) {
x := dummyTaskExecutionMetadata(&v1.ResourceRequirements{
Expand Down
2 changes: 2 additions & 0 deletions go/tasks/pluginmachinery/utils/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ var inputFileRegex = regexp.MustCompile(`(?i){{\s*[\.$]Input\s*}}`)
var inputPrefixRegex = regexp.MustCompile(`(?i){{\s*[\.$]InputPrefix\s*}}`)
var outputRegex = regexp.MustCompile(`(?i){{\s*[\.$]OutputPrefix\s*}}`)
var inputVarRegex = regexp.MustCompile(`(?i){{\s*[\.$]Inputs\.(?P<input_name>[^}\s]+)\s*}}`)
var rawOutputDataPrefixRegex = regexp.MustCompile(`(?i){{\s*[\.$]RawOutputDataPrefix\s*}}`)

// Evaluates templates in each command with the equivalent value from passed args. Templates are case-insensitive
// Supported templates are:
Expand Down Expand Up @@ -68,6 +69,7 @@ func replaceTemplateCommandArgs(ctx context.Context, commandTemplate string, in
val := inputFileRegex.ReplaceAllString(commandTemplate, in.GetInputPath().String())
val = outputRegex.ReplaceAllString(val, out.GetOutputPrefixPath().String())
val = inputPrefixRegex.ReplaceAllString(val, in.GetInputPrefixPath().String())
val = rawOutputDataPrefixRegex.ReplaceAllString(val, out.GetRawOutputPrefix().String())

inputs, err := in.Get(ctx)
if err != nil {
Expand Down
34 changes: 31 additions & 3 deletions go/tasks/pluginmachinery/utils/template_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,12 @@ func (d dummyInputReader) Get(ctx context.Context) (*core.LiteralMap, error) {
}

type dummyOutputPaths struct {
outputPath storage.DataReference
outputPath storage.DataReference
rawOutputDataPrefix storage.DataReference
}

func (d dummyOutputPaths) GetRawOutputPrefix() storage.DataReference {
panic("should not be called")
return d.rawOutputDataPrefix
}

func (d dummyOutputPaths) GetOutputPrefixPath() storage.DataReference {
Expand Down Expand Up @@ -96,7 +97,10 @@ func TestReplaceTemplateCommandArgs(t *testing.T) {
})

in := dummyInputReader{inputPath: "input/blah"}
out := dummyOutputPaths{outputPath: "output/blah"}
out := dummyOutputPaths{
outputPath: "output/blah",
rawOutputDataPrefix: "s3://custom-bucket",
}

t.Run("nothing to substitute", func(t *testing.T) {
actual, err := ReplaceTemplateCommandArgs(context.TODO(), []string{
Expand Down Expand Up @@ -178,13 +182,15 @@ func TestReplaceTemplateCommandArgs(t *testing.T) {
"world",
"${{input}}",
"{{ .OutputPrefix }}",
"--switch {{ .rawOutputDataPrefix }}",
}, in, out)
assert.NoError(t, err)
assert.Equal(t, []string{
"hello",
"world",
"${{input}}",
"output/blah",
"--switch s3://custom-bucket",
}, actual)
})

Expand All @@ -205,13 +211,15 @@ func TestReplaceTemplateCommandArgs(t *testing.T) {
"world",
`--someArg {{ .Inputs.arr }}`,
"{{ .OutputPrefix }}",
"{{ $RawOutputDataPrefix }}",
}, in, out)
assert.NoError(t, err)
assert.Equal(t, []string{
"hello",
"world",
"--someArg [a,b]",
"output/blah",
"s3://custom-bucket",
}, actual)
})

Expand All @@ -226,13 +234,15 @@ func TestReplaceTemplateCommandArgs(t *testing.T) {
"world",
`--someArg {{ .Inputs.date }}`,
"{{ .OutputPrefix }}",
"{{ .rawOutputDataPrefix }}",
}, in, out)
assert.NoError(t, err)
assert.Equal(t, []string{
"hello",
"world",
"--someArg 1900-01-01T01:01:01.000000001Z",
"output/blah",
"s3://custom-bucket",
}, actual)
})

Expand All @@ -247,13 +257,15 @@ func TestReplaceTemplateCommandArgs(t *testing.T) {
"world",
`--someArg {{ .Inputs.arr }}`,
"{{ .OutputPrefix }}",
"{{ .wrongOutputDataPrefix }}",
}, in, out)
assert.NoError(t, err)
assert.Equal(t, []string{
"hello",
"world",
"--someArg [[a,b],[1,2]]",
"output/blah",
"{{ .wrongOutputDataPrefix }}",
}, actual)
})

Expand All @@ -265,13 +277,15 @@ func TestReplaceTemplateCommandArgs(t *testing.T) {
"world",
`--someArg {{ .Inputs.arr }}`,
"{{ .OutputPrefix }}",
"--raw-data-output-prefix {{ .rawOutputDataPrefix }}",
}, in, out)
assert.NoError(t, err)
assert.Equal(t, []string{
"hello",
"world",
`--someArg {{ .Inputs.arr }}`,
"output/blah",
"--raw-data-output-prefix s3://custom-bucket",
}, actual)
})

Expand Down Expand Up @@ -338,4 +352,18 @@ func TestReplaceTemplateCommandArgs(t *testing.T) {
"output/blah",
}, actual)
})

t.Run("sub raw output data prefix", func(t *testing.T) {
actual, err := ReplaceTemplateCommandArgs(context.TODO(), []string{
"hello",
"world",
"{{ .rawOutputDataPrefix }}",
}, in, out)
assert.NoError(t, err)
assert.Equal(t, []string{
"hello",
"world",
"s3://custom-bucket",
}, actual)
})
}
1 change: 1 addition & 0 deletions go/tasks/plugins/array/awsbatch/launcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func TestLaunchSubTasks(t *testing.T) {

ow := &mocks3.OutputWriter{}
ow.OnGetOutputPrefixPath().Return("/prefix/")
ow.OnGetRawOutputPrefix().Return("s3://")

ir := &mocks3.InputReader{}
ir.OnGetInputPrefixPath().Return("/prefix/")
Expand Down
1 change: 1 addition & 0 deletions go/tasks/plugins/array/awsbatch/transformer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ func TestArrayJobToBatchInput(t *testing.T) {

or := &mocks2.OutputWriter{}
or.OnGetOutputPrefixPath().Return("/path/output")
or.OnGetRawOutputPrefix().Return("s3://")

taskCtx := &mocks.TaskExecutionContext{}
taskCtx.OnTaskExecutionMetadata().Return(tMetadata)
Expand Down
1 change: 1 addition & 0 deletions go/tasks/plugins/k8s/container/container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func dummyContainerTaskContext(resources *v1.ResourceRequirements, command []str
outputReader := &pluginsIOMock.OutputWriter{}
outputReader.On("GetOutputPath").Return(storage.DataReference("/data/outputs.pb"))
outputReader.On("GetOutputPrefixPath").Return(storage.DataReference("/data/"))
outputReader.On("GetRawOutputPrefix").Return(storage.DataReference(""))
taskCtx.On("OutputWriter").Return(outputReader)

taskReader := &pluginsCoreMock.TaskReader{}
Expand Down
1 change: 1 addition & 0 deletions go/tasks/plugins/k8s/kfoperators/pytorch/pytorch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ func dummyPytorchTaskContext(taskTemplate *core.TaskTemplate) pluginsCore.TaskEx
outputReader := &pluginIOMocks.OutputWriter{}
outputReader.OnGetOutputPath().Return(storage.DataReference("/data/outputs.pb"))
outputReader.OnGetOutputPrefixPath().Return(storage.DataReference("/data/"))
outputReader.OnGetRawOutputPrefix().Return(storage.DataReference(""))
taskCtx.OnOutputWriter().Return(outputReader)

taskReader := &mocks.TaskReader{}
Expand Down
1 change: 1 addition & 0 deletions go/tasks/plugins/k8s/sagemaker/config/config_flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 22 additions & 0 deletions go/tasks/plugins/k8s/sagemaker/config/config_flags_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions go/tasks/plugins/k8s/sidecar/sidecar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ func getDummySidecarTaskContext(taskTemplate *core.TaskTemplate, resources *v1.R
outputReader := &pluginsIOMock.OutputWriter{}
outputReader.On("GetOutputPath").Return(storage.DataReference("/data/outputs.pb"))
outputReader.On("GetOutputPrefixPath").Return(storage.DataReference("/data/"))
outputReader.On("GetRawOutputPrefix").Return(storage.DataReference(""))
taskCtx.On("OutputWriter").Return(outputReader)

taskReader := &pluginsCoreMock.TaskReader{}
Expand Down
2 changes: 2 additions & 0 deletions go/tasks/plugins/k8s/spark/spark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,8 @@ func dummySparkTaskContext(taskTemplate *core.TaskTemplate) pluginsCore.TaskExec
outputReader := &pluginIOMocks.OutputWriter{}
outputReader.On("GetOutputPath").Return(storage.DataReference("/data/outputs.pb"))
outputReader.On("GetOutputPrefixPath").Return(storage.DataReference("/data/"))
outputReader.On("GetRawOutputPrefix").Return(storage.DataReference(""))

taskCtx.On("OutputWriter").Return(outputReader)

taskReader := &mocks.TaskReader{}
Expand Down
1 change: 1 addition & 0 deletions go/tasks/plugins/presto/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ func GetMockTaskExecutionContext() core.TaskExecutionContext {
outputReader := &ioMock.OutputWriter{}
outputReader.On("GetOutputPath").Return(storage.DataReference("/data/outputs.pb"))
outputReader.On("GetOutputPrefixPath").Return(storage.DataReference("/data/"))
outputReader.On("GetRawOutputPrefix").Return(storage.DataReference("s3://"))
taskCtx.On("OutputWriter").Return(outputReader)

taskReader := &coreMock.TaskReader{}
Expand Down

0 comments on commit 3ebab24

Please sign in to comment.