Add multi file error aggregation strategy #5795
Conversation
Codecov Report (attention: patch coverage)

Additional details and impacted files:

@@            Coverage Diff             @@
##           master    #5795      +/-   ##
==========================================
+ Coverage   36.81%   36.85%    +0.04%
==========================================
  Files        1310     1310
  Lines      131034   131217      +183
==========================================
+ Hits        48237    48361      +124
- Misses      78611    78658       +47
- Partials     4186     4198       +12

Flags with carried forward coverage won't be shown. ☔ View full report in Codecov by Sentry.
Thank you for working on this!
A few points/questions:
- This PR is only one part of the story, right? Something still needs to direct all the plugins to write different error messages.
- Can we maybe mark the new proto fields as still a WIP? If they don't work out for some reason, we can mark those slots as deprecated and at least no one else will try to use them in the meantime. Typically we try not to merge IDL changes until things are working end to end, especially for messages that live in the blob store forever, but I think it's low risk in this case.
- How will this pattern be extended into Agent plugins? The new option in the proto will need to be routed through the webapi plugin, right?
- What frontend UI changes are needed to make the end-to-end story work, if any?
Looks good from my end, but I'd like one of @eapolinario or @pingsutw to also take a look.
flyteplugins/go/tasks/pluginmachinery/ioutils/remote_file_output_reader.go
1/ This PR is self-complete in the sense that it switches the PyTorch plugin to use the earliest timestamp error file strategy. However, the plugin won't be writing multiple files yet, as the environment variables instructing it to do so are not set and the flytekit-side changes to populate multiple error files are not implemented. Since the error reading code for the earliest timestamp strategy is backward compatible, it works with a single error file as well. My plan, in terms of order, was: get this PR in, then start using the new error retriever for PyTorch.
2/ The IDL changes are low risk IMO. I don't mind merging once it works end to end, but I need 2 more PRs for that. How do we mark them as WIP?
3/ Re "How will this pattern be extended into Agent plugins?": I am not familiar with Agent plugins. Any pointers?

cc flyteorg/flytekit#2797, which is the next step in the process.
Thank you for your contribution! I have a few questions/suggestions below.
flyteplugins/go/tasks/pluginmachinery/ioutils/remote_file_output_reader.go (resolved review threads)
// Specifies how errors are aggregated
ErrorAggregationStrategy ErrorAggregationStrategy
Why is this a property of the plugin? Isn't the EarliestError Strategy already backward compatible?
I think if there is anything a plugin needs to optionally configure, it's the pattern of the error file names
It is backward compatible, yes. But the RFC called for error strategies, so different ones can be implemented for different plugins. Are you thinking of making it the default?
@fg91 What do you think about this?
It's not clear to me what those other implementations may look like... I usually like to wait for a second/third option before adding an enum/config... If we don't know what implementations we may want to add here, I would say yes let's make it the default and allow plugins to configure the file name pattern...
Let's break this into 2 revision requests:

- Do not have an error aggregation strategy enum and make the new implementation, which is backward compatible, the default: I have no objections to this, but I have the least amount of knowledge about the Flyte ecosystem here. The RFC called for error aggregation strategies, in anticipation of more plugins making use of this new customization option. Also, not changing the default and using the multi-error-file strategy only where it is needed (the PyTorch plugin for now) seems a bit safer than adopting it for all right away. I need more feedback here from the involved parties. Pinging @fg91 and @eapolinario as they participated in the RFC.
- Configuring file name patterns: What benefits do you see here? It will further complicate the flytekit-side error file creation. If there is a motivation, we can do it.
It's not clear to me what those other implementations may look like... I usually like to wait for a second/third option before adding an enum/config...

In the RFC (not accepted yet, so all of this is up for discussion), we propose that the currently existing logic with a single error.pb file is denoted a strategy as well, with enum index 0 (default behaviour). "Earliest" would then be index 1.
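For illustration, a minimal standalone sketch of that enum proposal (the type, constant names, and the plugin property below are assumptions based on this thread, not the actual Flyte IDL or Go definitions):

```go
package main

import "fmt"

// ErrorAggregationStrategy mirrors the proposed enum: 0 keeps the single
// error.pb behaviour, 1 selects the earliest-timestamp multi-error-file strategy.
type ErrorAggregationStrategy int

const (
	ErrorAggregationStrategyDefault  ErrorAggregationStrategy = iota // read the single error.pb
	ErrorAggregationStrategyEarliest                                 // pick the earliest error-<pod-name>.pb
)

// PluginProperties sketches the new plugin property shown in the diff hunk above.
type PluginProperties struct {
	// Specifies how errors are aggregated.
	ErrorAggregationStrategy ErrorAggregationStrategy
}

func main() {
	p := PluginProperties{ErrorAggregationStrategy: ErrorAggregationStrategyEarliest}
	fmt.Println(p.ErrorAggregationStrategy) // prints 1
}
```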
Why is this a property of the plugin? Isn't the EarliestError Strategy already backward compatible?
The RFC currently states this about backwards compatibility:
We propose that the new MultiErrorFileRemoteFileOutputReader falls back to reading the error.pb (the behaviour of the default RemoteFileOutputReader) if no error-<pod-name>.pb files are found, in order to solve the problem of backwards compatibility:

- If flytekit uses a version that supports multiple error files but the backend does not yet, pyflyte-execute will not upload multiple error files for distributed tasks since the FLYTE_INTERNAL_DIST_ERROR_STRATEGY environment variable will not be set.
- If flytekit uses an older version that does not support multiple error files while the backend does, a single error file will be uploaded despite FLYTE_INTERNAL_DIST_ERROR_STRATEGY being set. The output reader will, however, fall back to reading the single error.pb.
I think plugins need to configure in their properties whether they make use of the default single error.pb strategy (0 in the enum) or the "earliest of multiple" strategy (1 in the enum) despite this backwards compatibility, because most tasks will need the default strategy. Always first checking if multiple error files exist and only then falling back to the single one will make a lot of unnecessary requests to blob storage, since for certain plugins (in particular the normal Python task!) we already know that only a single file will be written.

I'd only use this fallback in plugins that use multiple error files, to account for version mismatches between flytekit and the backend.
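To make the fallback concrete, here is a minimal standalone sketch under assumed data shapes (this is not the real storage client or output reader API): prefer the per-pod error files and pick the earliest one, otherwise fall back to the single error.pb.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

type errorFile struct {
	path      string
	timestamp time.Time
}

// pickErrorFile returns the earliest-timestamped error-<pod-name>.pb if any
// exist; otherwise it falls back to the single error.pb path.
func pickErrorFile(multiErrorFiles []errorFile, singleErrorPath string) string {
	if len(multiErrorFiles) == 0 {
		// Older flytekit versions wrote only error.pb.
		return singleErrorPath
	}
	sort.Slice(multiErrorFiles, func(i, j int) bool {
		return multiErrorFiles[i].timestamp.Before(multiErrorFiles[j].timestamp)
	})
	return multiErrorFiles[0].path
}

func main() {
	files := []errorFile{
		{path: "errors/error-pod-1.pb", timestamp: time.Unix(200, 0)},
		{path: "errors/error-pod-0.pb", timestamp: time.Unix(100, 0)},
	}
	fmt.Println(pickErrorFile(files, "error.pb")) // errors/error-pod-0.pb
	fmt.Println(pickErrorFile(nil, "error.pb"))   // error.pb
}
```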
I think if there is anything a plugin needs to optionally configure, it's the pattern of the error file names
In an earlier version of the RFC we included configurable file name patterns, but for simplicity we decided to propose adding an /errors "directory" (being fully aware that buckets are not a filesystem) to the raw output prefix bucket URI. Any files under this errors prefix would be considered potential error files.
Please let me know if you have any questions or disagree about this @EngHabu. As said above, the RFC is still open and I'm happy to discuss and change it.
Always first checking if multiple error files exist and only then falling back to the single one will make a lot of unnecessary requests to blob storage, since for certain plugins (in particular the normal Python task!) we already know that only a single file will be written.
The overhead will be a single additional list call, which will return the single error.pb file.
Yes, a single additional call. But PyTorch (or other distributed) tasks are generally not the majority of tasks, and for a normal Python task this call is never required. I'd argue we should avoid it when we already know it's unnecessary.
I am in agreement.
if container.Env == nil {
	container.Env = make([]apiv1.EnvVar, 0, 2)
}
container.Env = append(container.Env, apiv1.EnvVar{
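The hunk above is truncated; as a hedged completion, a standalone helper might look like the sketch below. The environment variable name comes from the RFC discussion quoted later in this thread (FLYTE_INTERNAL_DIST_ERROR_STRATEGY); the value is an assumption, not necessarily what this PR sets.

```go
package example

import apiv1 "k8s.io/api/core/v1"

// injectDistErrorStrategy appends the env var that tells flytekit to write
// per-pod error files for distributed tasks.
func injectDistErrorStrategy(container *apiv1.Container) {
	if container.Env == nil {
		container.Env = make([]apiv1.EnvVar, 0, 2)
	}
	container.Env = append(container.Env, apiv1.EnvVar{
		Name:  "FLYTE_INTERNAL_DIST_ERROR_STRATEGY",
		Value: "earliest", // assumed encoding of the earliest-timestamp strategy
	})
}
```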
In a previous PR you added a pod name env var, see here.
I feel like only one of the two should be injected.
Will remove the pod one.
Let me do that in a different PR, as it requires more approvals for files involving Spark tests, etc.
Feel free to tag me as reviewer in the PR.
flyteplugins/go/tasks/plugins/k8s/kfoperators/pytorch/pytorch.go
"github.com/flyteorg/flyte/flytestdlib/storage" | ||
) | ||
|
||
type RemoteFileOutputReader struct { | ||
outPath io.OutputFilePaths | ||
type errorRetriever interface { |
RemoteFileOutputReader already implements the OutputReader interface, see here.
I would prefer if this could be refactored in a way that doesn't require introducing another interface for this.
IsError of OutputReader also seems to serve the same purpose as HasError, and ReadError of OutputReader the same purpose as GetError.
@fg91 RemoteFileOutputReader still only implements OutputReader and not the errorRetriever.
An output reader has 2 functionalities: output reading and error reading. I refactored RemoteFileOutputReader so that it has an error retriever member, which can be either singleFileErrorRetriever or earliestFileErrorRetriever. Both singleFileErrorRetriever and earliestFileErrorRetriever implement the errorRetriever interface. It is the standard strategy pattern.
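To illustrate what is meant by the strategy pattern here, a simplified standalone sketch is below (the signatures are reduced; the real retrievers deal with Flyte's ExecutionError type and blob storage rather than returning strings):

```go
package main

import (
	"context"
	"fmt"
)

// errorRetriever is the strategy interface for error reading.
type errorRetriever interface {
	GetError(ctx context.Context) (string, error)
}

// singleFileErrorRetriever reads the single error.pb (existing behaviour).
type singleFileErrorRetriever struct{}

func (singleFileErrorRetriever) GetError(ctx context.Context) (string, error) {
	return "contents of error.pb", nil
}

// earliestFileErrorRetriever picks the earliest-timestamped error-<pod-name>.pb.
type earliestFileErrorRetriever struct{}

func (earliestFileErrorRetriever) GetError(ctx context.Context) (string, error) {
	return "contents of the earliest error-<pod-name>.pb", nil
}

// RemoteFileOutputReader holds one strategy as a member; the output-reading
// side is elided, only the error-reading side is sketched.
type RemoteFileOutputReader struct {
	retriever errorRetriever
}

func main() {
	r := RemoteFileOutputReader{retriever: earliestFileErrorRetriever{}}
	msg, _ := r.retriever.GetError(context.Background())
	fmt.Println(msg)
}
```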
It looks like you are thinking of having a MultiFileRemoteFileOutputReader that extends from RemoteFileOutputReader. I don't quite like that (and also commented on the RFC about this some time back), as we would be creating a new output reader variety for each error reading strategy, instead of creating new error reader varieties only.
Here is what I can do to avoid defining a new interface: I'll refactor the OutputReader interface to inherit from an ErrorReader interface, so we won't introduce something new, and use the ErrorReader interface for the errorRetriever member (maybe call it errorReader).

Let me do this, and if you are still not comfortable with it, we can perhaps chat about it.
See 0a12e95
Yeah, agree with that. No to MultiFileRemoteFileOutputReader, I think; just the error side is enough.
Tracking issue
Part of RFC #5598
Why are the changes needed?
Deterministic error propagation, see: #5598
What changes were proposed in this pull request?
An error aggregation strategy was introduced for the remote file output reader.
This PR is self-complete in the sense that it switches the PyTorch plugin to use the earliest timestamp error file strategy. However, the plugin won't be writing multiple files yet, as the flytekit-side changes to populate multiple error files are not implemented. Since the error reading code for the earliest timestamp strategy is backward compatible, it works with a single error file as well.
My plan was the following in terms of order:
How was this patch tested?
Unit testing.