Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

RawDataOutput directory for every task execution #67

Merged
merged 21 commits into from
Mar 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion go/tasks/pluginmachinery/io/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,22 @@ type OutputReader interface {
Read(ctx context.Context) (*core.LiteralMap, *ExecutionError, error)
}

// All paths where various outputs produced by the task can be placed, such that the framework can directly access them.
// RawOutputPaths is the actual path where the data produced by a task can be placed. It is completely optional. The advantage
// of using this path is to provide exactly once semantics. It is guaranteed that this path is unique for every new execution
// of a task (across retries etc) and is constant for a specific execution.
// As of 02/20/2020 Flytekit generates this path randomly for S3. This structure proposes migration of this logic to
// FlytePluginMachinery so that it can be used more universally outside of Flytekit.
type RawOutputPaths interface {
// This is prefix (blob store prefix or directory) where all data produced can be stored.
GetRawOutputPrefix() storage.DataReference
}

// All paths where various meta outputs produced by the task can be placed, such that the framework can directly access them.
// All paths are reperesented using storage.DataReference -> an URN for the configured storage backend
type OutputFilePaths interface {
// RawOutputPaths are available with OutputFilePaths
RawOutputPaths

// A path to a directory or prefix that contains all execution metadata for this execution
GetOutputPrefixPath() storage.DataReference
// A fully qualified path (URN) to where the framework expects the output to exist in the configured storage backend
Expand Down
45 changes: 45 additions & 0 deletions go/tasks/pluginmachinery/io/mocks/output_data_sandbox.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 32 additions & 0 deletions go/tasks/pluginmachinery/io/mocks/output_file_paths.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 32 additions & 0 deletions go/tasks/pluginmachinery/io/mocks/output_writer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

45 changes: 45 additions & 0 deletions go/tasks/pluginmachinery/io/mocks/raw_output_paths.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions go/tasks/pluginmachinery/ioutils/data_sharder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package ioutils

import "context"

// This interface allows shard selection for OutputSandbox.
type ShardSelector interface {
GetShardPrefix(ctx context.Context, s []byte) (string, error)
}
73 changes: 73 additions & 0 deletions go/tasks/pluginmachinery/ioutils/precomputed_shardselector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package ioutils

import (
"context"
"hash/fnv"
"strings"

"github.com/pkg/errors"
)

// Generates the entire latin alphabet and appends it to the passed in array and returns the new array
func GenerateAlphabet(b []rune) []rune {
for i := 'a'; i <= 'z'; i++ {
b = append(b, i)
}
return b
}

// Generates all arabic numerals and appends to the passed in array and returns the new array/slice
func GenerateArabicNumerals(b []rune) []rune {
for i := '0'; i <= '9'; i++ {
b = append(b, i)
}
return b
}

func createAlphabetAndNumerals() []rune {
b := make([]rune, 0, 36)
b = GenerateAlphabet(b)
return GenerateArabicNumerals(b)
}

// this sharder distributes data into one of the precomputed buckets. The bucket is deterministically determined given the input s
type PrecomputedShardSelector struct {
precomputedPrefixes []string
buckets uint32
}

// Generates deterministic shard id for the given string s
func (d *PrecomputedShardSelector) GetShardPrefix(_ context.Context, s []byte) (string, error) {
h := fnv.New32a()
_, err := h.Write(s)
if err != nil {
return "", errors.Wrap(err, "failed to create shard prefix, reason hash failure.")
}
idx := h.Sum32() % d.buckets
return d.precomputedPrefixes[idx], nil
}

// Creates a PrecomputedShardSelector with 36*36 unique shards. Each shard is of the format {[0-9a-z][0-9a-z]}, i.e. 2 character long.
func NewBase36PrefixShardSelector(ctx context.Context) (ShardSelector, error) {
permittedChars := createAlphabetAndNumerals()
n := len(permittedChars)
precomputedPrefixes := make([]string, 0, n*n)
for _, c1 := range permittedChars {
for _, c2 := range permittedChars {
sb := strings.Builder{}
sb.WriteRune(c1)
sb.WriteRune(c2)
precomputedPrefixes = append(precomputedPrefixes, sb.String())
}
}

return NewConstantShardSelector(precomputedPrefixes), nil
}

// uses the given shards to select a shard
func NewConstantShardSelector(shards []string) ShardSelector {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still prefer to accept interfaces and return specific types.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you cannot, the linter wont let you for non exported types

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But PrecomputedShardSelector is exported

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point i will unexport it

return &PrecomputedShardSelector{
precomputedPrefixes: shards,
buckets: uint32(len(shards)),
}
}
61 changes: 61 additions & 0 deletions go/tasks/pluginmachinery/ioutils/precomputed_shardselector_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package ioutils

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
)

func TestPrecomputedShardSelector_GetShardPrefix(t *testing.T) {
ctx := context.TODO()
t.Run("single-shard", func(t *testing.T) {
ss := PrecomputedShardSelector{precomputedPrefixes: []string{"x"}, buckets: 1}
p, err := ss.GetShardPrefix(ctx, []byte("abc"))
assert.NoError(t, err)
assert.Equal(t, "x", p)
})

t.Run("two-shards", func(t *testing.T) {
ss := PrecomputedShardSelector{precomputedPrefixes: []string{"x", "y"}, buckets: 2}
p, err := ss.GetShardPrefix(ctx, []byte("abc"))
assert.NoError(t, err)
assert.Equal(t, "y", p)
p, err = ss.GetShardPrefix(ctx, []byte("xyz"))
assert.NoError(t, err)
assert.Equal(t, "x", p)
})
}

func TestGenerateAlphabet(t *testing.T) {
var b []rune
b = GenerateAlphabet(b)

assert.Equal(t, 26, len(b))
assert.Equal(t, 'a', b[0])
assert.Equal(t, 'z', b[25])

// Additive
b = GenerateAlphabet(b)

assert.Equal(t, 52, len(b))
assert.Equal(t, 'a', b[26])
assert.Equal(t, 'z', b[51])
}

func TestGenerateArabicNumerals(t *testing.T) {
var b []rune
b = GenerateArabicNumerals(b)

assert.Equal(t, 10, len(b))
assert.Equal(t, '0', b[0])
assert.Equal(t, '9', b[9])

// Additive
b = GenerateArabicNumerals(b)
assert.Equal(t, 20, len(b))
assert.Equal(t, '0', b[0])
assert.Equal(t, '9', b[9])
assert.Equal(t, '0', b[10])
assert.Equal(t, '9', b[19])
}
Loading