From 36d9b593219893174410f43eeb31db04121068b1 Mon Sep 17 00:00:00 2001 From: Ketan Umare Date: Wed, 26 Aug 2020 23:04:59 -0700 Subject: [PATCH 1/3] Optional Rawoutput path creator that does not create prefix Shard - Prefix sharding is only useful for S3 and specifically for provisioned buckets - For buckets that are not provisioned for high throughput, it might be useful to use no sharding to make it easy to find the path - The path option is still not human readable but derivable --- .../ioutils/raw_output_path.go | 22 +++++++++++++++++-- .../ioutils/raw_output_path_test.go | 20 +++++++++++++++-- 2 files changed, 38 insertions(+), 4 deletions(-) diff --git a/go/tasks/pluginmachinery/ioutils/raw_output_path.go b/go/tasks/pluginmachinery/ioutils/raw_output_path.go index ab6229514..72644acc8 100644 --- a/go/tasks/pluginmachinery/ioutils/raw_output_path.go +++ b/go/tasks/pluginmachinery/ioutils/raw_output_path.go @@ -44,8 +44,8 @@ func NewShardedDeterministicRawOutputPath(ctx context.Context, sharder ShardSele } // A simple Output sandbox at a given path -func NewRawOutputPaths(_ context.Context, outputSandboxPath storage.DataReference) io.RawOutputPaths { - return precomputedRawOutputPaths{path: outputSandboxPath} +func NewRawOutputPaths(_ context.Context, rawOutputPrefix storage.DataReference) io.RawOutputPaths { + return precomputedRawOutputPaths{path: rawOutputPrefix} } // Creates an OutputSandbox in the basePath using the uniqueID and a sharder @@ -64,3 +64,21 @@ func NewShardedRawOutputPath(ctx context.Context, sharder ShardSelector, basePat path: path, }, nil } + +// Constructs an output path that is deterministic and unique within the given outputPrefix. No sharding is performed +func NewDeterministicUniqueRawOutputPath(ctx context.Context, rawOutputPrefix, outputMetadataPrefix storage.DataReference, store storage.ReferenceConstructor) (io.RawOutputPaths, error) { + o := []byte(outputMetadataPrefix) + /* #nosec */ + // We use SHA1 for sheer speed instead of no collisions. As because of the shard Prefix + hash is pretty unique :) + m := sha1.New() + if _, err := m.Write(o); err != nil { + return nil, err + } + path, err := store.ConstructReference(ctx, rawOutputPrefix, hex.EncodeToString(m.Sum(nil))) + if err != nil { + return nil, err + } + return precomputedRawOutputPaths{ + path: path, + }, nil +} \ No newline at end of file diff --git a/go/tasks/pluginmachinery/ioutils/raw_output_path_test.go b/go/tasks/pluginmachinery/ioutils/raw_output_path_test.go index e77548fc8..ec2436366 100644 --- a/go/tasks/pluginmachinery/ioutils/raw_output_path_test.go +++ b/go/tasks/pluginmachinery/ioutils/raw_output_path_test.go @@ -12,7 +12,7 @@ func TestNewOutputSandbox(t *testing.T) { assert.Equal(t, NewRawOutputPaths(context.TODO(), "x").GetRawOutputPrefix(), storage.DataReference("x")) } -func TestNewRandomPrefixShardedOutputSandbox(t *testing.T) { +func TestNewShardedDeterministicRawOutputPath(t *testing.T) { ctx := context.TODO() t.Run("success-path", func(t *testing.T) { @@ -29,7 +29,7 @@ func TestNewRandomPrefixShardedOutputSandbox(t *testing.T) { }) } -func TestNewShardedOutputSandbox(t *testing.T) { +func TestNewShardedRawOutputPath(t *testing.T) { ctx := context.TODO() t.Run("", func(t *testing.T) { ss := NewConstantShardSelector([]string{"x"}) @@ -44,3 +44,19 @@ func TestNewShardedOutputSandbox(t *testing.T) { assert.Error(t, err, "%s", sd) }) } + +func TestNewDeterministicUniqueRawOutputPath(t *testing.T) { + ctx := context.TODO() + + t.Run("success-path", func(t *testing.T) { + sd, err := NewDeterministicUniqueRawOutputPath(ctx, "s3://bucket", "m", storage.URLPathConstructor{}) + assert.NoError(t, err) + assert.Equal(t, storage.DataReference("s3://bucket/6b0d31c0d563223024da45691584643ac78c96e8"), sd.GetRawOutputPrefix()) + }) + + t.Run("error-not-possible", func(t *testing.T) { + sd, err := NewDeterministicUniqueRawOutputPath(ctx, "bucket", "m", storage.URLPathConstructor{}) + assert.NoError(t, err) + assert.Equal(t, "/bucket/6b0d31c0d563223024da45691584643ac78c96e8", sd.GetRawOutputPrefix().String()) + }) +} From f1bba83dfb8a81bd36220a8d9639f3f5b2c8b0bd Mon Sep 17 00:00:00 2001 From: Ketan Umare Date: Wed, 26 Aug 2020 23:19:24 -0700 Subject: [PATCH 2/3] More options to generate raw paths --- .../ioutils/raw_output_path.go | 15 +++++++++++++- .../ioutils/raw_output_path_test.go | 20 +++++++++++++++++++ 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/go/tasks/pluginmachinery/ioutils/raw_output_path.go b/go/tasks/pluginmachinery/ioutils/raw_output_path.go index 72644acc8..110499d2b 100644 --- a/go/tasks/pluginmachinery/ioutils/raw_output_path.go +++ b/go/tasks/pluginmachinery/ioutils/raw_output_path.go @@ -4,7 +4,9 @@ import ( "context" "crypto/sha1" // #nosec "encoding/hex" + "strconv" + core2 "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" "github.com/lyft/flytestdlib/storage" "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io" @@ -81,4 +83,15 @@ func NewDeterministicUniqueRawOutputPath(ctx context.Context, rawOutputPrefix, o return precomputedRawOutputPaths{ path: path, }, nil -} \ No newline at end of file +} + +// Generates a RawOutput Path that looks like the TaskExecutionID and can be easily cross referenced with Flyte generated TaskExecution ID +func NewTaskIDRawOutputPath(ctx context.Context, rawOutputPrefix storage.DataReference, taskID *core2.TaskExecutionIdentifier, store storage.ReferenceConstructor) (io.RawOutputPaths, error) { + path, err := store.ConstructReference(ctx, rawOutputPrefix, taskID.GetNodeExecutionId().GetExecutionId().GetProject(), taskID.GetNodeExecutionId().GetExecutionId().GetDomain(), taskID.GetNodeExecutionId().GetExecutionId().GetName(), taskID.GetNodeExecutionId().GetNodeId(), strconv.Itoa(int(taskID.GetRetryAttempt())), taskID.GetTaskId().GetName()) + if err != nil { + return nil, err + } + return precomputedRawOutputPaths{ + path: path, + }, nil +} diff --git a/go/tasks/pluginmachinery/ioutils/raw_output_path_test.go b/go/tasks/pluginmachinery/ioutils/raw_output_path_test.go index ec2436366..6c7289df3 100644 --- a/go/tasks/pluginmachinery/ioutils/raw_output_path_test.go +++ b/go/tasks/pluginmachinery/ioutils/raw_output_path_test.go @@ -4,6 +4,7 @@ import ( "context" "testing" + core2 "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" "github.com/lyft/flytestdlib/storage" "github.com/stretchr/testify/assert" ) @@ -60,3 +61,22 @@ func TestNewDeterministicUniqueRawOutputPath(t *testing.T) { assert.Equal(t, "/bucket/6b0d31c0d563223024da45691584643ac78c96e8", sd.GetRawOutputPrefix().String()) }) } + +func TestNewTaskIDRawOutputPath(t *testing.T) { + p, err := NewTaskIDRawOutputPath(context.TODO(), "s3://bucket", &core2.TaskExecutionIdentifier{ + NodeExecutionId: &core2.NodeExecutionIdentifier{ + NodeId: "n1", + ExecutionId: &core2.WorkflowExecutionIdentifier{ + Project: "project", + Domain: "domain", + Name: "exec", + }, + }, + RetryAttempt: 0, + TaskId: &core2.Identifier{ + Name: "task1", + }, + }, storage.URLPathConstructor{}) + assert.NoError(t, err) + assert.Equal(t, "s3://bucket/project/domain/exec/n1/0/task1", p.GetRawOutputPrefix().String()) +} \ No newline at end of file From 1dd5a1207bcc413019725fab2e2a97c12d2c3e49 Mon Sep 17 00:00:00 2001 From: Ketan Umare Date: Thu, 27 Aug 2020 08:55:48 -0700 Subject: [PATCH 3/3] Lint fix --- go/tasks/pluginmachinery/ioutils/raw_output_path_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/go/tasks/pluginmachinery/ioutils/raw_output_path_test.go b/go/tasks/pluginmachinery/ioutils/raw_output_path_test.go index 6c7289df3..b50ea772c 100644 --- a/go/tasks/pluginmachinery/ioutils/raw_output_path_test.go +++ b/go/tasks/pluginmachinery/ioutils/raw_output_path_test.go @@ -68,8 +68,8 @@ func TestNewTaskIDRawOutputPath(t *testing.T) { NodeId: "n1", ExecutionId: &core2.WorkflowExecutionIdentifier{ Project: "project", - Domain: "domain", - Name: "exec", + Domain: "domain", + Name: "exec", }, }, RetryAttempt: 0, @@ -79,4 +79,4 @@ func TestNewTaskIDRawOutputPath(t *testing.T) { }, storage.URLPathConstructor{}) assert.NoError(t, err) assert.Equal(t, "s3://bucket/project/domain/exec/n1/0/task1", p.GetRawOutputPrefix().String()) -} \ No newline at end of file +}