-
Notifications
You must be signed in to change notification settings - Fork 53
RawDataOutput directory for every task execution #67
Conversation
go/tasks/pluginmachinery/io/iface.go
Outdated
// FlytePluginMachinery so that it can be used more universally outside of Flytekit. | ||
type OutputDataSandbox interface { | ||
// This is prefix (blob store prefix or directory) where all data produced can be stored. | ||
GetOutputDataSandboxPath() storage.DataReference |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is sandbox the best name? That implies something to do with the sandbox deployment i feel like.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm true, it is really a sandbox. Do you have any suggestions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like the name sandbox for this, but lets really think of a good alternative, then i can change. Otherwise lets stick with this. So i called it DataOutputSandbox
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AtRestStorage, UserStorageLocation, FlyteHardDrive, FlyteBlackBox, FlyteRecorder, UserOutputLocation, UserOutputStorage, FlyteDurableStore, etc. I don't really care that much, but i find some Flyte things already unnecessarily confusing and would just rather err on the side of less confusing.
go/tasks/pluginmachinery/io/iface.go
Outdated
// FlytePluginMachinery so that it can be used more universally outside of Flytekit. | ||
type OutputDataSandbox interface { | ||
// This is prefix (blob store prefix or directory) where all data produced can be stored. | ||
GetOutputDataSandboxPath() storage.DataReference |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
GetOutputDataSandboxPath() storage.DataReference | |
GetOutputDataSandboxPrefix() storage.DataReference |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we change the name sandbox to something else. Let me think
go/tasks/pluginmachinery/io/iface.go
Outdated
@@ -51,6 +61,8 @@ type OutputFilePaths interface { | |||
// A Fully qualified path (URN) where the error information should be placed as a protobuf core.ErrorDocument. It is not directly | |||
// used by the framework, but could be used in the future | |||
GetErrorPath() storage.DataReference | |||
|
|||
OutputDataSandbox |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Can you move it to the top of the interface... I find it a bit more readable..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure
go/tasks/pluginmachinery/io/iface.go
Outdated
// of a task (across retries etc) and is constant for a specific execution. | ||
// As of 02/20/2020 Flytekit generates this path randomly for S3. This structure proposes migration of this logic to | ||
// FlytePluginMachinery so that it can be used more universally outside of Flytekit. | ||
type OutputDataSandbox interface { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we really need an interface here? I feel like we should just move the method into OutputFilePaths interface... that or we keep it as a separate interface but do not include it in the other interface, include it directly in the implementation struct.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
agreed.
// Determinism depends on the outputMetadataPath | ||
// Potential performance problem, as creating anew randomprefixShardedOutput Sandbox may be expensive as it hashes the outputMetadataPath | ||
// the final OutputSandbox is created in the shard selected by the sharder at the basePath and then appended by a hashed value of the outputMetadata | ||
func NewRandomPrefixShardedOutputSandbox(ctx context.Context, sharder ShardSelector, basePath, outputMetadataPath storage.DataReference, store storage.ReferenceConstructor) (io.OutputDataSandbox, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not random prefix, right? this depends on the ShardSelector. This guy is pretty deterministic otherwise... right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it is very deterministic by design, i should rename this constructor. Let me think about the names
go/tasks/pluginmachinery/io/iface.go
Outdated
// of a task (across retries etc) and is constant for a specific execution. | ||
// As of 02/20/2020 Flytekit generates this path randomly for S3. This structure proposes migration of this logic to | ||
// FlytePluginMachinery so that it can be used more universally outside of Flytekit. | ||
type OutputDataSandbox interface { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
agreed.
} | ||
|
||
// uses the given shards to select a shard | ||
func NewConstantShardSelector(shards []string) ShardSelector { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I still prefer to accept interfaces and return specific types.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you cannot, the linter wont let you for non exported types
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But PrecomputedShardSelector is exported
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good point i will unexport it
index int) (io.OutputReader, error) { | ||
dataReference, err := dataStore.ConstructReference(ctx, outputPrefix, strconv.Itoa(index)) | ||
strIndex := strconv.Itoa(index) | ||
dataReference, err := dataStore.ConstructReference(ctx, outputPrefix, strIndex) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i don't follow. Why are there two data references now? there's the dataReference
that's in the original code, and the new outputSandbox
- what's the difference between the two and why do we need both?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so the first reference is for where the metadata is stored -> OutputPath (should have been OutputMetadata). this where essentially where outputs.pb, futures.pb etc
the new one is what is today generated by flytekit, location where data is stored for the execution from the container. This is never read by flytepropeller, but it is better to generate it from propeller
@@ -71,10 +72,11 @@ func (w RemoteFileOutputWriter) Put(ctx context.Context, reader io.OutputReader) | |||
return fmt.Errorf("no data found to write") | |||
} | |||
|
|||
func NewRemoteFileOutputPaths(_ context.Context, store storage.ReferenceConstructor, outputPrefix storage.DataReference) RemoteFileOutputPaths { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
given that now there's a new datareference as part of io.RawOutputPaths
would it help to add some more functions to RemoteFileOutputReader
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nope, we never want to read that data. Now that we call RawOutput that should be more clear.
} | ||
|
||
// uses the given shards to select a shard | ||
func NewConstantShardSelector(shards []string) ShardSelector { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But PrecomputedShardSelector is exported
Co-Authored-By: Haytham AbuelFutuh <[email protected]>
Co-Authored-By: Haytham AbuelFutuh <[email protected]>
…ns into adding-dataoutput-prefix
TL;DR
This PR enables FlytePropeller to create data sandboxes for the executors (containers or plugins) to use as data storage.
Type
Are all requirements met?
Complete description
Currently Flyte allows exactly once processing semantics for processes that use flytekit data handling and do not produce any side effects. This is possible because flytekit creates a random data sandbox directory (in the blobstore) which is commited (i.e., commit here implies that a user can perceive a task to be complete only when the task completion is successfully recorded by flytepropeller, this is just a trick, concurrent executions are thus disregarded only the first one to return wins/commits).
This commit promotes this sandbox creation from flytekit into flytepropeller. this enables plugins that do not use flytekit or other sdks can simply rely on the {{ .outputs.sandbox }} directory parameter that is passed as an input. Every retry gets a new random sandbox. This also enables to mark some sandboxes are active and this making is possible to perform selective garbage collection.
Tracking Issue
flyteorg/flyte#195
Follow-up issue
flyteorg/flyte#211 - Per workflow data sandboxes