diff --git a/go/tasks/pluginmachinery/ioutils/raw_output_path.go b/go/tasks/pluginmachinery/ioutils/raw_output_path.go index ab6229514..110499d2b 100644 --- a/go/tasks/pluginmachinery/ioutils/raw_output_path.go +++ b/go/tasks/pluginmachinery/ioutils/raw_output_path.go @@ -4,7 +4,9 @@ import ( "context" "crypto/sha1" // #nosec "encoding/hex" + "strconv" + core2 "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" "github.com/lyft/flytestdlib/storage" "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io" @@ -44,8 +46,8 @@ func NewShardedDeterministicRawOutputPath(ctx context.Context, sharder ShardSele } // A simple Output sandbox at a given path -func NewRawOutputPaths(_ context.Context, outputSandboxPath storage.DataReference) io.RawOutputPaths { - return precomputedRawOutputPaths{path: outputSandboxPath} +func NewRawOutputPaths(_ context.Context, rawOutputPrefix storage.DataReference) io.RawOutputPaths { + return precomputedRawOutputPaths{path: rawOutputPrefix} } // Creates an OutputSandbox in the basePath using the uniqueID and a sharder @@ -64,3 +66,32 @@ func NewShardedRawOutputPath(ctx context.Context, sharder ShardSelector, basePat path: path, }, nil } + +// Constructs an output path under rawOutputPrefix that is deterministic and unique for the given outputMetadataPrefix (the path suffix is the hex-encoded SHA1 hash of outputMetadataPrefix). No sharding is performed. +func NewDeterministicUniqueRawOutputPath(ctx context.Context, rawOutputPrefix, outputMetadataPrefix storage.DataReference, store storage.ReferenceConstructor) (io.RawOutputPaths, error) { + o := []byte(outputMetadataPrefix) + /* #nosec */ + // We use SHA1 for sheer speed rather than for collision resistance. 
This is acceptable because the output prefix + hash is pretty unique :) + m := sha1.New() + if _, err := m.Write(o); err != nil { + return nil, err + } + path, err := store.ConstructReference(ctx, rawOutputPrefix, hex.EncodeToString(m.Sum(nil))) + if err != nil { + return nil, err + } + return precomputedRawOutputPaths{ + path: path, + }, nil +} + +// Generates a raw output path that mirrors the TaskExecutionID (project/domain/execution name/node ID/retry attempt/task name under rawOutputPrefix) so it can be easily cross-referenced with the Flyte-generated TaskExecution ID. +func NewTaskIDRawOutputPath(ctx context.Context, rawOutputPrefix storage.DataReference, taskID *core2.TaskExecutionIdentifier, store storage.ReferenceConstructor) (io.RawOutputPaths, error) { + path, err := store.ConstructReference(ctx, rawOutputPrefix, taskID.GetNodeExecutionId().GetExecutionId().GetProject(), taskID.GetNodeExecutionId().GetExecutionId().GetDomain(), taskID.GetNodeExecutionId().GetExecutionId().GetName(), taskID.GetNodeExecutionId().GetNodeId(), strconv.Itoa(int(taskID.GetRetryAttempt())), taskID.GetTaskId().GetName()) + if err != nil { + return nil, err + } + return precomputedRawOutputPaths{ + path: path, + }, nil +} diff --git a/go/tasks/pluginmachinery/ioutils/raw_output_path_test.go b/go/tasks/pluginmachinery/ioutils/raw_output_path_test.go index e77548fc8..b50ea772c 100644 --- a/go/tasks/pluginmachinery/ioutils/raw_output_path_test.go +++ b/go/tasks/pluginmachinery/ioutils/raw_output_path_test.go @@ -4,6 +4,7 @@ import ( "context" "testing" + core2 "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" "github.com/lyft/flytestdlib/storage" "github.com/stretchr/testify/assert" ) @@ -12,7 +13,7 @@ func TestNewOutputSandbox(t *testing.T) { assert.Equal(t, NewRawOutputPaths(context.TODO(), "x").GetRawOutputPrefix(), storage.DataReference("x")) } -func TestNewRandomPrefixShardedOutputSandbox(t *testing.T) { +func TestNewShardedDeterministicRawOutputPath(t *testing.T) { ctx := context.TODO() t.Run("success-path", func(t *testing.T) { @@ -29,7 +30,7 @@ func 
TestNewRandomPrefixShardedOutputSandbox(t *testing.T) { }) } -func TestNewShardedOutputSandbox(t *testing.T) { +func TestNewShardedRawOutputPath(t *testing.T) { ctx := context.TODO() t.Run("", func(t *testing.T) { ss := NewConstantShardSelector([]string{"x"}) @@ -44,3 +45,38 @@ func TestNewShardedOutputSandbox(t *testing.T) { assert.Error(t, err, "%s", sd) }) } + +func TestNewDeterministicUniqueRawOutputPath(t *testing.T) { + ctx := context.TODO() + + t.Run("success-path", func(t *testing.T) { + sd, err := NewDeterministicUniqueRawOutputPath(ctx, "s3://bucket", "m", storage.URLPathConstructor{}) + assert.NoError(t, err) + assert.Equal(t, storage.DataReference("s3://bucket/6b0d31c0d563223024da45691584643ac78c96e8"), sd.GetRawOutputPrefix()) + }) + + t.Run("error-not-possible", func(t *testing.T) { + sd, err := NewDeterministicUniqueRawOutputPath(ctx, "bucket", "m", storage.URLPathConstructor{}) + assert.NoError(t, err) + assert.Equal(t, "/bucket/6b0d31c0d563223024da45691584643ac78c96e8", sd.GetRawOutputPrefix().String()) + }) +} + +func TestNewTaskIDRawOutputPath(t *testing.T) { + p, err := NewTaskIDRawOutputPath(context.TODO(), "s3://bucket", &core2.TaskExecutionIdentifier{ + NodeExecutionId: &core2.NodeExecutionIdentifier{ + NodeId: "n1", + ExecutionId: &core2.WorkflowExecutionIdentifier{ + Project: "project", + Domain: "domain", + Name: "exec", + }, + }, + RetryAttempt: 0, + TaskId: &core2.Identifier{ + Name: "task1", + }, + }, storage.URLPathConstructor{}) + assert.NoError(t, err) + assert.Equal(t, "s3://bucket/project/domain/exec/n1/0/task1", p.GetRawOutputPrefix().String()) +}