From 6ba859abb0ed02ab4d1779810929b3f00f48f716 Mon Sep 17 00:00:00 2001 From: Ketan Umare Date: Tue, 3 Mar 2020 22:52:40 -0800 Subject: [PATCH 01/15] Work in progress --- go/tasks/pluginmachinery/io/iface.go | 14 +++- .../ioutils/buckets_datasharder.go | 64 +++++++++++++++++++ .../pluginmachinery/ioutils/data_sharder.go | 10 +++ .../pluginmachinery/ioutils/output_sandbox.go | 39 +++++++++++ .../ioutils/output_sandbox_test.go | 13 ++++ 5 files changed, 139 insertions(+), 1 deletion(-) create mode 100644 go/tasks/pluginmachinery/ioutils/buckets_datasharder.go create mode 100644 go/tasks/pluginmachinery/ioutils/data_sharder.go create mode 100644 go/tasks/pluginmachinery/ioutils/output_sandbox.go create mode 100644 go/tasks/pluginmachinery/ioutils/output_sandbox_test.go diff --git a/go/tasks/pluginmachinery/io/iface.go b/go/tasks/pluginmachinery/io/iface.go index b64c5ba45..7d87e9fe3 100644 --- a/go/tasks/pluginmachinery/io/iface.go +++ b/go/tasks/pluginmachinery/io/iface.go @@ -41,7 +41,17 @@ type OutputReader interface { Read(ctx context.Context) (*core.LiteralMap, *ExecutionError, error) } -// All paths where various outputs produced by the task can be placed, such that the framework can directly access them. +// Data sandbox is the actual path where the data produced by a task can be placed. It is completely optional. The advantage +// of using this path is to provide exactly once semantics. It is guaranteed that this path is unique for every new execution +// of a task (across retries etc) and is constant for a specific execution. +// As of 02/20/2020 Flytekit generates this path randomly for S3. This structure proposes migration of this logic to +// FlytePluginMachinery so that it can be used more universally outside of Flytekit. +type OutputDataSandbox interface { + // This is prefix (blob store prefix or directory) where all data produced can be stored. + GetOutputDataSandboxPath() storage.DataReference +} + +// All paths where various meta outputs produced by the task can be placed, such that the framework can directly access them. // All paths are reperesented using storage.DataReference -> an URN for the configured storage backend type OutputFilePaths interface { // A path to a directory or prefix that contains all execution metadata for this execution @@ -51,6 +61,8 @@ type OutputFilePaths interface { // A Fully qualified path (URN) where the error information should be placed as a protobuf core.ErrorDocument. It is not directly // used by the framework, but could be used in the future GetErrorPath() storage.DataReference + + OutputDataSandbox } // Framework Output writing interface. diff --git a/go/tasks/pluginmachinery/ioutils/buckets_datasharder.go b/go/tasks/pluginmachinery/ioutils/buckets_datasharder.go new file mode 100644 index 000000000..6f2a4f785 --- /dev/null +++ b/go/tasks/pluginmachinery/ioutils/buckets_datasharder.go @@ -0,0 +1,64 @@ +package ioutils + +import ( + "context" + "hash/fnv" + "strings" + + "github.com/pkg/errors" +) + +func GenerateAlphabet(b []rune) []rune { + for i := 'a'; i <= 'z'; i++ { + b = append(b, i) + } + return b +} + +func GenerateArabicNumerals(b []rune) []rune { + for i := '0'; i <= '9'; i++ { + b = append(b, i) + } + return b +} + +func createAlphabetAndNumerals() []rune { + b := make([]rune, 0, 36) + b = GenerateAlphabet(b) + return GenerateArabicNumerals(b) +} + +// this sharder distributes data into one of the precomputed shards. The precomputed shards in this specific case +// are of the format {[0-9a-z][0-9a-z]} 2 character long. The bucket is deterministically determined given the input s +type BucketsDataSharder struct { + precomputedPrefixes []string + buckets uint32 +} + +func (d *BucketsDataSharder) Initialize(ctx context.Context) error { + permittedChars := createAlphabetAndNumerals() + n := len(permittedChars) + precomputedPrefixes := make([]string, 0, n*n) + for _, c1 := range permittedChars { + for _, c2 := range permittedChars { + sb := strings.Builder{} + sb.WriteRune(c1) + sb.WriteRune(c2) + precomputedPrefixes = append(precomputedPrefixes, sb.String()) + } + } + d.precomputedPrefixes = precomputedPrefixes + d.buckets = uint32(n * n) + return nil +} + +// Generates deterministic shard id for the given string s +func (d *BucketsDataSharder) GetShardPrefix(ctx context.Context, s []byte) (string, error) { + h := fnv.New32a() + _, err := h.Write(s) + if err != nil { + return "", errors.Wrap(err, "failed to create shard prefix, reason hash failure.") + } + idx := h.Sum32() % d.buckets + return d.precomputedPrefixes[idx], nil +} diff --git a/go/tasks/pluginmachinery/ioutils/data_sharder.go b/go/tasks/pluginmachinery/ioutils/data_sharder.go new file mode 100644 index 000000000..030682bf9 --- /dev/null +++ b/go/tasks/pluginmachinery/ioutils/data_sharder.go @@ -0,0 +1,10 @@ +package ioutils + +import "context" + +// This interface allows shard selection for OutputSandbox. The API required the Initialize method be invoked before +// invoking the Shard string +type ShardSelector interface { + Initialize(ctx context.Context) error + GetShardPrefix(ctx context.Context, s []byte) (string, error) +} diff --git a/go/tasks/pluginmachinery/ioutils/output_sandbox.go b/go/tasks/pluginmachinery/ioutils/output_sandbox.go new file mode 100644 index 000000000..ef343a79e --- /dev/null +++ b/go/tasks/pluginmachinery/ioutils/output_sandbox.go @@ -0,0 +1,39 @@ +package ioutils + +import ( + "context" + "crypto/md5" + + "github.com/lyft/flytestdlib/storage" + + "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io" +) + +type randomPrefixShardedOutputSandbox struct { + path storage.DataReference +} + +func (r randomPrefixShardedOutputSandbox) GetOutputDataSandboxPath() storage.DataReference { + return r.path +} + +// Creates a deterministic OutputSandbox whose path is distributed based on the ShardSelector passed in. +// Determinism depends on the outputMetadataPath +// Potential performance problem, as creating anew randomprefixShardedOutput Sandbox may be expensive as it hashes the outputMetadataPath +// the final OutputSandbox is created in the shard selected by the sharder at the basePath and then appended by a hashed value of the outputMetadata +func NewRandomPrefixShardedOutputSandbox(ctx context.Context, sharder ShardSelector, basePath, outputMetadataPath storage.DataReference, store storage.ReferenceConstructor) (io.OutputDataSandbox, error) { + o := []byte(outputMetadataPath) + prefix, err := sharder.GetShardPrefix(ctx, o) + if err != nil { + return nil, err + } + m := md5.New() + m.Write(o) + path, err := store.ConstructReference(ctx, basePath, prefix, string(m.Sum(nil))) + if err != nil { + return nil, err + } + return randomPrefixShardedOutputSandbox{ + path: path, + }, nil +} diff --git a/go/tasks/pluginmachinery/ioutils/output_sandbox_test.go b/go/tasks/pluginmachinery/ioutils/output_sandbox_test.go new file mode 100644 index 000000000..3fb715b85 --- /dev/null +++ b/go/tasks/pluginmachinery/ioutils/output_sandbox_test.go @@ -0,0 +1,13 @@ +package ioutils + +import ( + "testing" +) + +func TestNewDataSharder(t *testing.T) { + b := make([]rune, 0, 26) + for i := 'a'; i <= 'z'; i++ { + b = append(b, i) + } + print(b) +} From 43ab99029e6f1ba10a1963ee90fb14a235c465a1 Mon Sep 17 00:00:00 2001 From: Ketan Umare Date: Tue, 3 Mar 2020 23:00:44 -0800 Subject: [PATCH 02/15] work in progress --- .../io/mocks/output_data_sandbox.go | 45 +++++++++++++++++++ .../io/mocks/output_file_paths.go | 32 +++++++++++++ .../pluginmachinery/io/mocks/output_writer.go | 32 +++++++++++++ .../ioutils/buckets_datasharder.go | 2 +- .../ioutils/remote_file_output_writer.go | 4 +- go/tasks/plugins/array/catalog.go | 3 +- 6 files changed, 115 insertions(+), 3 deletions(-) create mode 100644 go/tasks/pluginmachinery/io/mocks/output_data_sandbox.go diff --git a/go/tasks/pluginmachinery/io/mocks/output_data_sandbox.go b/go/tasks/pluginmachinery/io/mocks/output_data_sandbox.go new file mode 100644 index 000000000..727fadc32 --- /dev/null +++ b/go/tasks/pluginmachinery/io/mocks/output_data_sandbox.go @@ -0,0 +1,45 @@ +// Code generated by mockery v1.0.1. DO NOT EDIT. + +package mocks + +import ( + storage "github.com/lyft/flytestdlib/storage" + mock "github.com/stretchr/testify/mock" +) + +// OutputDataSandbox is an autogenerated mock type for the OutputDataSandbox type +type OutputDataSandbox struct { + mock.Mock +} + +type OutputDataSandbox_GetOutputDataSandboxPath struct { + *mock.Call +} + +func (_m OutputDataSandbox_GetOutputDataSandboxPath) Return(_a0 storage.DataReference) *OutputDataSandbox_GetOutputDataSandboxPath { + return &OutputDataSandbox_GetOutputDataSandboxPath{Call: _m.Call.Return(_a0)} +} + +func (_m *OutputDataSandbox) OnGetOutputDataSandboxPath() *OutputDataSandbox_GetOutputDataSandboxPath { + c := _m.On("GetOutputDataSandboxPath") + return &OutputDataSandbox_GetOutputDataSandboxPath{Call: c} +} + +func (_m *OutputDataSandbox) OnGetOutputDataSandboxPathMatch(matchers ...interface{}) *OutputDataSandbox_GetOutputDataSandboxPath { + c := _m.On("GetOutputDataSandboxPath", matchers...) + return &OutputDataSandbox_GetOutputDataSandboxPath{Call: c} +} + +// GetOutputDataSandboxPath provides a mock function with given fields: +func (_m *OutputDataSandbox) GetOutputDataSandboxPath() storage.DataReference { + ret := _m.Called() + + var r0 storage.DataReference + if rf, ok := ret.Get(0).(func() storage.DataReference); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(storage.DataReference) + } + + return r0 +} diff --git a/go/tasks/pluginmachinery/io/mocks/output_file_paths.go b/go/tasks/pluginmachinery/io/mocks/output_file_paths.go index bd65a3305..f56e86c03 100644 --- a/go/tasks/pluginmachinery/io/mocks/output_file_paths.go +++ b/go/tasks/pluginmachinery/io/mocks/output_file_paths.go @@ -44,6 +44,38 @@ func (_m *OutputFilePaths) GetErrorPath() storage.DataReference { return r0 } +type OutputFilePaths_GetOutputDataSandboxPath struct { + *mock.Call +} + +func (_m OutputFilePaths_GetOutputDataSandboxPath) Return(_a0 storage.DataReference) *OutputFilePaths_GetOutputDataSandboxPath { + return &OutputFilePaths_GetOutputDataSandboxPath{Call: _m.Call.Return(_a0)} +} + +func (_m *OutputFilePaths) OnGetOutputDataSandboxPath() *OutputFilePaths_GetOutputDataSandboxPath { + c := _m.On("GetOutputDataSandboxPath") + return &OutputFilePaths_GetOutputDataSandboxPath{Call: c} +} + +func (_m *OutputFilePaths) OnGetOutputDataSandboxPathMatch(matchers ...interface{}) *OutputFilePaths_GetOutputDataSandboxPath { + c := _m.On("GetOutputDataSandboxPath", matchers...) + return &OutputFilePaths_GetOutputDataSandboxPath{Call: c} +} + +// GetOutputDataSandboxPath provides a mock function with given fields: +func (_m *OutputFilePaths) GetOutputDataSandboxPath() storage.DataReference { + ret := _m.Called() + + var r0 storage.DataReference + if rf, ok := ret.Get(0).(func() storage.DataReference); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(storage.DataReference) + } + + return r0 +} + type OutputFilePaths_GetOutputPath struct { *mock.Call } diff --git a/go/tasks/pluginmachinery/io/mocks/output_writer.go b/go/tasks/pluginmachinery/io/mocks/output_writer.go index 0414271d0..cfc38957c 100644 --- a/go/tasks/pluginmachinery/io/mocks/output_writer.go +++ b/go/tasks/pluginmachinery/io/mocks/output_writer.go @@ -48,6 +48,38 @@ func (_m *OutputWriter) GetErrorPath() storage.DataReference { return r0 } +type OutputWriter_GetOutputDataSandboxPath struct { + *mock.Call +} + +func (_m OutputWriter_GetOutputDataSandboxPath) Return(_a0 storage.DataReference) *OutputWriter_GetOutputDataSandboxPath { + return &OutputWriter_GetOutputDataSandboxPath{Call: _m.Call.Return(_a0)} +} + +func (_m *OutputWriter) OnGetOutputDataSandboxPath() *OutputWriter_GetOutputDataSandboxPath { + c := _m.On("GetOutputDataSandboxPath") + return &OutputWriter_GetOutputDataSandboxPath{Call: c} +} + +func (_m *OutputWriter) OnGetOutputDataSandboxPathMatch(matchers ...interface{}) *OutputWriter_GetOutputDataSandboxPath { + c := _m.On("GetOutputDataSandboxPath", matchers...) + return &OutputWriter_GetOutputDataSandboxPath{Call: c} +} + +// GetOutputDataSandboxPath provides a mock function with given fields: +func (_m *OutputWriter) GetOutputDataSandboxPath() storage.DataReference { + ret := _m.Called() + + var r0 storage.DataReference + if rf, ok := ret.Get(0).(func() storage.DataReference); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(storage.DataReference) + } + + return r0 +} + type OutputWriter_GetOutputPath struct { *mock.Call } diff --git a/go/tasks/pluginmachinery/ioutils/buckets_datasharder.go b/go/tasks/pluginmachinery/ioutils/buckets_datasharder.go index 6f2a4f785..065fb29a4 100644 --- a/go/tasks/pluginmachinery/ioutils/buckets_datasharder.go +++ b/go/tasks/pluginmachinery/ioutils/buckets_datasharder.go @@ -28,7 +28,7 @@ func createAlphabetAndNumerals() []rune { return GenerateArabicNumerals(b) } -// this sharder distributes data into one of the precomputed shards. The precomputed shards in this specific case +// this sharder distributes data into one of the precomputed buckets. The precomputed shards in this specific case // are of the format {[0-9a-z][0-9a-z]} 2 character long. The bucket is deterministically determined given the input s type BucketsDataSharder struct { precomputedPrefixes []string diff --git a/go/tasks/pluginmachinery/ioutils/remote_file_output_writer.go b/go/tasks/pluginmachinery/ioutils/remote_file_output_writer.go index 65277ba3b..c82202a24 100644 --- a/go/tasks/pluginmachinery/ioutils/remote_file_output_writer.go +++ b/go/tasks/pluginmachinery/ioutils/remote_file_output_writer.go @@ -18,6 +18,7 @@ type RemoteFileOutputWriter struct { type RemoteFileOutputPaths struct { outputPrefix storage.DataReference store storage.ReferenceConstructor + io.OutputDataSandbox } var ( @@ -71,10 +72,11 @@ func (w RemoteFileOutputWriter) Put(ctx context.Context, reader io.OutputReader) return fmt.Errorf("no data found to write") } -func NewRemoteFileOutputPaths(_ context.Context, store storage.ReferenceConstructor, outputPrefix storage.DataReference) RemoteFileOutputPaths { +func NewRemoteFileOutputPaths(_ context.Context, store storage.ReferenceConstructor, outputPrefix storage.DataReference, sandbox io.OutputDataSandbox) RemoteFileOutputPaths { return RemoteFileOutputPaths{ store: store, outputPrefix: outputPrefix, + OutputDataSandbox: sandbox, } } diff --git a/go/tasks/plugins/array/catalog.go b/go/tasks/plugins/array/catalog.go index 1161fa306..9bd7ac1e5 100644 --- a/go/tasks/plugins/array/catalog.go +++ b/go/tasks/plugins/array/catalog.go @@ -396,7 +396,8 @@ func ConstructOutputWriter(ctx context.Context, dataStore *storage.DataStore, ou return nil, err } - return ioutils.NewRemoteFileOutputWriter(ctx, dataStore, ioutils.NewRemoteFileOutputPaths(ctx, dataStore, dataReference)), nil + p := ioutils.NewRemoteFileOutputPaths(ctx, dataStore, dataReference, ioutils.NewRandomPrefixShardedOutputSandbox(ctx, ioutils.)) + return ioutils.NewRemoteFileOutputWriter(ctx, dataStore, p), nil } func ConstructOutputReaders(ctx context.Context, dataStore *storage.DataStore, outputPrefix storage.DataReference, From dd6996dd05c561cb74c860c329373b9f5dece158 Mon Sep 17 00:00:00 2001 From: Ketan Umare Date: Wed, 4 Mar 2020 20:37:40 -0800 Subject: [PATCH 03/15] work in progress --- .../pluginmachinery/ioutils/data_sharder.go | 4 +- .../pluginmachinery/ioutils/output_sandbox.go | 14 +++++-- .../ioutils/output_sandbox_test.go | 29 +++++++++++--- ...harder.go => precomputed_shardselector.go} | 39 ++++++++++++------- go/tasks/plugins/array/catalog.go | 33 ++++++++++------ 5 files changed, 79 insertions(+), 40 deletions(-) rename go/tasks/pluginmachinery/ioutils/{buckets_datasharder.go => precomputed_shardselector.go} (61%) diff --git a/go/tasks/pluginmachinery/ioutils/data_sharder.go b/go/tasks/pluginmachinery/ioutils/data_sharder.go index 030682bf9..ae467afb0 100644 --- a/go/tasks/pluginmachinery/ioutils/data_sharder.go +++ b/go/tasks/pluginmachinery/ioutils/data_sharder.go @@ -2,9 +2,7 @@ package ioutils import "context" -// This interface allows shard selection for OutputSandbox. The API required the Initialize method be invoked before -// invoking the Shard string +// This interface allows shard selection for OutputSandbox. type ShardSelector interface { - Initialize(ctx context.Context) error GetShardPrefix(ctx context.Context, s []byte) (string, error) } diff --git a/go/tasks/pluginmachinery/ioutils/output_sandbox.go b/go/tasks/pluginmachinery/ioutils/output_sandbox.go index ef343a79e..4564a7b45 100644 --- a/go/tasks/pluginmachinery/ioutils/output_sandbox.go +++ b/go/tasks/pluginmachinery/ioutils/output_sandbox.go @@ -3,17 +3,18 @@ package ioutils import ( "context" "crypto/md5" + "encoding/hex" "github.com/lyft/flytestdlib/storage" "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io" ) -type randomPrefixShardedOutputSandbox struct { +type precomputedOutputSandbox struct { path storage.DataReference } -func (r randomPrefixShardedOutputSandbox) GetOutputDataSandboxPath() storage.DataReference { +func (r precomputedOutputSandbox) GetOutputDataSandboxPath() storage.DataReference { return r.path } @@ -29,11 +30,16 @@ func NewRandomPrefixShardedOutputSandbox(ctx context.Context, sharder ShardSelec } m := md5.New() m.Write(o) - path, err := store.ConstructReference(ctx, basePath, prefix, string(m.Sum(nil))) + path, err := store.ConstructReference(ctx, basePath, prefix, hex.EncodeToString(m.Sum(nil))) if err != nil { return nil, err } - return randomPrefixShardedOutputSandbox{ + return precomputedOutputSandbox{ path: path, }, nil } + +// A simple Output sandbox at a given path +func NewOutputSandbox(ctx context.Context, outputSandboxPath storage.DataReference) io.OutputDataSandbox { + return precomputedOutputSandbox{path: outputSandboxPath} +} diff --git a/go/tasks/pluginmachinery/ioutils/output_sandbox_test.go b/go/tasks/pluginmachinery/ioutils/output_sandbox_test.go index 3fb715b85..2a2e9af42 100644 --- a/go/tasks/pluginmachinery/ioutils/output_sandbox_test.go +++ b/go/tasks/pluginmachinery/ioutils/output_sandbox_test.go @@ -1,13 +1,30 @@ package ioutils import ( + "context" "testing" + + "github.com/lyft/flytestdlib/storage" + "github.com/stretchr/testify/assert" ) -func TestNewDataSharder(t *testing.T) { - b := make([]rune, 0, 26) - for i := 'a'; i <= 'z'; i++ { - b = append(b, i) - } - print(b) +func TestNewOutputSandbox(t *testing.T) { + assert.Equal(t, NewOutputSandbox(context.TODO(), "x").GetOutputDataSandboxPath(), storage.DataReference("x")) +} + +func TestNewRandomPrefixShardedOutputSandbox(t *testing.T) { + ctx := context.TODO() + + t.Run("success-path", func(t *testing.T) { + ss := NewConstantShardSelector([]string{"x"}) + sd, err := NewRandomPrefixShardedOutputSandbox(ctx, ss, "s3://bucket", "m", storage.URLPathConstructor{}) + assert.NoError(t, err) + assert.Equal(t, storage.DataReference("s3://bucket/x/6f8f57715090da2632453988d9a1501b"), sd.GetOutputDataSandboxPath()) + }) + + t.Run("error", func(t *testing.T) { + ss := NewConstantShardSelector([]string{"x"}) + _, err := NewRandomPrefixShardedOutputSandbox(ctx, ss, "#/ ", "m", storage.URLPathConstructor{}) + assert.Error(t, err) + }) } diff --git a/go/tasks/pluginmachinery/ioutils/buckets_datasharder.go b/go/tasks/pluginmachinery/ioutils/precomputed_shardselector.go similarity index 61% rename from go/tasks/pluginmachinery/ioutils/buckets_datasharder.go rename to go/tasks/pluginmachinery/ioutils/precomputed_shardselector.go index 065fb29a4..060743e44 100644 --- a/go/tasks/pluginmachinery/ioutils/buckets_datasharder.go +++ b/go/tasks/pluginmachinery/ioutils/precomputed_shardselector.go @@ -28,14 +28,25 @@ func createAlphabetAndNumerals() []rune { return GenerateArabicNumerals(b) } -// this sharder distributes data into one of the precomputed buckets. The precomputed shards in this specific case -// are of the format {[0-9a-z][0-9a-z]} 2 character long. The bucket is deterministically determined given the input s -type BucketsDataSharder struct { +// this sharder distributes data into one of the precomputed buckets. The bucket is deterministically determined given the input s +type PrecomputedShardSelector struct { precomputedPrefixes []string buckets uint32 } -func (d *BucketsDataSharder) Initialize(ctx context.Context) error { +// Generates deterministic shard id for the given string s +func (d *PrecomputedShardSelector) GetShardPrefix(ctx context.Context, s []byte) (string, error) { + h := fnv.New32a() + _, err := h.Write(s) + if err != nil { + return "", errors.Wrap(err, "failed to create shard prefix, reason hash failure.") + } + idx := h.Sum32() % d.buckets + return d.precomputedPrefixes[idx], nil +} + +// Creates a PrecomputedShardSelector with 36*36 unique shards. Each shard is of the format {[0-9a-z][0-9a-z]}, i.e. 2 character long. +func NewBase36PrefixShardSelector(ctx context.Context) (ShardSelector, error) { permittedChars := createAlphabetAndNumerals() n := len(permittedChars) precomputedPrefixes := make([]string, 0, n*n) @@ -47,18 +58,16 @@ func (d *BucketsDataSharder) Initialize(ctx context.Context) error { precomputedPrefixes = append(precomputedPrefixes, sb.String()) } } - d.precomputedPrefixes = precomputedPrefixes - d.buckets = uint32(n * n) - return nil + + return &PrecomputedShardSelector{ + precomputedPrefixes: precomputedPrefixes, + buckets: uint32(n * n), + }, nil } -// Generates deterministic shard id for the given string s -func (d *BucketsDataSharder) GetShardPrefix(ctx context.Context, s []byte) (string, error) { - h := fnv.New32a() - _, err := h.Write(s) - if err != nil { - return "", errors.Wrap(err, "failed to create shard prefix, reason hash failure.") +func NewConstantShardSelector(shards []string) ShardSelector { + return &PrecomputedShardSelector{ + precomputedPrefixes: shards, + buckets: uint32(len(shards)), } - idx := h.Sum32() % d.buckets - return d.precomputedPrefixes[idx], nil } diff --git a/go/tasks/plugins/array/catalog.go b/go/tasks/plugins/array/catalog.go index 9bd7ac1e5..3bbca7367 100644 --- a/go/tasks/plugins/array/catalog.go +++ b/go/tasks/plugins/array/catalog.go @@ -7,14 +7,15 @@ import ( arrayCore "github.com/lyft/flyteplugins/go/tasks/plugins/array/core" + "github.com/lyft/flytestdlib/bitarray" + "github.com/lyft/flytestdlib/logger" + "github.com/lyft/flytestdlib/storage" + "github.com/lyft/flyteplugins/go/tasks/errors" "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/catalog" "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core" "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io" "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/ioutils" - "github.com/lyft/flytestdlib/bitarray" - "github.com/lyft/flytestdlib/logger" - "github.com/lyft/flytestdlib/storage" idlCore "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" ) @@ -63,7 +64,7 @@ func DetermineDiscoverability(ctx context.Context, tCtx core.TaskExecutionContex } // build output writers - outputWriters, err := ConstructOutputWriters(ctx, tCtx.DataStore(), tCtx.OutputWriter().GetOutputPrefixPath(), int(arrayJob.Size)) + outputWriters, err := ConstructOutputWriters(ctx, tCtx.DataStore(), tCtx.OutputWriter().GetOutputPrefixPath(), tCtx.OutputWriter().GetOutputDataSandboxPath(), int(arrayJob.Size)) if err != nil { return state, err } @@ -372,13 +373,17 @@ func ConstructInputReaders(ctx context.Context, dataStore *storage.DataStore, in return inputReaders, nil } -func ConstructOutputWriters(ctx context.Context, dataStore *storage.DataStore, outputPrefix storage.DataReference, +func ConstructOutputWriters(ctx context.Context, dataStore *storage.DataStore, outputPrefix, baseOutputSandbox storage.DataReference, size int) ([]io.OutputWriter, error) { outputWriters := make([]io.OutputWriter, 0, size) for i := 0; i < size; i++ { - ow, err := ConstructOutputWriter(ctx, dataStore, outputPrefix, i) + outputSandbox, err := dataStore.ConstructReference(ctx, baseOutputSandbox, strconv.Itoa(i)) + if err != nil { + return nil, err + } + ow, err := ConstructOutputWriter(ctx, dataStore, outputPrefix, outputSandbox, i) if err != nil { return outputWriters, err } @@ -389,24 +394,28 @@ func ConstructOutputWriters(ctx context.Context, dataStore *storage.DataStore, o return outputWriters, nil } -func ConstructOutputWriter(ctx context.Context, dataStore *storage.DataStore, outputPrefix storage.DataReference, +func ConstructOutputWriter(ctx context.Context, dataStore *storage.DataStore, outputPrefix, outputSandbox storage.DataReference, index int) (io.OutputWriter, error) { dataReference, err := dataStore.ConstructReference(ctx, outputPrefix, strconv.Itoa(index)) if err != nil { return nil, err } - p := ioutils.NewRemoteFileOutputPaths(ctx, dataStore, dataReference, ioutils.NewRandomPrefixShardedOutputSandbox(ctx, ioutils.)) + p := ioutils.NewRemoteFileOutputPaths(ctx, dataStore, dataReference, ioutils.NewOutputSandbox(ctx, outputSandbox)) return ioutils.NewRemoteFileOutputWriter(ctx, dataStore, p), nil } -func ConstructOutputReaders(ctx context.Context, dataStore *storage.DataStore, outputPrefix storage.DataReference, +func ConstructOutputReaders(ctx context.Context, dataStore *storage.DataStore, outputPrefix, baseOutputSandbox storage.DataReference, size int) ([]io.OutputReader, error) { outputReaders := make([]io.OutputReader, 0, size) for i := 0; i < size; i++ { - reader, err := ConstructOutputReader(ctx, dataStore, outputPrefix, i) + outputSandbox, err := dataStore.ConstructReference(ctx, baseOutputSandbox, strconv.Itoa(i)) + if err != nil { + return nil, err + } + reader, err := ConstructOutputReader(ctx, dataStore, outputPrefix, outputSandbox, i) if err != nil { return nil, err } @@ -417,13 +426,13 @@ func ConstructOutputReaders(ctx context.Context, dataStore *storage.DataStore, o return outputReaders, nil } -func ConstructOutputReader(ctx context.Context, dataStore *storage.DataStore, outputPrefix storage.DataReference, +func ConstructOutputReader(ctx context.Context, dataStore *storage.DataStore, outputPrefix, outputSandbox storage.DataReference, index int) (io.OutputReader, error) { dataReference, err := dataStore.ConstructReference(ctx, outputPrefix, strconv.Itoa(index)) if err != nil { return nil, err } - outputPath := ioutils.NewRemoteFileOutputPaths(ctx, dataStore, dataReference) + outputPath := ioutils.NewRemoteFileOutputPaths(ctx, dataStore, dataReference, ioutils.NewOutputSandbox(ctx, outputSandbox)) return ioutils.NewRemoteFileOutputReader(ctx, dataStore, outputPath, int64(999999999)), nil } From 4dd43b148f8459c025a3143c107c99f3ac2617ca Mon Sep 17 00:00:00 2001 From: Ketan Umare Date: Thu, 5 Mar 2020 16:20:53 -0800 Subject: [PATCH 04/15] updated tests --- go/tasks/pluginmachinery/ioutils/output_sandbox.go | 2 +- go/tasks/pluginmachinery/ioutils/output_sandbox_test.go | 6 +++--- .../pluginmachinery/ioutils/precomputed_shardselector.go | 6 ++---- 3 files changed, 6 insertions(+), 8 deletions(-) diff --git a/go/tasks/pluginmachinery/ioutils/output_sandbox.go b/go/tasks/pluginmachinery/ioutils/output_sandbox.go index 4564a7b45..e42f63799 100644 --- a/go/tasks/pluginmachinery/ioutils/output_sandbox.go +++ b/go/tasks/pluginmachinery/ioutils/output_sandbox.go @@ -40,6 +40,6 @@ func NewRandomPrefixShardedOutputSandbox(ctx context.Context, sharder ShardSelec } // A simple Output sandbox at a given path -func NewOutputSandbox(ctx context.Context, outputSandboxPath storage.DataReference) io.OutputDataSandbox { +func NewOutputSandbox(_ context.Context, outputSandboxPath storage.DataReference) io.OutputDataSandbox { return precomputedOutputSandbox{path: outputSandboxPath} } diff --git a/go/tasks/pluginmachinery/ioutils/output_sandbox_test.go b/go/tasks/pluginmachinery/ioutils/output_sandbox_test.go index 2a2e9af42..b255a168b 100644 --- a/go/tasks/pluginmachinery/ioutils/output_sandbox_test.go +++ b/go/tasks/pluginmachinery/ioutils/output_sandbox_test.go @@ -23,8 +23,8 @@ func TestNewRandomPrefixShardedOutputSandbox(t *testing.T) { }) t.Run("error", func(t *testing.T) { - ss := NewConstantShardSelector([]string{"x"}) - _, err := NewRandomPrefixShardedOutputSandbox(ctx, ss, "#/ ", "m", storage.URLPathConstructor{}) - assert.Error(t, err) + ss := NewConstantShardSelector([]string{"s3:// abc"}) + sd, err := NewRandomPrefixShardedOutputSandbox(ctx, ss, "s3://bucket", "m", storage.URLPathConstructor{}) + assert.Error(t, err, "%s", sd) }) } diff --git a/go/tasks/pluginmachinery/ioutils/precomputed_shardselector.go b/go/tasks/pluginmachinery/ioutils/precomputed_shardselector.go index 060743e44..3a752a65e 100644 --- a/go/tasks/pluginmachinery/ioutils/precomputed_shardselector.go +++ b/go/tasks/pluginmachinery/ioutils/precomputed_shardselector.go @@ -59,12 +59,10 @@ func NewBase36PrefixShardSelector(ctx context.Context) (ShardSelector, error) { } } - return &PrecomputedShardSelector{ - precomputedPrefixes: precomputedPrefixes, - buckets: uint32(n * n), - }, nil + return NewConstantShardSelector(precomputedPrefixes), nil } +// uses the given shards to select a shard func NewConstantShardSelector(shards []string) ShardSelector { return &PrecomputedShardSelector{ precomputedPrefixes: shards, From 5a3cb447a13b21e67b34c32a9fe24aa6e3fb8955 Mon Sep 17 00:00:00 2001 From: Ketan Umare Date: Fri, 6 Mar 2020 17:48:30 -0800 Subject: [PATCH 05/15] unit testing in progress --- go/tasks/pluginmachinery/utils/template_test.go | 4 ++++ go/tasks/plugins/array/catalog.go | 2 +- go/tasks/plugins/array/outputs.go | 4 ++-- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/go/tasks/pluginmachinery/utils/template_test.go b/go/tasks/pluginmachinery/utils/template_test.go index cf5752076..97e812418 100755 --- a/go/tasks/pluginmachinery/utils/template_test.go +++ b/go/tasks/pluginmachinery/utils/template_test.go @@ -45,6 +45,10 @@ type dummyOutputPaths struct { outputPath storage.DataReference } +func (d dummyOutputPaths) GetOutputDataSandboxPath() storage.DataReference { + panic("should not be called") +} + func (d dummyOutputPaths) GetOutputPrefixPath() storage.DataReference { return d.outputPath } diff --git a/go/tasks/plugins/array/catalog.go b/go/tasks/plugins/array/catalog.go index 3bbca7367..49d235799 100644 --- a/go/tasks/plugins/array/catalog.go +++ b/go/tasks/plugins/array/catalog.go @@ -164,7 +164,7 @@ func WriteToDiscovery(ctx context.Context, tCtx core.TaskExecutionContext, state } // output reader - outputReaders, err := ConstructOutputReaders(ctx, tCtx.DataStore(), tCtx.OutputWriter().GetOutputPrefixPath(), int(arrayJob.Size)) + outputReaders, err := ConstructOutputReaders(ctx, tCtx.DataStore(), tCtx.OutputWriter().GetOutputPrefixPath(), tCtx.OutputWriter().GetOutputDataSandboxPath(), int(arrayJob.Size)) if err != nil { return nil, err } diff --git a/go/tasks/plugins/array/outputs.go b/go/tasks/plugins/array/outputs.go index 052a10e26..55d588d6f 100644 --- a/go/tasks/plugins/array/outputs.go +++ b/go/tasks/plugins/array/outputs.go @@ -56,7 +56,7 @@ type assembleOutputsWorker struct { func (w assembleOutputsWorker) Process(ctx context.Context, workItem workqueue.WorkItem) (workqueue.WorkStatus, error) { i := workItem.(*outputAssembleItem) - outputReaders, err := ConstructOutputReaders(ctx, i.dataStore, i.outputPaths.GetOutputPrefixPath(), int(i.finalPhases.ItemsCount)) + outputReaders, err := ConstructOutputReaders(ctx, i.dataStore, i.outputPaths.GetOutputPrefixPath(), i.outputPaths.GetOutputDataSandboxPath(), int(i.finalPhases.ItemsCount)) if err != nil { logger.Warnf(ctx, "Failed to construct output readers. Error: %v", err) return workqueue.WorkStatusFailed, err @@ -256,7 +256,7 @@ type assembleErrorsWorker struct { func (a assembleErrorsWorker) Process(ctx context.Context, workItem workqueue.WorkItem) (workqueue.WorkStatus, error) { w := workItem.(*outputAssembleItem) - outputReaders, err := ConstructOutputReaders(ctx, w.dataStore, w.outputPaths.GetOutputPrefixPath(), int(w.finalPhases.ItemsCount)) + outputReaders, err := ConstructOutputReaders(ctx, w.dataStore, w.outputPaths.GetOutputPrefixPath(), w.outputPaths.GetOutputDataSandboxPath(), int(w.finalPhases.ItemsCount)) if err != nil { return workqueue.WorkStatusNotDone, err } From 52992e5787b533c3017e4c6a89da7d0a5ca8826b Mon Sep 17 00:00:00 2001 From: Ketan Umare Date: Mon, 16 Mar 2020 15:08:05 -0700 Subject: [PATCH 06/15] Unit tests added --- .../ioutils/precomputed_shardselector.go | 2 +- .../ioutils/precomputed_shardselector_test.go | 61 +++++++++++++++++++ 2 files changed, 62 insertions(+), 1 deletion(-) create mode 100644 go/tasks/pluginmachinery/ioutils/precomputed_shardselector_test.go diff --git a/go/tasks/pluginmachinery/ioutils/precomputed_shardselector.go b/go/tasks/pluginmachinery/ioutils/precomputed_shardselector.go index 3a752a65e..3cc0faea7 100644 --- a/go/tasks/pluginmachinery/ioutils/precomputed_shardselector.go +++ b/go/tasks/pluginmachinery/ioutils/precomputed_shardselector.go @@ -35,7 +35,7 @@ type PrecomputedShardSelector struct { } // Generates deterministic shard id for the given string s -func (d *PrecomputedShardSelector) GetShardPrefix(ctx context.Context, s []byte) (string, error) { +func (d *PrecomputedShardSelector) GetShardPrefix(_ context.Context, s []byte) (string, error) { h := fnv.New32a() _, err := h.Write(s) if err != nil { diff --git a/go/tasks/pluginmachinery/ioutils/precomputed_shardselector_test.go b/go/tasks/pluginmachinery/ioutils/precomputed_shardselector_test.go new file mode 100644 index 000000000..d988be130 --- /dev/null +++ b/go/tasks/pluginmachinery/ioutils/precomputed_shardselector_test.go @@ -0,0 +1,61 @@ +package ioutils + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestPrecomputedShardSelector_GetShardPrefix(t *testing.T) { + ctx := context.TODO() + t.Run("single-shard", func(t *testing.T) { + ss := PrecomputedShardSelector{precomputedPrefixes: []string{"x"}, buckets: 1} + p, err := ss.GetShardPrefix(ctx, []byte("abc")) + assert.NoError(t, err) + assert.Equal(t, "x", p) + }) + + t.Run("two-shards", func(t *testing.T) { + ss := PrecomputedShardSelector{precomputedPrefixes: []string{"x", "y"}, buckets: 2} + p, err := ss.GetShardPrefix(ctx, []byte("abc")) + assert.NoError(t, err) + assert.Equal(t, "y", p) + p, err = ss.GetShardPrefix(ctx, []byte("xyz")) + assert.NoError(t, err) + assert.Equal(t, "x", p) + }) +} + +func TestGenerateAlphabet(t *testing.T) { + var b []rune + b = GenerateAlphabet(b) + + assert.Equal(t, 26, len(b)) + assert.Equal(t, 'a', b[0]) + assert.Equal(t, 'z', b[25]) + + // Additive + b = GenerateAlphabet(b) + + assert.Equal(t, 52, len(b)) + assert.Equal(t, 'a', b[26]) + assert.Equal(t, 'z', b[51]) +} + +func TestGenerateArabicNumerals(t *testing.T) { + var b []rune + b = GenerateArabicNumerals(b) + + assert.Equal(t, 10, len(b)) + assert.Equal(t, '0', b[0]) + assert.Equal(t, '9', b[9]) + + // Additive + b = GenerateArabicNumerals(b) + assert.Equal(t, 20, len(b)) + assert.Equal(t, '0', b[0]) + assert.Equal(t, '9', b[9]) + assert.Equal(t, '0', b[10]) + assert.Equal(t, '9', b[19]) +} From f6d8c91a48c7df5a7c5719c6ab5d5d511d4489aa Mon Sep 17 00:00:00 2001 From: Ketan Umare Date: Mon, 16 Mar 2020 17:10:27 -0700 Subject: [PATCH 07/15] Unit test fixes --- go/tasks/plugins/array/awsbatch/executor.go | 6 ++++-- go/tasks/plugins/array/awsbatch/monitor.go | 11 ++++++----- go/tasks/plugins/array/awsbatch/monitor_test.go | 6 +++--- go/tasks/plugins/array/catalog.go | 16 +++++++++------- go/tasks/plugins/array/catalog_test.go | 14 +++++++++----- go/tasks/plugins/array/k8s/executor.go | 2 +- go/tasks/plugins/array/k8s/monitor.go | 7 ++++--- go/tasks/plugins/array/outputs_test.go | 2 ++ go/tasks/plugins/array/subtask_phase.go | 4 ++-- tests/end_to_end.go | 1 + 10 files changed, 41 insertions(+), 28 deletions(-) diff --git a/go/tasks/plugins/array/awsbatch/executor.go b/go/tasks/plugins/array/awsbatch/executor.go index 37455b2a1..0cdd5a854 100644 --- a/go/tasks/plugins/array/awsbatch/executor.go +++ b/go/tasks/plugins/array/awsbatch/executor.go @@ -13,10 +13,11 @@ import ( "github.com/lyft/flytestdlib/logger" - "github.com/lyft/flyteplugins/go/tasks/aws" "github.com/lyft/flytestdlib/promutils" "github.com/lyft/flytestdlib/utils" + "github.com/lyft/flyteplugins/go/tasks/aws" + "github.com/lyft/flyteplugins/go/tasks/errors" "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core" ) @@ -76,7 +77,8 @@ func (e Executor) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (c pluginState, err = LaunchSubTasks(ctx, tCtx, e.jobStore, pluginConfig, pluginState) case arrayCore.PhaseCheckingSubTaskExecutions: - pluginState, err = CheckSubTasksState(ctx, tCtx.TaskExecutionMetadata(), tCtx.OutputWriter().GetOutputPrefixPath(), + pluginState, err = CheckSubTasksState(ctx, tCtx.TaskExecutionMetadata(), + tCtx.OutputWriter().GetOutputPrefixPath(), tCtx.OutputWriter().GetOutputDataSandboxPath(), e.jobStore, tCtx.DataStore(), pluginConfig, pluginState) case arrayCore.PhaseAssembleFinalOutput: diff --git a/go/tasks/plugins/array/awsbatch/monitor.go b/go/tasks/plugins/array/awsbatch/monitor.go index eae1e4993..38a219632 100644 --- a/go/tasks/plugins/array/awsbatch/monitor.go +++ b/go/tasks/plugins/array/awsbatch/monitor.go @@ -4,10 +4,11 @@ import ( "context" core2 "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" + "github.com/lyft/flytestdlib/storage" + "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io" "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/ioutils" "github.com/lyft/flyteplugins/go/tasks/plugins/array" - "github.com/lyft/flytestdlib/storage" "github.com/lyft/flytestdlib/logger" @@ -33,7 +34,7 @@ func createSubJobList(count int) []*Job { return res } -func CheckSubTasksState(ctx context.Context, taskMeta core.TaskExecutionMetadata, outputPrefix storage.DataReference, jobStore *JobStore, +func CheckSubTasksState(ctx context.Context, taskMeta core.TaskExecutionMetadata, outputPrefix, baseOutputSandbox storage.DataReference, jobStore *JobStore, dataStore *storage.DataStore, cfg *config.Config, currentState *State) (newState *State, err error) { newState = currentState @@ -72,7 +73,7 @@ func CheckSubTasksState(ctx context.Context, taskMeta core.TaskExecutionMetadata // If the service reported an error but there is no error.pb written, write one with the // service-provided error message. msg.Collect(childIdx, subJob.Status.Message) - or, err := array.ConstructOutputReader(ctx, dataStore, outputPrefix, originalIdx) + or, err := array.ConstructOutputReader(ctx, dataStore, outputPrefix, baseOutputSandbox, originalIdx) if err != nil { return nil, err } @@ -81,7 +82,7 @@ func CheckSubTasksState(ctx context.Context, taskMeta core.TaskExecutionMetadata return nil, err } else if !hasErr { // The subtask has not produced an error.pb, write one. - ow, err := array.ConstructOutputWriter(ctx, dataStore, outputPrefix, originalIdx) + ow, err := array.ConstructOutputWriter(ctx, dataStore, outputPrefix, baseOutputSandbox, originalIdx) if err != nil { return nil, err } @@ -101,7 +102,7 @@ func CheckSubTasksState(ctx context.Context, taskMeta core.TaskExecutionMetadata msg.Collect(childIdx, "Job failed") } } else if subJob.Status.Phase.IsSuccess() { - actualPhase, err = array.CheckTaskOutput(ctx, dataStore, outputPrefix, childIdx, originalIdx) + actualPhase, err = array.CheckTaskOutput(ctx, dataStore, outputPrefix, baseOutputSandbox, childIdx, originalIdx) if err != nil { return nil, err } diff --git a/go/tasks/plugins/array/awsbatch/monitor_test.go b/go/tasks/plugins/array/awsbatch/monitor_test.go index 8251c865d..a5ec13119 100644 --- a/go/tasks/plugins/array/awsbatch/monitor_test.go +++ b/go/tasks/plugins/array/awsbatch/monitor_test.go @@ -52,7 +52,7 @@ func TestCheckSubTasksState(t *testing.T) { utils.NewRateLimiter("", 10, 20)) jobStore := newJobsStore(t, batchClient) - newState, err := CheckSubTasksState(ctx, tMeta, "", jobStore, nil, &config.Config{}, &State{ + newState, err := CheckSubTasksState(ctx, tMeta, "", "", jobStore, nil, &config.Config{}, &State{ State: &arrayCore.State{ CurrentPhase: arrayCore.PhaseCheckingSubTaskExecutions, ExecutionArraySize: 5, @@ -98,7 +98,7 @@ func TestCheckSubTasksState(t *testing.T) { assert.NoError(t, err) - newState, err := CheckSubTasksState(ctx, tMeta, "", jobStore, nil, &config.Config{}, &State{ + newState, err := CheckSubTasksState(ctx, tMeta, "", "", jobStore, nil, &config.Config{}, &State{ State: &arrayCore.State{ CurrentPhase: arrayCore.PhaseCheckingSubTaskExecutions, ExecutionArraySize: 5, @@ -137,7 +137,7 @@ func TestCheckSubTasksState(t *testing.T) { inMemDatastore, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope()) assert.NoError(t, err) - newState, err := CheckSubTasksState(ctx, tMeta, "", jobStore, inMemDatastore, &config.Config{}, &State{ + newState, err := CheckSubTasksState(ctx, tMeta, "", "", jobStore, inMemDatastore, &config.Config{}, &State{ State: &arrayCore.State{ CurrentPhase: arrayCore.PhaseCheckingSubTaskExecutions, ExecutionArraySize: 2, diff --git a/go/tasks/plugins/array/catalog.go b/go/tasks/plugins/array/catalog.go index 80269f69a..e306ba3f9 100644 --- a/go/tasks/plugins/array/catalog.go +++ b/go/tasks/plugins/array/catalog.go @@ -411,11 +411,7 @@ func ConstructOutputReaders(ctx context.Context, dataStore *storage.DataStore, o outputReaders := make([]io.OutputReader, 0, size) for i := 0; i < size; i++ { - outputSandbox, err := dataStore.ConstructReference(ctx, baseOutputSandbox, strconv.Itoa(i)) - if err != nil { - return nil, err - } - reader, err := ConstructOutputReader(ctx, dataStore, outputPrefix, outputSandbox, i) + reader, err := ConstructOutputReader(ctx, dataStore, outputPrefix, baseOutputSandbox, i) if err != nil { return nil, err } @@ -426,9 +422,15 @@ func ConstructOutputReaders(ctx context.Context, dataStore *storage.DataStore, o return outputReaders, nil } -func ConstructOutputReader(ctx context.Context, dataStore *storage.DataStore, outputPrefix, outputSandbox storage.DataReference, +func ConstructOutputReader(ctx context.Context, dataStore *storage.DataStore, outputPrefix, baseOutputSandbox storage.DataReference, index int) (io.OutputReader, error) { - dataReference, err := dataStore.ConstructReference(ctx, outputPrefix, strconv.Itoa(index)) + strIndex := strconv.Itoa(index) + dataReference, err := dataStore.ConstructReference(ctx, outputPrefix, strIndex) + if err != nil { + return nil, err + } + + outputSandbox, err := dataStore.ConstructReference(ctx, baseOutputSandbox, strIndex) if err != nil { return nil, err } diff --git a/go/tasks/plugins/array/catalog_test.go b/go/tasks/plugins/array/catalog_test.go index a3596e381..62fef9756 100644 --- a/go/tasks/plugins/array/catalog_test.go +++ b/go/tasks/plugins/array/catalog_test.go @@ -5,22 +5,25 @@ import ( "errors" "testing" - pluginErrors "github.com/lyft/flyteplugins/go/tasks/errors" stdErrors "github.com/lyft/flytestdlib/errors" - core2 "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core" + pluginErrors "github.com/lyft/flyteplugins/go/tasks/errors" + "github.com/lyft/flytestdlib/bitarray" + core2 "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core" + "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/catalog" - catalogMocks "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/catalog/mocks" - "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io" - ioMocks "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io/mocks" "github.com/lyft/flytestdlib/promutils" "github.com/lyft/flytestdlib/storage" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + catalogMocks "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/catalog/mocks" + "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io" + ioMocks "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io/mocks" + "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" pluginMocks "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core/mocks" @@ -66,6 +69,7 @@ func runDetermineDiscoverabilityTest(t testing.TB, taskTemplate *core.TaskTempla ow := &ioMocks.OutputWriter{} ow.OnGetOutputPrefixPath().Return("/prefix/") + ow.OnGetOutputDataSandboxPath().Return("/sandbox/") ow.OnGetOutputPath().Return("/prefix/outputs.pb") ow.On("Put", mock.Anything, mock.Anything).Return(func(ctx context.Context, or io.OutputReader) error { m, ee, err := or.Read(ctx) diff --git a/go/tasks/plugins/array/k8s/executor.go b/go/tasks/plugins/array/k8s/executor.go index 125ee09a4..13cb7063f 100644 --- a/go/tasks/plugins/array/k8s/executor.go +++ b/go/tasks/plugins/array/k8s/executor.go @@ -81,7 +81,7 @@ func (e Executor) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (c case arrayCore.PhaseCheckingSubTaskExecutions: nextState, logLinks, err = CheckSubTasksState(ctx, tCtx, e.kubeClient, tCtx.DataStore(), - tCtx.OutputWriter().GetOutputPrefixPath(), pluginState) + tCtx.OutputWriter().GetOutputPrefixPath(), tCtx.OutputWriter().GetOutputDataSandboxPath(), pluginState) case arrayCore.PhaseAssembleFinalOutput: nextState, err = array.AssembleFinalOutputs(ctx, e.outputsAssembler, tCtx, arrayCore.PhaseSuccess, pluginState) diff --git a/go/tasks/plugins/array/k8s/monitor.go b/go/tasks/plugins/array/k8s/monitor.go index 58ccd852c..f406a5307 100644 --- a/go/tasks/plugins/array/k8s/monitor.go +++ b/go/tasks/plugins/array/k8s/monitor.go @@ -12,9 +12,10 @@ import ( arrayCore "github.com/lyft/flyteplugins/go/tasks/plugins/array/core" + "github.com/lyft/flytestdlib/bitarray" + "github.com/lyft/flyteplugins/go/tasks/plugins/array/arraystatus" "github.com/lyft/flyteplugins/go/tasks/plugins/array/errorcollector" - "github.com/lyft/flytestdlib/bitarray" v1 "k8s.io/api/core/v1" metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -35,7 +36,7 @@ const ( ) func CheckSubTasksState(ctx context.Context, tCtx core.TaskExecutionContext, kubeClient core.KubeClient, - dataStore *storage.DataStore, outputPrefix storage.DataReference, currentState *arrayCore.State) ( + dataStore *storage.DataStore, outputPrefix, baseOutputDataSandbox storage.DataReference, currentState *arrayCore.State) ( newState *arrayCore.State, logLinks []*idlCore.TaskLog, err error) { logLinks = make([]*idlCore.TaskLog, 0, 4) @@ -79,7 +80,7 @@ func CheckSubTasksState(ctx context.Context, tCtx core.TaskExecutionContext, kub actualPhase := phaseInfo.Phase() if phaseInfo.Phase().IsSuccess() { originalIdx := arrayCore.CalculateOriginalIndex(childIdx, currentState.GetIndexesToCache()) - actualPhase, err = array.CheckTaskOutput(ctx, dataStore, outputPrefix, childIdx, originalIdx) + actualPhase, err = array.CheckTaskOutput(ctx, dataStore, outputPrefix, baseOutputDataSandbox, childIdx, originalIdx) if err != nil { return nil, nil, err } diff --git a/go/tasks/plugins/array/outputs_test.go b/go/tasks/plugins/array/outputs_test.go index 3f0f5fb50..2ea1ddb4f 100644 --- a/go/tasks/plugins/array/outputs_test.go +++ b/go/tasks/plugins/array/outputs_test.go @@ -88,6 +88,7 @@ func Test_assembleOutputsWorker_Process(t *testing.T) { ow := &mocks2.OutputWriter{} ow.OnGetOutputPrefixPath().Return("/bucket/prefix") ow.OnGetOutputPath().Return("/bucket/prefix/outputs.pb") + ow.OnGetOutputDataSandboxPath().Return("/bucket/sandbox/") // Setup the input phases that inform outputs worker about which tasks failed/succeeded. phases := arrayCore.NewPhasesCompactArray(4) @@ -361,6 +362,7 @@ func Test_assembleErrorsWorker_Process(t *testing.T) { // Setup the expected data to be written to outputWriter. ow := &mocks2.OutputWriter{} + ow.OnGetOutputDataSandboxPath().Return("/bucket/sandbox/") ow.OnGetOutputPrefixPath().Return("/bucket/prefix") ow.OnGetErrorPath().Return("/bucket/prefix/error.pb") ow.On("Put", mock.Anything, mock.Anything).Return(func(ctx context.Context, reader io.OutputReader) error { diff --git a/go/tasks/plugins/array/subtask_phase.go b/go/tasks/plugins/array/subtask_phase.go index bc4cbf9f5..51dde550a 100644 --- a/go/tasks/plugins/array/subtask_phase.go +++ b/go/tasks/plugins/array/subtask_phase.go @@ -13,8 +13,8 @@ const ( ErrSystem errors.ErrorCode = "SYSTEM_ERROR" ) -func CheckTaskOutput(ctx context.Context, dataStore *storage.DataStore, outputPrefix storage.DataReference, childIdx, originalIdx int) (core.Phase, error) { - or, err := ConstructOutputReader(ctx, dataStore, outputPrefix, originalIdx) +func CheckTaskOutput(ctx context.Context, dataStore *storage.DataStore, outputPrefix, baseOutputSandbox storage.DataReference, childIdx, originalIdx int) (core.Phase, error) { + or, err := ConstructOutputReader(ctx, dataStore, outputPrefix, baseOutputSandbox, originalIdx) if err != nil { return core.PhaseUndefined, errors.Wrapf(ErrSystem, err, "Failed to build output reader for sub task [%v] with original index [%v].", childIdx, originalIdx) } diff --git a/tests/end_to_end.go b/tests/end_to_end.go index cd9c64edf..ff16a181c 100644 --- a/tests/end_to_end.go +++ b/tests/end_to_end.go @@ -85,6 +85,7 @@ func RunPluginEndToEndTest(t *testing.T, executor pluginCore.Plugin, template *i inputReader.OnGetInputPath().Return(basePrefix + "/inputs.pb") outputWriter := &ioMocks.OutputWriter{} + outputWriter.OnGetOutputDataSandboxPath().Return("/sandbox/") outputWriter.OnGetOutputPrefixPath().Return(basePrefix) outputWriter.OnGetErrorPath().Return(basePrefix + "/error.pb") outputWriter.OnGetOutputPath().Return(basePrefix + "/outputs.pb") From cce3a0006bd2773f1c10473f3ac7f8d6e684905f Mon Sep 17 00:00:00 2001 From: Ketan Umare Date: Mon, 16 Mar 2020 17:21:19 -0700 Subject: [PATCH 08/15] lint fixes --- go/tasks/pluginmachinery/ioutils/output_sandbox.go | 8 +++++--- .../pluginmachinery/ioutils/remote_file_output_writer.go | 4 ++-- go/tasks/plugins/array/awsbatch/monitor.go | 2 +- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/go/tasks/pluginmachinery/ioutils/output_sandbox.go b/go/tasks/pluginmachinery/ioutils/output_sandbox.go index e42f63799..1f72e88dd 100644 --- a/go/tasks/pluginmachinery/ioutils/output_sandbox.go +++ b/go/tasks/pluginmachinery/ioutils/output_sandbox.go @@ -2,7 +2,7 @@ package ioutils import ( "context" - "crypto/md5" + "crypto/sha256" "encoding/hex" "github.com/lyft/flytestdlib/storage" @@ -28,8 +28,10 @@ func NewRandomPrefixShardedOutputSandbox(ctx context.Context, sharder ShardSelec if err != nil { return nil, err } - m := md5.New() - m.Write(o) + m := sha256.New() + if _, err := m.Write(o); err != nil { + return nil, err + } path, err := store.ConstructReference(ctx, basePath, prefix, hex.EncodeToString(m.Sum(nil))) if err != nil { return nil, err diff --git a/go/tasks/pluginmachinery/ioutils/remote_file_output_writer.go b/go/tasks/pluginmachinery/ioutils/remote_file_output_writer.go index c82202a24..12ac920f7 100644 --- a/go/tasks/pluginmachinery/ioutils/remote_file_output_writer.go +++ b/go/tasks/pluginmachinery/ioutils/remote_file_output_writer.go @@ -74,8 +74,8 @@ func (w RemoteFileOutputWriter) Put(ctx context.Context, reader io.OutputReader) func NewRemoteFileOutputPaths(_ context.Context, store storage.ReferenceConstructor, outputPrefix storage.DataReference, sandbox io.OutputDataSandbox) RemoteFileOutputPaths { return RemoteFileOutputPaths{ - store: store, - outputPrefix: outputPrefix, + store: store, + outputPrefix: outputPrefix, OutputDataSandbox: sandbox, } } diff --git a/go/tasks/plugins/array/awsbatch/monitor.go b/go/tasks/plugins/array/awsbatch/monitor.go index 38a219632..c47bad899 100644 --- a/go/tasks/plugins/array/awsbatch/monitor.go +++ b/go/tasks/plugins/array/awsbatch/monitor.go @@ -102,7 +102,7 @@ func CheckSubTasksState(ctx context.Context, taskMeta core.TaskExecutionMetadata msg.Collect(childIdx, "Job failed") } } else if subJob.Status.Phase.IsSuccess() { - actualPhase, err = array.CheckTaskOutput(ctx, dataStore, outputPrefix, baseOutputSandbox, childIdx, originalIdx) + actualPhase, err = array.CheckTaskOutput(ctx, dataStore, outputPrefix, baseOutputSandbox, childIdx, originalIdx) if err != nil { return nil, err } From c9c9d9ace20c9f6b046be8c818f1a57ff37a69e2 Mon Sep 17 00:00:00 2001 From: Ketan Umare Date: Mon, 16 Mar 2020 21:06:29 -0700 Subject: [PATCH 09/15] updated hasing algorithm --- go/tasks/pluginmachinery/ioutils/output_sandbox.go | 6 ++++-- go/tasks/pluginmachinery/ioutils/output_sandbox_test.go | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/go/tasks/pluginmachinery/ioutils/output_sandbox.go b/go/tasks/pluginmachinery/ioutils/output_sandbox.go index 1f72e88dd..209ffc744 100644 --- a/go/tasks/pluginmachinery/ioutils/output_sandbox.go +++ b/go/tasks/pluginmachinery/ioutils/output_sandbox.go @@ -2,7 +2,7 @@ package ioutils import ( "context" - "crypto/sha256" + "crypto/sha1" // #nosec "encoding/hex" "github.com/lyft/flytestdlib/storage" @@ -28,7 +28,9 @@ func NewRandomPrefixShardedOutputSandbox(ctx context.Context, sharder ShardSelec if err != nil { return nil, err } - m := sha256.New() + /* #nosec */ + // We use SHA1 for sheer speed instead of no collisions. As because of the shard Prefix + hash is pretty unique :) + m := sha1.New() if _, err := m.Write(o); err != nil { return nil, err } diff --git a/go/tasks/pluginmachinery/ioutils/output_sandbox_test.go b/go/tasks/pluginmachinery/ioutils/output_sandbox_test.go index b255a168b..39496562b 100644 --- a/go/tasks/pluginmachinery/ioutils/output_sandbox_test.go +++ b/go/tasks/pluginmachinery/ioutils/output_sandbox_test.go @@ -19,7 +19,7 @@ func TestNewRandomPrefixShardedOutputSandbox(t *testing.T) { ss := NewConstantShardSelector([]string{"x"}) sd, err := NewRandomPrefixShardedOutputSandbox(ctx, ss, "s3://bucket", "m", storage.URLPathConstructor{}) assert.NoError(t, err) - assert.Equal(t, storage.DataReference("s3://bucket/x/6f8f57715090da2632453988d9a1501b"), sd.GetOutputDataSandboxPath()) + assert.Equal(t, storage.DataReference("s3://bucket/x/6b0d31c0d563223024da45691584643ac78c96e8"), sd.GetOutputDataSandboxPath()) }) t.Run("error", func(t *testing.T) { From 833b62529b0edb0e2705a9fc46f28dccc577bab6 Mon Sep 17 00:00:00 2001 From: Ketan Umare Date: Mon, 16 Mar 2020 21:51:00 -0700 Subject: [PATCH 10/15] updated output sandbox constructor --- .../pluginmachinery/ioutils/output_sandbox.go | 17 +++++++++++++++++ .../ioutils/output_sandbox_test.go | 16 ++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/go/tasks/pluginmachinery/ioutils/output_sandbox.go b/go/tasks/pluginmachinery/ioutils/output_sandbox.go index 209ffc744..552f61998 100644 --- a/go/tasks/pluginmachinery/ioutils/output_sandbox.go +++ b/go/tasks/pluginmachinery/ioutils/output_sandbox.go @@ -47,3 +47,20 @@ func NewRandomPrefixShardedOutputSandbox(ctx context.Context, sharder ShardSelec func NewOutputSandbox(_ context.Context, outputSandboxPath storage.DataReference) io.OutputDataSandbox { return precomputedOutputSandbox{path: outputSandboxPath} } + +// Creates an OutputSandbox in the basePath using the uniqueID and a sharder +// This implementation is faster than the Randomized strategy +func NewShardedOutputSandbox(ctx context.Context, sharder ShardSelector, basePath storage.DataReference, uniqueID string, store storage.ReferenceConstructor) (io.OutputDataSandbox, error) { + o := []byte(uniqueID) + prefix, err := sharder.GetShardPrefix(ctx, o) + if err != nil { + return nil, err + } + path, err := store.ConstructReference(ctx, basePath, prefix, uniqueID) + if err != nil { + return nil, err + } + return precomputedOutputSandbox{ + path: path, + }, nil +} diff --git a/go/tasks/pluginmachinery/ioutils/output_sandbox_test.go b/go/tasks/pluginmachinery/ioutils/output_sandbox_test.go index 39496562b..d6a41b32a 100644 --- a/go/tasks/pluginmachinery/ioutils/output_sandbox_test.go +++ b/go/tasks/pluginmachinery/ioutils/output_sandbox_test.go @@ -28,3 +28,19 @@ func TestNewRandomPrefixShardedOutputSandbox(t *testing.T) { assert.Error(t, err, "%s", sd) }) } + +func TestNewShardedOutputSandbox(t *testing.T) { + ctx := context.TODO() + t.Run("", func(t *testing.T) { + ss := NewConstantShardSelector([]string{"x"}) + sd, err := NewShardedOutputSandbox(ctx, ss, "s3://flyte", "unique", storage.URLPathConstructor{}) + assert.NoError(t, err) + assert.Equal(t, storage.DataReference("s3://flyte/x/unique"), sd.GetOutputDataSandboxPath()) + }) + + t.Run("error", func(t *testing.T) { + ss := NewConstantShardSelector([]string{"s3:// abc"}) + sd, err := NewShardedOutputSandbox(ctx, ss, "s3://bucket", "m", storage.URLPathConstructor{}) + assert.Error(t, err, "%s", sd) + }) +} From ea4280769ab8503fb9ef172a4a1fe21097e5d177 Mon Sep 17 00:00:00 2001 From: Ketan Umare Date: Tue, 17 Mar 2020 18:43:25 -0700 Subject: [PATCH 11/15] Renamed Sandbox -> RawOutputPath --- .../core/mocks/resource_registrar.go | 5 -- go/tasks/pluginmachinery/io/iface.go | 11 ++-- .../io/mocks/output_data_sandbox.go | 10 +-- .../io/mocks/output_file_paths.go | 64 +++++++++---------- .../pluginmachinery/io/mocks/output_writer.go | 64 +++++++++---------- .../io/mocks/raw_output_paths.go | 45 +++++++++++++ .../ioutils/precomputed_shardselector.go | 2 + .../{output_sandbox.go => raw_output_path.go} | 22 +++---- ...andbox_test.go => raw_output_path_test.go} | 14 ++-- .../ioutils/remote_file_output_writer.go | 10 +-- .../pluginmachinery/utils/template_test.go | 2 +- go/tasks/plugins/array/awsbatch/executor.go | 2 +- go/tasks/plugins/array/catalog.go | 8 +-- go/tasks/plugins/array/catalog_test.go | 2 +- go/tasks/plugins/array/k8s/executor.go | 2 +- go/tasks/plugins/array/outputs.go | 4 +- go/tasks/plugins/array/outputs_test.go | 4 +- tests/end_to_end.go | 2 +- 18 files changed, 158 insertions(+), 115 deletions(-) create mode 100644 go/tasks/pluginmachinery/io/mocks/raw_output_paths.go rename go/tasks/pluginmachinery/ioutils/{output_sandbox.go => raw_output_path.go} (59%) rename go/tasks/pluginmachinery/ioutils/{output_sandbox_test.go => raw_output_path_test.go} (64%) diff --git a/go/tasks/pluginmachinery/core/mocks/resource_registrar.go b/go/tasks/pluginmachinery/core/mocks/resource_registrar.go index fdfaf11a9..c2707c49d 100644 --- a/go/tasks/pluginmachinery/core/mocks/resource_registrar.go +++ b/go/tasks/pluginmachinery/core/mocks/resource_registrar.go @@ -14,11 +14,6 @@ type ResourceRegistrar struct { mock.Mock } -// RegisterResourceNamespaceQuotaProportionCap provides a mock function with given fields: ctx, proportionCap -func (_m *ResourceRegistrar) RegisterResourceNamespaceQuotaProportionCap(ctx context.Context, proportionCap float64) { - _m.Called(ctx, proportionCap) -} - type ResourceRegistrar_RegisterResourceQuota struct { *mock.Call } diff --git a/go/tasks/pluginmachinery/io/iface.go b/go/tasks/pluginmachinery/io/iface.go index 7d87e9fe3..4719bc5be 100644 --- a/go/tasks/pluginmachinery/io/iface.go +++ b/go/tasks/pluginmachinery/io/iface.go @@ -41,19 +41,22 @@ type OutputReader interface { Read(ctx context.Context) (*core.LiteralMap, *ExecutionError, error) } -// Data sandbox is the actual path where the data produced by a task can be placed. It is completely optional. The advantage +// RawOutputPaths is the actual path where the data produced by a task can be placed. It is completely optional. The advantage // of using this path is to provide exactly once semantics. It is guaranteed that this path is unique for every new execution // of a task (across retries etc) and is constant for a specific execution. // As of 02/20/2020 Flytekit generates this path randomly for S3. This structure proposes migration of this logic to // FlytePluginMachinery so that it can be used more universally outside of Flytekit. -type OutputDataSandbox interface { +type RawOutputPaths interface { // This is prefix (blob store prefix or directory) where all data produced can be stored. - GetOutputDataSandboxPath() storage.DataReference + GetRawOutputPrefix() storage.DataReference } // All paths where various meta outputs produced by the task can be placed, such that the framework can directly access them. // All paths are reperesented using storage.DataReference -> an URN for the configured storage backend type OutputFilePaths interface { + // RawOutputPaths are available with OutputFilePaths + RawOutputPaths + // A path to a directory or prefix that contains all execution metadata for this execution GetOutputPrefixPath() storage.DataReference // A fully qualified path (URN) to where the framework expects the output to exist in the configured storage backend @@ -61,8 +64,6 @@ type OutputFilePaths interface { // A Fully qualified path (URN) where the error information should be placed as a protobuf core.ErrorDocument. It is not directly // used by the framework, but could be used in the future GetErrorPath() storage.DataReference - - OutputDataSandbox } // Framework Output writing interface. diff --git a/go/tasks/pluginmachinery/io/mocks/output_data_sandbox.go b/go/tasks/pluginmachinery/io/mocks/output_data_sandbox.go index 727fadc32..59686afaa 100644 --- a/go/tasks/pluginmachinery/io/mocks/output_data_sandbox.go +++ b/go/tasks/pluginmachinery/io/mocks/output_data_sandbox.go @@ -7,7 +7,7 @@ import ( mock "github.com/stretchr/testify/mock" ) -// OutputDataSandbox is an autogenerated mock type for the OutputDataSandbox type +// RawOutputPaths is an autogenerated mock type for the RawOutputPaths type type OutputDataSandbox struct { mock.Mock } @@ -21,17 +21,17 @@ func (_m OutputDataSandbox_GetOutputDataSandboxPath) Return(_a0 storage.DataRefe } func (_m *OutputDataSandbox) OnGetOutputDataSandboxPath() *OutputDataSandbox_GetOutputDataSandboxPath { - c := _m.On("GetOutputDataSandboxPath") + c := _m.On("GetRawOutputPrefix") return &OutputDataSandbox_GetOutputDataSandboxPath{Call: c} } func (_m *OutputDataSandbox) OnGetOutputDataSandboxPathMatch(matchers ...interface{}) *OutputDataSandbox_GetOutputDataSandboxPath { - c := _m.On("GetOutputDataSandboxPath", matchers...) + c := _m.On("GetRawOutputPrefix", matchers...) return &OutputDataSandbox_GetOutputDataSandboxPath{Call: c} } -// GetOutputDataSandboxPath provides a mock function with given fields: -func (_m *OutputDataSandbox) GetOutputDataSandboxPath() storage.DataReference { +// GetRawOutputPrefix provides a mock function with given fields: +func (_m *OutputDataSandbox) GetRawOutputPrefix() storage.DataReference { ret := _m.Called() var r0 storage.DataReference diff --git a/go/tasks/pluginmachinery/io/mocks/output_file_paths.go b/go/tasks/pluginmachinery/io/mocks/output_file_paths.go index f56e86c03..438f06631 100644 --- a/go/tasks/pluginmachinery/io/mocks/output_file_paths.go +++ b/go/tasks/pluginmachinery/io/mocks/output_file_paths.go @@ -44,38 +44,6 @@ func (_m *OutputFilePaths) GetErrorPath() storage.DataReference { return r0 } -type OutputFilePaths_GetOutputDataSandboxPath struct { - *mock.Call -} - -func (_m OutputFilePaths_GetOutputDataSandboxPath) Return(_a0 storage.DataReference) *OutputFilePaths_GetOutputDataSandboxPath { - return &OutputFilePaths_GetOutputDataSandboxPath{Call: _m.Call.Return(_a0)} -} - -func (_m *OutputFilePaths) OnGetOutputDataSandboxPath() *OutputFilePaths_GetOutputDataSandboxPath { - c := _m.On("GetOutputDataSandboxPath") - return &OutputFilePaths_GetOutputDataSandboxPath{Call: c} -} - -func (_m *OutputFilePaths) OnGetOutputDataSandboxPathMatch(matchers ...interface{}) *OutputFilePaths_GetOutputDataSandboxPath { - c := _m.On("GetOutputDataSandboxPath", matchers...) - return &OutputFilePaths_GetOutputDataSandboxPath{Call: c} -} - -// GetOutputDataSandboxPath provides a mock function with given fields: -func (_m *OutputFilePaths) GetOutputDataSandboxPath() storage.DataReference { - ret := _m.Called() - - var r0 storage.DataReference - if rf, ok := ret.Get(0).(func() storage.DataReference); ok { - r0 = rf() - } else { - r0 = ret.Get(0).(storage.DataReference) - } - - return r0 -} - type OutputFilePaths_GetOutputPath struct { *mock.Call } @@ -139,3 +107,35 @@ func (_m *OutputFilePaths) GetOutputPrefixPath() storage.DataReference { return r0 } + +type OutputFilePaths_GetRawOutputPrefix struct { + *mock.Call +} + +func (_m OutputFilePaths_GetRawOutputPrefix) Return(_a0 storage.DataReference) *OutputFilePaths_GetRawOutputPrefix { + return &OutputFilePaths_GetRawOutputPrefix{Call: _m.Call.Return(_a0)} +} + +func (_m *OutputFilePaths) OnGetRawOutputPrefix() *OutputFilePaths_GetRawOutputPrefix { + c := _m.On("GetRawOutputPrefix") + return &OutputFilePaths_GetRawOutputPrefix{Call: c} +} + +func (_m *OutputFilePaths) OnGetRawOutputPrefixMatch(matchers ...interface{}) *OutputFilePaths_GetRawOutputPrefix { + c := _m.On("GetRawOutputPrefix", matchers...) + return &OutputFilePaths_GetRawOutputPrefix{Call: c} +} + +// GetRawOutputPrefix provides a mock function with given fields: +func (_m *OutputFilePaths) GetRawOutputPrefix() storage.DataReference { + ret := _m.Called() + + var r0 storage.DataReference + if rf, ok := ret.Get(0).(func() storage.DataReference); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(storage.DataReference) + } + + return r0 +} diff --git a/go/tasks/pluginmachinery/io/mocks/output_writer.go b/go/tasks/pluginmachinery/io/mocks/output_writer.go index cfc38957c..4d2f8a63a 100644 --- a/go/tasks/pluginmachinery/io/mocks/output_writer.go +++ b/go/tasks/pluginmachinery/io/mocks/output_writer.go @@ -48,38 +48,6 @@ func (_m *OutputWriter) GetErrorPath() storage.DataReference { return r0 } -type OutputWriter_GetOutputDataSandboxPath struct { - *mock.Call -} - -func (_m OutputWriter_GetOutputDataSandboxPath) Return(_a0 storage.DataReference) *OutputWriter_GetOutputDataSandboxPath { - return &OutputWriter_GetOutputDataSandboxPath{Call: _m.Call.Return(_a0)} -} - -func (_m *OutputWriter) OnGetOutputDataSandboxPath() *OutputWriter_GetOutputDataSandboxPath { - c := _m.On("GetOutputDataSandboxPath") - return &OutputWriter_GetOutputDataSandboxPath{Call: c} -} - -func (_m *OutputWriter) OnGetOutputDataSandboxPathMatch(matchers ...interface{}) *OutputWriter_GetOutputDataSandboxPath { - c := _m.On("GetOutputDataSandboxPath", matchers...) - return &OutputWriter_GetOutputDataSandboxPath{Call: c} -} - -// GetOutputDataSandboxPath provides a mock function with given fields: -func (_m *OutputWriter) GetOutputDataSandboxPath() storage.DataReference { - ret := _m.Called() - - var r0 storage.DataReference - if rf, ok := ret.Get(0).(func() storage.DataReference); ok { - r0 = rf() - } else { - r0 = ret.Get(0).(storage.DataReference) - } - - return r0 -} - type OutputWriter_GetOutputPath struct { *mock.Call } @@ -144,6 +112,38 @@ func (_m *OutputWriter) GetOutputPrefixPath() storage.DataReference { return r0 } +type OutputWriter_GetRawOutputPrefix struct { + *mock.Call +} + +func (_m OutputWriter_GetRawOutputPrefix) Return(_a0 storage.DataReference) *OutputWriter_GetRawOutputPrefix { + return &OutputWriter_GetRawOutputPrefix{Call: _m.Call.Return(_a0)} +} + +func (_m *OutputWriter) OnGetRawOutputPrefix() *OutputWriter_GetRawOutputPrefix { + c := _m.On("GetRawOutputPrefix") + return &OutputWriter_GetRawOutputPrefix{Call: c} +} + +func (_m *OutputWriter) OnGetRawOutputPrefixMatch(matchers ...interface{}) *OutputWriter_GetRawOutputPrefix { + c := _m.On("GetRawOutputPrefix", matchers...) + return &OutputWriter_GetRawOutputPrefix{Call: c} +} + +// GetRawOutputPrefix provides a mock function with given fields: +func (_m *OutputWriter) GetRawOutputPrefix() storage.DataReference { + ret := _m.Called() + + var r0 storage.DataReference + if rf, ok := ret.Get(0).(func() storage.DataReference); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(storage.DataReference) + } + + return r0 +} + type OutputWriter_Put struct { *mock.Call } diff --git a/go/tasks/pluginmachinery/io/mocks/raw_output_paths.go b/go/tasks/pluginmachinery/io/mocks/raw_output_paths.go new file mode 100644 index 000000000..02d8329d9 --- /dev/null +++ b/go/tasks/pluginmachinery/io/mocks/raw_output_paths.go @@ -0,0 +1,45 @@ +// Code generated by mockery v1.0.1. DO NOT EDIT. + +package mocks + +import ( + storage "github.com/lyft/flytestdlib/storage" + mock "github.com/stretchr/testify/mock" +) + +// RawOutputPaths is an autogenerated mock type for the RawOutputPaths type +type RawOutputPaths struct { + mock.Mock +} + +type RawOutputPaths_GetRawOutputPrefix struct { + *mock.Call +} + +func (_m RawOutputPaths_GetRawOutputPrefix) Return(_a0 storage.DataReference) *RawOutputPaths_GetRawOutputPrefix { + return &RawOutputPaths_GetRawOutputPrefix{Call: _m.Call.Return(_a0)} +} + +func (_m *RawOutputPaths) OnGetRawOutputPrefix() *RawOutputPaths_GetRawOutputPrefix { + c := _m.On("GetRawOutputPrefix") + return &RawOutputPaths_GetRawOutputPrefix{Call: c} +} + +func (_m *RawOutputPaths) OnGetRawOutputPrefixMatch(matchers ...interface{}) *RawOutputPaths_GetRawOutputPrefix { + c := _m.On("GetRawOutputPrefix", matchers...) + return &RawOutputPaths_GetRawOutputPrefix{Call: c} +} + +// GetRawOutputPrefix provides a mock function with given fields: +func (_m *RawOutputPaths) GetRawOutputPrefix() storage.DataReference { + ret := _m.Called() + + var r0 storage.DataReference + if rf, ok := ret.Get(0).(func() storage.DataReference); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(storage.DataReference) + } + + return r0 +} diff --git a/go/tasks/pluginmachinery/ioutils/precomputed_shardselector.go b/go/tasks/pluginmachinery/ioutils/precomputed_shardselector.go index 3cc0faea7..2a20272f6 100644 --- a/go/tasks/pluginmachinery/ioutils/precomputed_shardselector.go +++ b/go/tasks/pluginmachinery/ioutils/precomputed_shardselector.go @@ -8,6 +8,7 @@ import ( "github.com/pkg/errors" ) +// Generates the entire latin alphabet and appends it to the passed in array and returns the new array func GenerateAlphabet(b []rune) []rune { for i := 'a'; i <= 'z'; i++ { b = append(b, i) @@ -15,6 +16,7 @@ func GenerateAlphabet(b []rune) []rune { return b } +// Generates all arabic numerals and appends to the passed in array and returns the new array/slice func GenerateArabicNumerals(b []rune) []rune { for i := '0'; i <= '9'; i++ { b = append(b, i) diff --git a/go/tasks/pluginmachinery/ioutils/output_sandbox.go b/go/tasks/pluginmachinery/ioutils/raw_output_path.go similarity index 59% rename from go/tasks/pluginmachinery/ioutils/output_sandbox.go rename to go/tasks/pluginmachinery/ioutils/raw_output_path.go index 552f61998..b29c15592 100644 --- a/go/tasks/pluginmachinery/ioutils/output_sandbox.go +++ b/go/tasks/pluginmachinery/ioutils/raw_output_path.go @@ -10,19 +10,19 @@ import ( "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io" ) -type precomputedOutputSandbox struct { +type precomputedRawOutputPaths struct { path storage.DataReference } -func (r precomputedOutputSandbox) GetOutputDataSandboxPath() storage.DataReference { +func (r precomputedRawOutputPaths) GetRawOutputPrefix() storage.DataReference { return r.path } -// Creates a deterministic OutputSandbox whose path is distributed based on the ShardSelector passed in. +// Creates a deterministic RawOutputPath whose path is distributed based on the ShardSelector passed in. // Determinism depends on the outputMetadataPath -// Potential performance problem, as creating anew randomprefixShardedOutput Sandbox may be expensive as it hashes the outputMetadataPath -// the final OutputSandbox is created in the shard selected by the sharder at the basePath and then appended by a hashed value of the outputMetadata -func NewRandomPrefixShardedOutputSandbox(ctx context.Context, sharder ShardSelector, basePath, outputMetadataPath storage.DataReference, store storage.ReferenceConstructor) (io.OutputDataSandbox, error) { +// Potential performance problem, as creating a new RawPath creation may be expensive as it hashes the outputMetadataPath +// the final RawOutputPath is created in the shard selected by the sharder at the basePath and then appended by a hashed value of the outputMetadata +func NewShardedDeterministicRawOutputPath(ctx context.Context, sharder ShardSelector, basePath, outputMetadataPath storage.DataReference, store storage.ReferenceConstructor) (io.RawOutputPaths, error) { o := []byte(outputMetadataPath) prefix, err := sharder.GetShardPrefix(ctx, o) if err != nil { @@ -38,19 +38,19 @@ func NewRandomPrefixShardedOutputSandbox(ctx context.Context, sharder ShardSelec if err != nil { return nil, err } - return precomputedOutputSandbox{ + return precomputedRawOutputPaths{ path: path, }, nil } // A simple Output sandbox at a given path -func NewOutputSandbox(_ context.Context, outputSandboxPath storage.DataReference) io.OutputDataSandbox { - return precomputedOutputSandbox{path: outputSandboxPath} +func NewRawOutputPath(_ context.Context, outputSandboxPath storage.DataReference) io.RawOutputPaths { + return precomputedRawOutputPaths{path: outputSandboxPath} } // Creates an OutputSandbox in the basePath using the uniqueID and a sharder // This implementation is faster than the Randomized strategy -func NewShardedOutputSandbox(ctx context.Context, sharder ShardSelector, basePath storage.DataReference, uniqueID string, store storage.ReferenceConstructor) (io.OutputDataSandbox, error) { +func NewShardedRawOutputPath(ctx context.Context, sharder ShardSelector, basePath storage.DataReference, uniqueID string, store storage.ReferenceConstructor) (io.RawOutputPaths, error) { o := []byte(uniqueID) prefix, err := sharder.GetShardPrefix(ctx, o) if err != nil { @@ -60,7 +60,7 @@ func NewShardedOutputSandbox(ctx context.Context, sharder ShardSelector, basePat if err != nil { return nil, err } - return precomputedOutputSandbox{ + return precomputedRawOutputPaths{ path: path, }, nil } diff --git a/go/tasks/pluginmachinery/ioutils/output_sandbox_test.go b/go/tasks/pluginmachinery/ioutils/raw_output_path_test.go similarity index 64% rename from go/tasks/pluginmachinery/ioutils/output_sandbox_test.go rename to go/tasks/pluginmachinery/ioutils/raw_output_path_test.go index d6a41b32a..a8f9680ee 100644 --- a/go/tasks/pluginmachinery/ioutils/output_sandbox_test.go +++ b/go/tasks/pluginmachinery/ioutils/raw_output_path_test.go @@ -9,7 +9,7 @@ import ( ) func TestNewOutputSandbox(t *testing.T) { - assert.Equal(t, NewOutputSandbox(context.TODO(), "x").GetOutputDataSandboxPath(), storage.DataReference("x")) + assert.Equal(t, NewRawOutputPath(context.TODO(), "x").GetRawOutputPrefix(), storage.DataReference("x")) } func TestNewRandomPrefixShardedOutputSandbox(t *testing.T) { @@ -17,14 +17,14 @@ func TestNewRandomPrefixShardedOutputSandbox(t *testing.T) { t.Run("success-path", func(t *testing.T) { ss := NewConstantShardSelector([]string{"x"}) - sd, err := NewRandomPrefixShardedOutputSandbox(ctx, ss, "s3://bucket", "m", storage.URLPathConstructor{}) + sd, err := NewShardedDeterministicRawOutputPath(ctx, ss, "s3://bucket", "m", storage.URLPathConstructor{}) assert.NoError(t, err) - assert.Equal(t, storage.DataReference("s3://bucket/x/6b0d31c0d563223024da45691584643ac78c96e8"), sd.GetOutputDataSandboxPath()) + assert.Equal(t, storage.DataReference("s3://bucket/x/6b0d31c0d563223024da45691584643ac78c96e8"), sd.GetRawOutputPrefix()) }) t.Run("error", func(t *testing.T) { ss := NewConstantShardSelector([]string{"s3:// abc"}) - sd, err := NewRandomPrefixShardedOutputSandbox(ctx, ss, "s3://bucket", "m", storage.URLPathConstructor{}) + sd, err := NewShardedDeterministicRawOutputPath(ctx, ss, "s3://bucket", "m", storage.URLPathConstructor{}) assert.Error(t, err, "%s", sd) }) } @@ -33,14 +33,14 @@ func TestNewShardedOutputSandbox(t *testing.T) { ctx := context.TODO() t.Run("", func(t *testing.T) { ss := NewConstantShardSelector([]string{"x"}) - sd, err := NewShardedOutputSandbox(ctx, ss, "s3://flyte", "unique", storage.URLPathConstructor{}) + sd, err := NewShardedRawOutputPath(ctx, ss, "s3://flyte", "unique", storage.URLPathConstructor{}) assert.NoError(t, err) - assert.Equal(t, storage.DataReference("s3://flyte/x/unique"), sd.GetOutputDataSandboxPath()) + assert.Equal(t, storage.DataReference("s3://flyte/x/unique"), sd.GetRawOutputPrefix()) }) t.Run("error", func(t *testing.T) { ss := NewConstantShardSelector([]string{"s3:// abc"}) - sd, err := NewShardedOutputSandbox(ctx, ss, "s3://bucket", "m", storage.URLPathConstructor{}) + sd, err := NewShardedRawOutputPath(ctx, ss, "s3://bucket", "m", storage.URLPathConstructor{}) assert.Error(t, err, "%s", sd) }) } diff --git a/go/tasks/pluginmachinery/ioutils/remote_file_output_writer.go b/go/tasks/pluginmachinery/ioutils/remote_file_output_writer.go index 12ac920f7..cf7b5cc05 100644 --- a/go/tasks/pluginmachinery/ioutils/remote_file_output_writer.go +++ b/go/tasks/pluginmachinery/ioutils/remote_file_output_writer.go @@ -18,7 +18,7 @@ type RemoteFileOutputWriter struct { type RemoteFileOutputPaths struct { outputPrefix storage.DataReference store storage.ReferenceConstructor - io.OutputDataSandbox + io.RawOutputPaths } var ( @@ -72,11 +72,11 @@ func (w RemoteFileOutputWriter) Put(ctx context.Context, reader io.OutputReader) return fmt.Errorf("no data found to write") } -func NewRemoteFileOutputPaths(_ context.Context, store storage.ReferenceConstructor, outputPrefix storage.DataReference, sandbox io.OutputDataSandbox) RemoteFileOutputPaths { +func NewRemoteFileOutputPaths(_ context.Context, store storage.ReferenceConstructor, outputPrefix storage.DataReference, sandbox io.RawOutputPaths) RemoteFileOutputPaths { return RemoteFileOutputPaths{ - store: store, - outputPrefix: outputPrefix, - OutputDataSandbox: sandbox, + store: store, + outputPrefix: outputPrefix, + RawOutputPaths: sandbox, } } diff --git a/go/tasks/pluginmachinery/utils/template_test.go b/go/tasks/pluginmachinery/utils/template_test.go index 97e812418..6b93cb545 100755 --- a/go/tasks/pluginmachinery/utils/template_test.go +++ b/go/tasks/pluginmachinery/utils/template_test.go @@ -45,7 +45,7 @@ type dummyOutputPaths struct { outputPath storage.DataReference } -func (d dummyOutputPaths) GetOutputDataSandboxPath() storage.DataReference { +func (d dummyOutputPaths) GetRawOutputPrefix() storage.DataReference { panic("should not be called") } diff --git a/go/tasks/plugins/array/awsbatch/executor.go b/go/tasks/plugins/array/awsbatch/executor.go index 0cdd5a854..cb35ced99 100644 --- a/go/tasks/plugins/array/awsbatch/executor.go +++ b/go/tasks/plugins/array/awsbatch/executor.go @@ -78,7 +78,7 @@ func (e Executor) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (c case arrayCore.PhaseCheckingSubTaskExecutions: pluginState, err = CheckSubTasksState(ctx, tCtx.TaskExecutionMetadata(), - tCtx.OutputWriter().GetOutputPrefixPath(), tCtx.OutputWriter().GetOutputDataSandboxPath(), + tCtx.OutputWriter().GetOutputPrefixPath(), tCtx.OutputWriter().GetRawOutputPrefix(), e.jobStore, tCtx.DataStore(), pluginConfig, pluginState) case arrayCore.PhaseAssembleFinalOutput: diff --git a/go/tasks/plugins/array/catalog.go b/go/tasks/plugins/array/catalog.go index e306ba3f9..8fceebeb5 100644 --- a/go/tasks/plugins/array/catalog.go +++ b/go/tasks/plugins/array/catalog.go @@ -64,7 +64,7 @@ func DetermineDiscoverability(ctx context.Context, tCtx core.TaskExecutionContex } // build output writers - outputWriters, err := ConstructOutputWriters(ctx, tCtx.DataStore(), tCtx.OutputWriter().GetOutputPrefixPath(), tCtx.OutputWriter().GetOutputDataSandboxPath(), int(arrayJob.Size)) + outputWriters, err := ConstructOutputWriters(ctx, tCtx.DataStore(), tCtx.OutputWriter().GetOutputPrefixPath(), tCtx.OutputWriter().GetRawOutputPrefix(), int(arrayJob.Size)) if err != nil { return state, err } @@ -164,7 +164,7 @@ func WriteToDiscovery(ctx context.Context, tCtx core.TaskExecutionContext, state } // output reader - outputReaders, err := ConstructOutputReaders(ctx, tCtx.DataStore(), tCtx.OutputWriter().GetOutputPrefixPath(), tCtx.OutputWriter().GetOutputDataSandboxPath(), int(arrayJob.Size)) + outputReaders, err := ConstructOutputReaders(ctx, tCtx.DataStore(), tCtx.OutputWriter().GetOutputPrefixPath(), tCtx.OutputWriter().GetRawOutputPrefix(), int(arrayJob.Size)) if err != nil { return nil, err } @@ -401,7 +401,7 @@ func ConstructOutputWriter(ctx context.Context, dataStore *storage.DataStore, ou return nil, err } - p := ioutils.NewRemoteFileOutputPaths(ctx, dataStore, dataReference, ioutils.NewOutputSandbox(ctx, outputSandbox)) + p := ioutils.NewRemoteFileOutputPaths(ctx, dataStore, dataReference, ioutils.NewRawOutputPath(ctx, outputSandbox)) return ioutils.NewRemoteFileOutputWriter(ctx, dataStore, p), nil } @@ -435,6 +435,6 @@ func ConstructOutputReader(ctx context.Context, dataStore *storage.DataStore, ou return nil, err } - outputPath := ioutils.NewRemoteFileOutputPaths(ctx, dataStore, dataReference, ioutils.NewOutputSandbox(ctx, outputSandbox)) + outputPath := ioutils.NewRemoteFileOutputPaths(ctx, dataStore, dataReference, ioutils.NewRawOutputPath(ctx, outputSandbox)) return ioutils.NewRemoteFileOutputReader(ctx, dataStore, outputPath, int64(999999999)), nil } diff --git a/go/tasks/plugins/array/catalog_test.go b/go/tasks/plugins/array/catalog_test.go index 62fef9756..6f4f51803 100644 --- a/go/tasks/plugins/array/catalog_test.go +++ b/go/tasks/plugins/array/catalog_test.go @@ -69,7 +69,7 @@ func runDetermineDiscoverabilityTest(t testing.TB, taskTemplate *core.TaskTempla ow := &ioMocks.OutputWriter{} ow.OnGetOutputPrefixPath().Return("/prefix/") - ow.OnGetOutputDataSandboxPath().Return("/sandbox/") + ow.OnGetRawOutputPrefix().Return("/sandbox/") ow.OnGetOutputPath().Return("/prefix/outputs.pb") ow.On("Put", mock.Anything, mock.Anything).Return(func(ctx context.Context, or io.OutputReader) error { m, ee, err := or.Read(ctx) diff --git a/go/tasks/plugins/array/k8s/executor.go b/go/tasks/plugins/array/k8s/executor.go index 13cb7063f..de8b46973 100644 --- a/go/tasks/plugins/array/k8s/executor.go +++ b/go/tasks/plugins/array/k8s/executor.go @@ -81,7 +81,7 @@ func (e Executor) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (c case arrayCore.PhaseCheckingSubTaskExecutions: nextState, logLinks, err = CheckSubTasksState(ctx, tCtx, e.kubeClient, tCtx.DataStore(), - tCtx.OutputWriter().GetOutputPrefixPath(), tCtx.OutputWriter().GetOutputDataSandboxPath(), pluginState) + tCtx.OutputWriter().GetOutputPrefixPath(), tCtx.OutputWriter().GetRawOutputPrefix(), pluginState) case arrayCore.PhaseAssembleFinalOutput: nextState, err = array.AssembleFinalOutputs(ctx, e.outputsAssembler, tCtx, arrayCore.PhaseSuccess, pluginState) diff --git a/go/tasks/plugins/array/outputs.go b/go/tasks/plugins/array/outputs.go index 55d588d6f..7d0d1bf9e 100644 --- a/go/tasks/plugins/array/outputs.go +++ b/go/tasks/plugins/array/outputs.go @@ -56,7 +56,7 @@ type assembleOutputsWorker struct { func (w assembleOutputsWorker) Process(ctx context.Context, workItem workqueue.WorkItem) (workqueue.WorkStatus, error) { i := workItem.(*outputAssembleItem) - outputReaders, err := ConstructOutputReaders(ctx, i.dataStore, i.outputPaths.GetOutputPrefixPath(), i.outputPaths.GetOutputDataSandboxPath(), int(i.finalPhases.ItemsCount)) + outputReaders, err := ConstructOutputReaders(ctx, i.dataStore, i.outputPaths.GetOutputPrefixPath(), i.outputPaths.GetRawOutputPrefix(), int(i.finalPhases.ItemsCount)) if err != nil { logger.Warnf(ctx, "Failed to construct output readers. Error: %v", err) return workqueue.WorkStatusFailed, err @@ -256,7 +256,7 @@ type assembleErrorsWorker struct { func (a assembleErrorsWorker) Process(ctx context.Context, workItem workqueue.WorkItem) (workqueue.WorkStatus, error) { w := workItem.(*outputAssembleItem) - outputReaders, err := ConstructOutputReaders(ctx, w.dataStore, w.outputPaths.GetOutputPrefixPath(), w.outputPaths.GetOutputDataSandboxPath(), int(w.finalPhases.ItemsCount)) + outputReaders, err := ConstructOutputReaders(ctx, w.dataStore, w.outputPaths.GetOutputPrefixPath(), w.outputPaths.GetRawOutputPrefix(), int(w.finalPhases.ItemsCount)) if err != nil { return workqueue.WorkStatusNotDone, err } diff --git a/go/tasks/plugins/array/outputs_test.go b/go/tasks/plugins/array/outputs_test.go index 2ea1ddb4f..e15af79f0 100644 --- a/go/tasks/plugins/array/outputs_test.go +++ b/go/tasks/plugins/array/outputs_test.go @@ -88,7 +88,7 @@ func Test_assembleOutputsWorker_Process(t *testing.T) { ow := &mocks2.OutputWriter{} ow.OnGetOutputPrefixPath().Return("/bucket/prefix") ow.OnGetOutputPath().Return("/bucket/prefix/outputs.pb") - ow.OnGetOutputDataSandboxPath().Return("/bucket/sandbox/") + ow.OnGetRawOutputPrefix().Return("/bucket/sandbox/") // Setup the input phases that inform outputs worker about which tasks failed/succeeded. phases := arrayCore.NewPhasesCompactArray(4) @@ -362,7 +362,7 @@ func Test_assembleErrorsWorker_Process(t *testing.T) { // Setup the expected data to be written to outputWriter. ow := &mocks2.OutputWriter{} - ow.OnGetOutputDataSandboxPath().Return("/bucket/sandbox/") + ow.OnGetRawOutputPrefix().Return("/bucket/sandbox/") ow.OnGetOutputPrefixPath().Return("/bucket/prefix") ow.OnGetErrorPath().Return("/bucket/prefix/error.pb") ow.On("Put", mock.Anything, mock.Anything).Return(func(ctx context.Context, reader io.OutputReader) error { diff --git a/tests/end_to_end.go b/tests/end_to_end.go index ff16a181c..a60b6c995 100644 --- a/tests/end_to_end.go +++ b/tests/end_to_end.go @@ -85,7 +85,7 @@ func RunPluginEndToEndTest(t *testing.T, executor pluginCore.Plugin, template *i inputReader.OnGetInputPath().Return(basePrefix + "/inputs.pb") outputWriter := &ioMocks.OutputWriter{} - outputWriter.OnGetOutputDataSandboxPath().Return("/sandbox/") + outputWriter.OnGetRawOutputPrefix().Return("/sandbox/") outputWriter.OnGetOutputPrefixPath().Return(basePrefix) outputWriter.OnGetErrorPath().Return(basePrefix + "/error.pb") outputWriter.OnGetOutputPath().Return(basePrefix + "/outputs.pb") From 4e169400cc0e02925e0935a0b87d52e778ba677c Mon Sep 17 00:00:00 2001 From: Ketan Umare Date: Mon, 23 Mar 2020 21:41:53 -0700 Subject: [PATCH 12/15] Update go/tasks/pluginmachinery/ioutils/raw_output_path.go Co-Authored-By: Haytham AbuelFutuh --- go/tasks/pluginmachinery/ioutils/raw_output_path.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/tasks/pluginmachinery/ioutils/raw_output_path.go b/go/tasks/pluginmachinery/ioutils/raw_output_path.go index b29c15592..b972a0164 100644 --- a/go/tasks/pluginmachinery/ioutils/raw_output_path.go +++ b/go/tasks/pluginmachinery/ioutils/raw_output_path.go @@ -22,7 +22,7 @@ func (r precomputedRawOutputPaths) GetRawOutputPrefix() storage.DataReference { // Determinism depends on the outputMetadataPath // Potential performance problem, as creating a new RawPath creation may be expensive as it hashes the outputMetadataPath // the final RawOutputPath is created in the shard selected by the sharder at the basePath and then appended by a hashed value of the outputMetadata -func NewShardedDeterministicRawOutputPath(ctx context.Context, sharder ShardSelector, basePath, outputMetadataPath storage.DataReference, store storage.ReferenceConstructor) (io.RawOutputPaths, error) { +func NewShardedDeterministicRawOutputPath(ctx context.Context, sharder ShardSelector, basePrefix, outputMetadataPrefix storage.DataReference, store storage.ReferenceConstructor) (io.RawOutputPaths, error) { o := []byte(outputMetadataPath) prefix, err := sharder.GetShardPrefix(ctx, o) if err != nil { From dd03159b295086014aaedd5fa84ed091cfd5db6e Mon Sep 17 00:00:00 2001 From: Ketan Umare Date: Mon, 23 Mar 2020 21:42:05 -0700 Subject: [PATCH 13/15] Update go/tasks/pluginmachinery/ioutils/raw_output_path.go Co-Authored-By: Haytham AbuelFutuh --- go/tasks/pluginmachinery/ioutils/raw_output_path.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/tasks/pluginmachinery/ioutils/raw_output_path.go b/go/tasks/pluginmachinery/ioutils/raw_output_path.go index b972a0164..beb4297b5 100644 --- a/go/tasks/pluginmachinery/ioutils/raw_output_path.go +++ b/go/tasks/pluginmachinery/ioutils/raw_output_path.go @@ -44,7 +44,7 @@ func NewShardedDeterministicRawOutputPath(ctx context.Context, sharder ShardSele } // A simple Output sandbox at a given path -func NewRawOutputPath(_ context.Context, outputSandboxPath storage.DataReference) io.RawOutputPaths { +func NewRawOutputPaths(_ context.Context, outputSandboxPath storage.DataReference) io.RawOutputPaths { return precomputedRawOutputPaths{path: outputSandboxPath} } From d84b7493c8ac50cea34546eb783b9f1f6f22f5c1 Mon Sep 17 00:00:00 2001 From: Ketan Umare Date: Wed, 25 Mar 2020 16:29:31 -0700 Subject: [PATCH 14/15] rename issues --- go/tasks/pluginmachinery/ioutils/raw_output_path.go | 4 ++-- go/tasks/pluginmachinery/ioutils/raw_output_path_test.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go/tasks/pluginmachinery/ioutils/raw_output_path.go b/go/tasks/pluginmachinery/ioutils/raw_output_path.go index beb4297b5..ab6229514 100644 --- a/go/tasks/pluginmachinery/ioutils/raw_output_path.go +++ b/go/tasks/pluginmachinery/ioutils/raw_output_path.go @@ -23,7 +23,7 @@ func (r precomputedRawOutputPaths) GetRawOutputPrefix() storage.DataReference { // Potential performance problem, as creating a new RawPath creation may be expensive as it hashes the outputMetadataPath // the final RawOutputPath is created in the shard selected by the sharder at the basePath and then appended by a hashed value of the outputMetadata func NewShardedDeterministicRawOutputPath(ctx context.Context, sharder ShardSelector, basePrefix, outputMetadataPrefix storage.DataReference, store storage.ReferenceConstructor) (io.RawOutputPaths, error) { - o := []byte(outputMetadataPath) + o := []byte(outputMetadataPrefix) prefix, err := sharder.GetShardPrefix(ctx, o) if err != nil { return nil, err @@ -34,7 +34,7 @@ func NewShardedDeterministicRawOutputPath(ctx context.Context, sharder ShardSele if _, err := m.Write(o); err != nil { return nil, err } - path, err := store.ConstructReference(ctx, basePath, prefix, hex.EncodeToString(m.Sum(nil))) + path, err := store.ConstructReference(ctx, basePrefix, prefix, hex.EncodeToString(m.Sum(nil))) if err != nil { return nil, err } diff --git a/go/tasks/pluginmachinery/ioutils/raw_output_path_test.go b/go/tasks/pluginmachinery/ioutils/raw_output_path_test.go index a8f9680ee..e77548fc8 100644 --- a/go/tasks/pluginmachinery/ioutils/raw_output_path_test.go +++ b/go/tasks/pluginmachinery/ioutils/raw_output_path_test.go @@ -9,7 +9,7 @@ import ( ) func TestNewOutputSandbox(t *testing.T) { - assert.Equal(t, NewRawOutputPath(context.TODO(), "x").GetRawOutputPrefix(), storage.DataReference("x")) + assert.Equal(t, NewRawOutputPaths(context.TODO(), "x").GetRawOutputPrefix(), storage.DataReference("x")) } func TestNewRandomPrefixShardedOutputSandbox(t *testing.T) { From 434bd7118174b1e3b7ad8b7a9fe25d01fc9045a6 Mon Sep 17 00:00:00 2001 From: Ketan Umare Date: Wed, 25 Mar 2020 16:31:47 -0700 Subject: [PATCH 15/15] rename fix --- go/tasks/plugins/array/catalog.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go/tasks/plugins/array/catalog.go b/go/tasks/plugins/array/catalog.go index 8fceebeb5..05960a29d 100644 --- a/go/tasks/plugins/array/catalog.go +++ b/go/tasks/plugins/array/catalog.go @@ -401,7 +401,7 @@ func ConstructOutputWriter(ctx context.Context, dataStore *storage.DataStore, ou return nil, err } - p := ioutils.NewRemoteFileOutputPaths(ctx, dataStore, dataReference, ioutils.NewRawOutputPath(ctx, outputSandbox)) + p := ioutils.NewRemoteFileOutputPaths(ctx, dataStore, dataReference, ioutils.NewRawOutputPaths(ctx, outputSandbox)) return ioutils.NewRemoteFileOutputWriter(ctx, dataStore, p), nil } @@ -435,6 +435,6 @@ func ConstructOutputReader(ctx context.Context, dataStore *storage.DataStore, ou return nil, err } - outputPath := ioutils.NewRemoteFileOutputPaths(ctx, dataStore, dataReference, ioutils.NewRawOutputPath(ctx, outputSandbox)) + outputPath := ioutils.NewRemoteFileOutputPaths(ctx, dataStore, dataReference, ioutils.NewRawOutputPaths(ctx, outputSandbox)) return ioutils.NewRemoteFileOutputReader(ctx, dataStore, outputPath, int64(999999999)), nil }