Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
feat; allow users choose raw output path style (sharded, non sharded,…
Browse files Browse the repository at this point in the history
… human readable etc) (#116)
  • Loading branch information
kumare3 authored Aug 27, 2020
1 parent 3ebab24 commit af40d96
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 4 deletions.
35 changes: 33 additions & 2 deletions go/tasks/pluginmachinery/ioutils/raw_output_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"context"
"crypto/sha1" // #nosec
"encoding/hex"
"strconv"

core2 "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
"github.com/lyft/flytestdlib/storage"

"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io"
Expand Down Expand Up @@ -44,8 +46,8 @@ func NewShardedDeterministicRawOutputPath(ctx context.Context, sharder ShardSele
}

// A simple Output sandbox at a given path
func NewRawOutputPaths(_ context.Context, outputSandboxPath storage.DataReference) io.RawOutputPaths {
return precomputedRawOutputPaths{path: outputSandboxPath}
func NewRawOutputPaths(_ context.Context, rawOutputPrefix storage.DataReference) io.RawOutputPaths {
return precomputedRawOutputPaths{path: rawOutputPrefix}
}

// Creates an OutputSandbox in the basePath using the uniqueID and a sharder
Expand All @@ -64,3 +66,32 @@ func NewShardedRawOutputPath(ctx context.Context, sharder ShardSelector, basePat
path: path,
}, nil
}

// Constructs an output path that is deterministic and unique within the given outputPrefix. No sharding is performed
func NewDeterministicUniqueRawOutputPath(ctx context.Context, rawOutputPrefix, outputMetadataPrefix storage.DataReference, store storage.ReferenceConstructor) (io.RawOutputPaths, error) {
o := []byte(outputMetadataPrefix)
/* #nosec */
// We use SHA1 for sheer speed instead of no collisions. As because of the shard Prefix + hash is pretty unique :)
m := sha1.New()
if _, err := m.Write(o); err != nil {
return nil, err
}
path, err := store.ConstructReference(ctx, rawOutputPrefix, hex.EncodeToString(m.Sum(nil)))
if err != nil {
return nil, err
}
return precomputedRawOutputPaths{
path: path,
}, nil
}

// Generates a RawOutput Path that looks like the TaskExecutionID and can be easily cross referenced with Flyte generated TaskExecution ID
func NewTaskIDRawOutputPath(ctx context.Context, rawOutputPrefix storage.DataReference, taskID *core2.TaskExecutionIdentifier, store storage.ReferenceConstructor) (io.RawOutputPaths, error) {
path, err := store.ConstructReference(ctx, rawOutputPrefix, taskID.GetNodeExecutionId().GetExecutionId().GetProject(), taskID.GetNodeExecutionId().GetExecutionId().GetDomain(), taskID.GetNodeExecutionId().GetExecutionId().GetName(), taskID.GetNodeExecutionId().GetNodeId(), strconv.Itoa(int(taskID.GetRetryAttempt())), taskID.GetTaskId().GetName())
if err != nil {
return nil, err
}
return precomputedRawOutputPaths{
path: path,
}, nil
}
40 changes: 38 additions & 2 deletions go/tasks/pluginmachinery/ioutils/raw_output_path_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"testing"

core2 "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
"github.com/lyft/flytestdlib/storage"
"github.com/stretchr/testify/assert"
)
Expand All @@ -12,7 +13,7 @@ func TestNewOutputSandbox(t *testing.T) {
assert.Equal(t, NewRawOutputPaths(context.TODO(), "x").GetRawOutputPrefix(), storage.DataReference("x"))
}

func TestNewRandomPrefixShardedOutputSandbox(t *testing.T) {
func TestNewShardedDeterministicRawOutputPath(t *testing.T) {
ctx := context.TODO()

t.Run("success-path", func(t *testing.T) {
Expand All @@ -29,7 +30,7 @@ func TestNewRandomPrefixShardedOutputSandbox(t *testing.T) {
})
}

func TestNewShardedOutputSandbox(t *testing.T) {
func TestNewShardedRawOutputPath(t *testing.T) {
ctx := context.TODO()
t.Run("", func(t *testing.T) {
ss := NewConstantShardSelector([]string{"x"})
Expand All @@ -44,3 +45,38 @@ func TestNewShardedOutputSandbox(t *testing.T) {
assert.Error(t, err, "%s", sd)
})
}

func TestNewDeterministicUniqueRawOutputPath(t *testing.T) {
ctx := context.TODO()

t.Run("success-path", func(t *testing.T) {
sd, err := NewDeterministicUniqueRawOutputPath(ctx, "s3://bucket", "m", storage.URLPathConstructor{})
assert.NoError(t, err)
assert.Equal(t, storage.DataReference("s3://bucket/6b0d31c0d563223024da45691584643ac78c96e8"), sd.GetRawOutputPrefix())
})

t.Run("error-not-possible", func(t *testing.T) {
sd, err := NewDeterministicUniqueRawOutputPath(ctx, "bucket", "m", storage.URLPathConstructor{})
assert.NoError(t, err)
assert.Equal(t, "/bucket/6b0d31c0d563223024da45691584643ac78c96e8", sd.GetRawOutputPrefix().String())
})
}

func TestNewTaskIDRawOutputPath(t *testing.T) {
p, err := NewTaskIDRawOutputPath(context.TODO(), "s3://bucket", &core2.TaskExecutionIdentifier{
NodeExecutionId: &core2.NodeExecutionIdentifier{
NodeId: "n1",
ExecutionId: &core2.WorkflowExecutionIdentifier{
Project: "project",
Domain: "domain",
Name: "exec",
},
},
RetryAttempt: 0,
TaskId: &core2.Identifier{
Name: "task1",
},
}, storage.URLPathConstructor{})
assert.NoError(t, err)
assert.Equal(t, "s3://bucket/project/domain/exec/n1/0/task1", p.GetRawOutputPrefix().String())
}

0 comments on commit af40d96

Please sign in to comment.