Move caching to node executor for fast cache hits #485
Conversation
```
@@ -295,6 +295,18 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu

	retryAttempt := subNodeStatus.GetAttempts()

	// fastcache will not emit task events for cache hits. we need to manually detect a
	// transition to `SUCCEEDED` and add an `ExternalResourceInfo` for it.
	if cacheStatus == idlcore.CatalogCacheStatus_CACHE_HIT && len(arrayEventRecorder.TaskEvents()) == 0 {
```
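For context, here is a rough sketch of what the body of that branch might append. The helper name and its parameters are illustrative assumptions; the field names follow flyteidl's `event.ExternalResourceInfo` message, but this is not the PR's actual code.

```go
// Illustrative sketch only: builds the ExternalResourceInfo that stands in for
// the task event fastcache never emitted for a cache hit.
package sketch

import (
	idlcore "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
	"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event"
)

// externalResourceForCacheHit is a hypothetical helper; index and retryAttempt
// would come from the subnode status in the surrounding handler.
func externalResourceForCacheHit(index uint32, retryAttempt uint32) *event.ExternalResourceInfo {
	return &event.ExternalResourceInfo{
		Index:        index,
		RetryAttempt: retryAttempt,
		Phase:        idlcore.TaskExecution_SUCCEEDED,
		CacheStatus:  idlcore.CatalogCacheStatus_CACHE_HIT,
	}
}
```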
Is there a potential problem here? Consider this case:
Round 1:
- node 1 - cache hit
- node 2 - cache hit
- node 3 - system error
Round 2:
- node 1 - cache hit
- node 2 - cache miss
- event write failure?
Or maybe this will not happen, since when we actually get a system error I think we update the state?
Is it possible to simulate this and test it?
One other option is to allow admin to move the state from success back to running!
For the system error on node 3, is this a Flyte retryable error where we attempt the task again, or a propeller error? In the former case, correct - we persist state and handle this. In the latter case, we may get failures when updating and ultimately abort the workflow. I think this is how it would work with the current TaskHandler caching too, right?
I have one major comment; otherwise the code looks so much better.
This actually makes it way more readable. Not sure why the caching stuff was at the task level at all 🗡️
Thank you for the explanation.
Squashed commits:
* fast cache working-ish
* processing downstream immediately on cache hit
* moved cache write to node executor
* working cache and cache serialize
* starting to clean up
* removed commented out code
* removed separate IsCacheable and IsCacheSerializable functions from CacheableNode interface
* refactored reservation owner id to new function to remove duplication
* added cache metrics to the node executor
* cleaned up node cache.go
* more cleanup
* setting cache information in phase info so that it is available in events
* minor refactoring and bug fixes
* doing an outputs lookup on cache to ensure correctness during failures
* fix unit tests
* fixed lint issues
* moved catalog package to the node level
* refactored task handler
* fixed catalog imports on unit tests
* started cache unit tests
* added CheckCatalogCache unit tests
* unit tests for node cache file
* added node executor cache unit tests
* fixed cache unit tests
* fixed lint issues
* transitioning to 'Succeeded' immediately on cache hit
* supporting cache overwrite
* fixed lint issues
* removed automatic downstream on cache hit
* bumping boilerplate support tools to go 1.19 to fix generate
* fixed unit tests and linter
* removed unnecessary async catalog client from nodeExecutor
* general refactoring
* fastcache working with arraynode
* fixed unit tests - no longer checking for output existence on first execution of cached
* updating documentation TODOs
* updated arraynode fastcache to correctly report cache hits
* remove print statement
* fixed cache serialize
* fixed unit tests

Signed-off-by: Daniel Rammer <[email protected]>
TL;DR
Currently, cache lookups require three node-level phase changes, which equates to three streak rounds of FlytePropeller execution. The overhead of updating the k8s FlyteWorkflow CRD after each of these rounds can make cache lookups appear slow. This PR moves cache lookups into the node executor, which allows us to transition nodes directly from NotYetStarted to Success and immediately process downstream nodes in the same FlytePropeller round, making cache lookup overhead minimal.
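As a rough illustration of that fast path (a minimal sketch with made-up phase constants and function parameters, not the node executor's actual API):

```go
package main

import (
	"context"
	"fmt"
)

// Stand-in phase constants; the real node executor defines its own.
type NodePhase int

const (
	NodePhaseNotYetStarted NodePhase = iota
	NodePhaseQueued
	NodePhaseSucceeded
	NodePhaseFailing
)

// cacheEntry is an illustrative stand-in for a catalog lookup result.
type cacheEntry struct {
	Hit     bool
	Outputs map[string]string
}

// handleNotYetStarted sketches the idea: one catalog lookup when the node starts,
// and on a hit the node transitions directly to Succeeded so downstream nodes can
// be processed in the same evaluation round.
func handleNotYetStarted(
	ctx context.Context,
	lookup func(context.Context) (cacheEntry, error),
	writeOutputs func(map[string]string) error,
) (NodePhase, error) {
	entry, err := lookup(ctx)
	if err != nil {
		return NodePhaseFailing, err
	}
	if entry.Hit {
		// Persist the cached outputs, then skip Queued/Running entirely.
		if err := writeOutputs(entry.Outputs); err != nil {
			return NodePhaseFailing, err
		}
		return NodePhaseSucceeded, nil
	}
	// Cache miss: fall through to the normal queue/execute path.
	return NodePhaseQueued, nil
}

func main() {
	phase, _ := handleNotYetStarted(context.Background(),
		func(context.Context) (cacheEntry, error) { return cacheEntry{Hit: true}, nil },
		func(map[string]string) error { return nil },
	)
	fmt.Println(phase == NodePhaseSucceeded) // true: a cache hit needs no extra rounds
}
```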
Type
Are all requirements met?
Complete description
In addition to the performance improvements, this PR refactors the code to support caching of other Flyte node types in the future (i.e. subworkflow / launch plan and array nodes once they are implemented).
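As a hypothetical illustration of that direction, the commit list above mentions a CacheableNode interface; the shape below is only a guess at what such an interface could look like, with assumed method names and types rather than the PR's exact signatures.

```go
// Hypothetical only: based on the commit "removed separate IsCacheable and
// IsCacheSerializable functions from CacheableNode interface", not real code.
package sketch

import "context"

// CatalogKey is an illustrative stand-in for whatever key the catalog client uses.
type CatalogKey struct {
	DatasetID    string
	CacheVersion string
	InputHash    string
}

// CacheableNode would let the node executor ask any node type (task, subworkflow,
// launch plan, array node) how to cache it, instead of baking caching into the
// task handler.
type CacheableNode interface {
	// GetCatalogKey returns the key used for catalog lookups and writes.
	GetCatalogKey(ctx context.Context) (CatalogKey, error)
	// IsCacheable reports whether results may be read from / written to the catalog,
	// and whether concurrent executions should serialize on a reservation.
	IsCacheable(ctx context.Context) (cacheable bool, serializable bool, err error)
}
```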
Tracking Issue
fixes flyteorg/flyte#2886
Follow-up issue
NA