Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
RawDataOutput directory for every task execution (#67)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ketan Umare authored Mar 25, 2020
1 parent 3050380 commit 000cec8
Show file tree
Hide file tree
Showing 23 changed files with 491 additions and 41 deletions.
15 changes: 14 additions & 1 deletion go/tasks/pluginmachinery/io/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,22 @@ type OutputReader interface {
Read(ctx context.Context) (*core.LiteralMap, *ExecutionError, error)
}

// RawOutputPaths exposes the location where the raw data produced by a task can be placed, such that the framework
// can directly access it. Using this location is completely optional. The advantage of using it is exactly-once
// semantics: the path is guaranteed to be unique for every new execution of a task (across retries etc.) and is
// constant for a specific execution.
//
// As of 02/20/2020 Flytekit generates this path randomly for S3. This interface proposes migrating that logic to
// FlytePluginMachinery so that it can be used more universally outside of Flytekit.
type RawOutputPaths interface {
// GetRawOutputPrefix returns the prefix (blob store prefix or directory) under which all data produced can be stored.
GetRawOutputPrefix() storage.DataReference
}

// All paths where various meta outputs produced by the task can be placed, such that the framework can directly access them.
// All paths are represented using storage.DataReference -> a URN for the configured storage backend
type OutputFilePaths interface {
// RawOutputPaths are available with OutputFilePaths
RawOutputPaths

// A path to a directory or prefix that contains all execution metadata for this execution
GetOutputPrefixPath() storage.DataReference
// A fully qualified path (URN) to where the framework expects the output to exist in the configured storage backend
Expand Down
45 changes: 45 additions & 0 deletions go/tasks/pluginmachinery/io/mocks/output_data_sandbox.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 32 additions & 0 deletions go/tasks/pluginmachinery/io/mocks/output_file_paths.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 32 additions & 0 deletions go/tasks/pluginmachinery/io/mocks/output_writer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

45 changes: 45 additions & 0 deletions go/tasks/pluginmachinery/io/mocks/raw_output_paths.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions go/tasks/pluginmachinery/ioutils/data_sharder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package ioutils

import "context"

// ShardSelector allows shard selection for OutputSandbox.
type ShardSelector interface {
// GetShardPrefix returns the shard prefix selected for the input bytes s, or an error if no prefix can be produced.
GetShardPrefix(ctx context.Context, s []byte) (string, error)
}
73 changes: 73 additions & 0 deletions go/tasks/pluginmachinery/ioutils/precomputed_shardselector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package ioutils

import (
"context"
"hash/fnv"
"strings"

"github.com/pkg/errors"
)

// GenerateAlphabet appends the lowercase latin letters 'a' through 'z', in order, to b and returns the
// resulting slice. Existing contents of b are preserved, so calling it repeatedly is additive.
func GenerateAlphabet(b []rune) []rune {
	const lowercase = "abcdefghijklmnopqrstuvwxyz"
	for _, letter := range lowercase {
		b = append(b, letter)
	}
	return b
}

// GenerateArabicNumerals appends the digits '0' through '9', in order, to b and returns the resulting
// slice. Existing contents of b are preserved, so calling it repeatedly is additive.
func GenerateArabicNumerals(b []rune) []rune {
	const digits = "0123456789"
	for _, digit := range digits {
		b = append(b, digit)
	}
	return b
}

func createAlphabetAndNumerals() []rune {
b := make([]rune, 0, 36)
b = GenerateAlphabet(b)
return GenerateArabicNumerals(b)
}

// PrecomputedShardSelector distributes data into one of a set of precomputed buckets. The bucket is chosen
// deterministically from the input s passed to GetShardPrefix.
type PrecomputedShardSelector struct {
// precomputedPrefixes holds the candidate shard prefixes; GetShardPrefix returns one of these.
precomputedPrefixes []string
// buckets is the modulus applied to the hash in GetShardPrefix. NewConstantShardSelector sets it to
// len(precomputedPrefixes).
buckets uint32
}

// GetShardPrefix generates a deterministic shard prefix for the given input s.
//
// The input is hashed with 32-bit FNV-1a and the hash is reduced modulo the bucket count to pick one of
// the precomputed prefixes, so equal inputs always map to the same prefix. The context is unused.
//
// An error is returned (instead of panicking) when the selector has no buckets, e.g. a zero-value
// selector or one built from an empty shard list, or when the bucket count exceeds the number of
// precomputed prefixes.
func (d *PrecomputedShardSelector) GetShardPrefix(_ context.Context, s []byte) (string, error) {
	// A zero modulus would trigger an integer divide-by-zero panic below.
	if d.buckets == 0 {
		return "", errors.New("failed to create shard prefix, reason no shards configured")
	}
	// Every index in [0, buckets) must be backed by a prefix, otherwise some inputs would index out of range.
	if uint64(d.buckets) > uint64(len(d.precomputedPrefixes)) {
		return "", errors.New("failed to create shard prefix, reason bucket count exceeds available shards")
	}

	h := fnv.New32a()
	_, err := h.Write(s)
	if err != nil {
		return "", errors.Wrap(err, "failed to create shard prefix, reason hash failure.")
	}
	idx := h.Sum32() % d.buckets
	return d.precomputedPrefixes[idx], nil
}

// NewBase36PrefixShardSelector creates a shard selector with 36*36 unique shards. Each shard is a
// 2 character string of the format {[0-9a-z][0-9a-z]}, built from every ordered pair of the permitted
// characters. The returned error is always nil in this implementation.
func NewBase36PrefixShardSelector(ctx context.Context) (ShardSelector, error) {
	alphabet := createAlphabetAndNumerals()
	prefixes := make([]string, 0, len(alphabet)*len(alphabet))

	for i := range alphabet {
		for j := range alphabet {
			var pair strings.Builder
			pair.WriteRune(alphabet[i])
			pair.WriteRune(alphabet[j])
			prefixes = append(prefixes, pair.String())
		}
	}

	return NewConstantShardSelector(prefixes), nil
}

// NewConstantShardSelector returns a ShardSelector that picks among the given shards. The slice is
// stored as-is (not copied) and the bucket count is set to its length.
func NewConstantShardSelector(shards []string) ShardSelector {
	selector := new(PrecomputedShardSelector)
	selector.precomputedPrefixes = shards
	selector.buckets = uint32(len(shards))
	return selector
}
61 changes: 61 additions & 0 deletions go/tasks/pluginmachinery/ioutils/precomputed_shardselector_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package ioutils

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
)

// TestPrecomputedShardSelector_GetShardPrefix checks that inputs map to the expected precomputed prefix
// for selectors with one and two buckets.
func TestPrecomputedShardSelector_GetShardPrefix(t *testing.T) {
	ctx := context.TODO()

	t.Run("single-shard", func(t *testing.T) {
		// With a single bucket every input must land on the only prefix.
		selector := &PrecomputedShardSelector{precomputedPrefixes: []string{"x"}, buckets: 1}
		prefix, err := selector.GetShardPrefix(ctx, []byte("abc"))
		assert.NoError(t, err)
		assert.Equal(t, "x", prefix)
	})

	t.Run("two-shards", func(t *testing.T) {
		selector := &PrecomputedShardSelector{precomputedPrefixes: []string{"x", "y"}, buckets: 2}
		cases := []struct {
			input string
			want  string
		}{
			{input: "abc", want: "y"},
			{input: "xyz", want: "x"},
		}
		for _, tc := range cases {
			prefix, err := selector.GetShardPrefix(ctx, []byte(tc.input))
			assert.NoError(t, err)
			assert.Equal(t, tc.want, prefix)
		}
	})
}

// TestGenerateAlphabet checks that GenerateAlphabet yields a..z on an empty slice and appends a second
// a..z run when called again on the result.
func TestGenerateAlphabet(t *testing.T) {
	letters := GenerateAlphabet(nil)

	assert.Len(t, letters, 26)
	assert.Equal(t, 'a', letters[0])
	assert.Equal(t, 'z', letters[25])

	// Additive: a second call appends another full alphabet after the first.
	letters = GenerateAlphabet(letters)

	assert.Len(t, letters, 52)
	assert.Equal(t, 'a', letters[26])
	assert.Equal(t, 'z', letters[51])
}

// TestGenerateArabicNumerals checks that GenerateArabicNumerals yields 0..9 on an empty slice and
// appends a second 0..9 run when called again on the result.
func TestGenerateArabicNumerals(t *testing.T) {
	digits := GenerateArabicNumerals(nil)

	assert.Len(t, digits, 10)
	assert.Equal(t, '0', digits[0])
	assert.Equal(t, '9', digits[9])

	// Additive: the first run stays intact and a second run follows it.
	digits = GenerateArabicNumerals(digits)
	assert.Len(t, digits, 20)
	for _, start := range []int{0, 10} {
		assert.Equal(t, '0', digits[start])
		assert.Equal(t, '9', digits[start+9])
	}
}
Loading

0 comments on commit 000cec8

Please sign in to comment.