Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

feat: allow users to choose raw output path style (sharded, non-sharded, human-readable, etc.) #116

Merged
merged 3 commits into from
Aug 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 33 additions & 2 deletions go/tasks/pluginmachinery/ioutils/raw_output_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"context"
"crypto/sha1" // #nosec
"encoding/hex"
"strconv"

core2 "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
"github.com/lyft/flytestdlib/storage"

"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io"
Expand Down Expand Up @@ -44,8 +46,8 @@ func NewShardedDeterministicRawOutputPath(ctx context.Context, sharder ShardSele
}

// NewRawOutputPaths returns a simple output sandbox at a given path: the supplied reference is wrapped unchanged
// in a precomputedRawOutputPaths value. The context argument is ignored.
func NewRawOutputPaths(_ context.Context, outputSandboxPath storage.DataReference) io.RawOutputPaths {
return precomputedRawOutputPaths{path: outputSandboxPath}
func NewRawOutputPaths(_ context.Context, rawOutputPrefix storage.DataReference) io.RawOutputPaths {
return precomputedRawOutputPaths{path: rawOutputPrefix}
}

// Creates an OutputSandbox in the basePath using the uniqueID and a sharder
Expand All @@ -64,3 +66,32 @@ func NewShardedRawOutputPath(ctx context.Context, sharder ShardSelector, basePat
path: path,
}, nil
}

// NewDeterministicUniqueRawOutputPath constructs an output path that is deterministic and unique within the given
// rawOutputPrefix. No sharding is performed: the hex-encoded SHA-1 digest of outputMetadataPrefix is passed to
// store.ConstructReference as the single nested key under rawOutputPrefix, so identical inputs always produce the
// same reference.
//
// An error is returned only when store fails to construct the reference.
func NewDeterministicUniqueRawOutputPath(ctx context.Context, rawOutputPrefix, outputMetadataPrefix storage.DataReference, store storage.ReferenceConstructor) (io.RawOutputPaths, error) {
	// SHA-1 is chosen for speed rather than collision resistance; the digest is only a fingerprint of the
	// metadata prefix, not a security measure. The one-shot Sum cannot fail, so there is no error to handle.
	digest := sha1.Sum([]byte(outputMetadataPrefix)) // #nosec

	path, err := store.ConstructReference(ctx, rawOutputPrefix, hex.EncodeToString(digest[:]))
	if err != nil {
		return nil, err
	}
	return precomputedRawOutputPaths{path: path}, nil
}

// NewTaskIDRawOutputPath generates a raw output path that mirrors the TaskExecutionID, so it can be easily
// cross-referenced with the Flyte-generated TaskExecution ID. The keys nested under rawOutputPrefix are, in order:
// project, domain, execution name, node ID, retry attempt and task name.
//
// An error is returned only when store fails to construct the reference.
func NewTaskIDRawOutputPath(ctx context.Context, rawOutputPrefix storage.DataReference, taskID *core2.TaskExecutionIdentifier, store storage.ReferenceConstructor) (io.RawOutputPaths, error) {
	nodeExecID := taskID.GetNodeExecutionId()
	workflowExecID := nodeExecID.GetExecutionId()

	project := workflowExecID.GetProject()
	domain := workflowExecID.GetDomain()
	execName := workflowExecID.GetName()
	nodeID := nodeExecID.GetNodeId()
	attempt := strconv.Itoa(int(taskID.GetRetryAttempt()))
	taskName := taskID.GetTaskId().GetName()

	ref, err := store.ConstructReference(ctx, rawOutputPrefix, project, domain, execName, nodeID, attempt, taskName)
	if err != nil {
		return nil, err
	}

	return precomputedRawOutputPaths{path: ref}, nil
}
40 changes: 38 additions & 2 deletions go/tasks/pluginmachinery/ioutils/raw_output_path_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"testing"

core2 "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
"github.com/lyft/flytestdlib/storage"
"github.com/stretchr/testify/assert"
)
Expand All @@ -12,7 +13,7 @@ func TestNewOutputSandbox(t *testing.T) {
assert.Equal(t, NewRawOutputPaths(context.TODO(), "x").GetRawOutputPrefix(), storage.DataReference("x"))
}

func TestNewRandomPrefixShardedOutputSandbox(t *testing.T) {
func TestNewShardedDeterministicRawOutputPath(t *testing.T) {
ctx := context.TODO()

t.Run("success-path", func(t *testing.T) {
Expand All @@ -29,7 +30,7 @@ func TestNewRandomPrefixShardedOutputSandbox(t *testing.T) {
})
}

func TestNewShardedOutputSandbox(t *testing.T) {
func TestNewShardedRawOutputPath(t *testing.T) {
ctx := context.TODO()
t.Run("", func(t *testing.T) {
ss := NewConstantShardSelector([]string{"x"})
Expand All @@ -44,3 +45,38 @@ func TestNewShardedOutputSandbox(t *testing.T) {
assert.Error(t, err, "%s", sd)
})
}

func TestNewDeterministicUniqueRawOutputPath(t *testing.T) {
ctx := context.TODO()

t.Run("success-path", func(t *testing.T) {
sd, err := NewDeterministicUniqueRawOutputPath(ctx, "s3://bucket", "m", storage.URLPathConstructor{})
assert.NoError(t, err)
assert.Equal(t, storage.DataReference("s3://bucket/6b0d31c0d563223024da45691584643ac78c96e8"), sd.GetRawOutputPrefix())
})

t.Run("error-not-possible", func(t *testing.T) {
sd, err := NewDeterministicUniqueRawOutputPath(ctx, "bucket", "m", storage.URLPathConstructor{})
assert.NoError(t, err)
assert.Equal(t, "/bucket/6b0d31c0d563223024da45691584643ac78c96e8", sd.GetRawOutputPrefix().String())
})
}

func TestNewTaskIDRawOutputPath(t *testing.T) {
p, err := NewTaskIDRawOutputPath(context.TODO(), "s3://bucket", &core2.TaskExecutionIdentifier{
NodeExecutionId: &core2.NodeExecutionIdentifier{
NodeId: "n1",
ExecutionId: &core2.WorkflowExecutionIdentifier{
Project: "project",
Domain: "domain",
Name: "exec",
},
},
RetryAttempt: 0,
TaskId: &core2.Identifier{
Name: "task1",
},
}, storage.URLPathConstructor{})
assert.NoError(t, err)
assert.Equal(t, "s3://bucket/project/domain/exec/n1/0/task1", p.GetRawOutputPrefix().String())
}