This repository has been archived by the owner on Oct 9, 2023. It is now read-only.
Move caching to node executor for fast cache hits #485
Merged
Changes from all 46 commits (all authored by hamersaw):
- 817ae4b fast cache working-ish
- 16d3fbc processing downstream immediately on cache hit
- 43f40e7 Merge branch 'master' into feature/fast-cache-move-to-node
- 08eda31 moved cache write to node executor
- ac773ae working cache and cache serialize
- c3464da starting to clean up
- 3f0b705 removed commented out code
- fcb9174 removed separate IsCacheable and IsCacheSerializable functions from C…
- 46b525e refactored reservation owner id to new function to remove duplication
- 82a76c5 added cache metrics to the node executor
- 1a33a4d cleaned up node cache.go
- 783858a more cleanup
- 43c6827 setting cache information in phase info so that it is available in ev…
- 6310902 minor refactoring and bug fixes
- 502d749 doing an outputs lookup on cache to ensure correctness during failures
- fe2d056 fix unit tests
- a3e1581 fixed lint issues
- 8a49895 moved catalog package to the node level
- 6471ce8 refactored task handler
- a0e4619 fixed catalog imports on unit testes
- 439af82 started cache unit tests
- 02e314c added CheckCatalogCache unit tests
- 4b8c241 unit tests for node cache file
- 757d3b3 added node executor cache unit tests
- 3dc1f14 fixed cache unit tets
- 4093692 fixed lint issues
- 270396a transitioning to 'Succeeded' immediately on cache hit
- aa67293 Merge branch 'master' into feature/fast-cache-move-to-node
- 0b65120 supporting cache overwrite
- cd72a0e fixed lint issues
- dcaefdc removed automatic downstream on cache hit
- ccd52fc Merge branch 'master' into feature/fast-cache-move-to-node
- b940d00 Merge branch 'master' into feature/fast-cache-move-to-node
- 5ea74d8 bumping boilerplate support tools to go 1.19 to fix generate
- cabfc58 fixed unit tests and linter
- 95bf8e1 removed unnecessary async catalog client from nodeExecutor
- 5d019bf general refactoring
- e0f6715 fastcache working with arraynode
- f3ca83c fixed unit tests - no longer checking for output existance on first e…
- 15c46af updating documentation TODOs
- fa721e6 Merge branch 'master' into feature/fast-cache-move-to-node
- 9f8b7d9 updated arraynode fastcache to correctly report cache hits
- 4347b6d remove print statement
- 9245f3e fixed cache serialize
- c672b07 fixed unit tests
- 5c7c3c7 Merge branch 'master' into feature/fast-cache-move-to-node
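This PR moves catalog cache lookup, reservation, and write operations from the task handler into the node executor, so a cache hit can transition a node straight to Succeeded without invoking the task plugin. Below is a minimal, self-contained sketch of that fast-cache flow; all types and names here (Catalog, mapCatalog, executeNode) are simplified stand-ins for illustration, not the real flytepropeller interfaces.

```go
// Minimal sketch of the fast-cache flow, with hypothetical simplified types.
package main

import (
	"context"
	"fmt"
)

type CacheStatus int

const (
	CacheMiss CacheStatus = iota
	CacheHit
)

// Catalog abstracts the Get/Put operations the node executor relies on.
type Catalog interface {
	Get(ctx context.Context, key string) (CacheStatus, error)
	Put(ctx context.Context, key, outputs string) error
}

// mapCatalog is a toy in-memory catalog keyed by task signature.
type mapCatalog map[string]string

func (m mapCatalog) Get(_ context.Context, key string) (CacheStatus, error) {
	if _, ok := m[key]; ok {
		return CacheHit, nil
	}
	return CacheMiss, nil
}

func (m mapCatalog) Put(_ context.Context, key, outputs string) error {
	m[key] = outputs
	return nil
}

// executeNode checks the cache before running the task; on a hit the node can
// transition straight to Succeeded without ever touching the task plugin.
func executeNode(ctx context.Context, c Catalog, key string, run func() string) error {
	cacheStatus, err := c.Get(ctx, key)
	if err != nil {
		return err
	}
	if cacheStatus == CacheHit {
		fmt.Println("cache hit: skipping execution")
		return nil
	}
	outputs := run()
	// a Put failure could be tolerated here, as the real WriteCatalogCache does
	return c.Put(ctx, key, outputs)
}

func main() {
	ctx := context.Background()
	catalog := mapCatalog{}
	run := func() string { return "results" }
	_ = executeNode(ctx, catalog, "task-v1", run) // cache miss: executes and caches
	_ = executeNode(ctx, catalog, "task-v1", run) // cache hit: fast path
}
```

On the first call the task runs and its outputs are cached; on the second, the lookup short-circuits execution entirely, which is the fast cache hit in the PR title.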
Of the files in this diff, the one captured below is a new 234-line file in package nodes, implementing the cache operations on the node executor:

```go
package nodes

import (
	"context"
	"strconv"
	"time"

	"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
	"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event"

	"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/catalog"
	"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/encoding"
	"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/ioutils"

	"github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
	"github.com/flyteorg/flytepropeller/pkg/controller/nodes/common"
	nodeserrors "github.com/flyteorg/flytepropeller/pkg/controller/nodes/errors"
	"github.com/flyteorg/flytepropeller/pkg/controller/nodes/handler"
	"github.com/flyteorg/flytepropeller/pkg/controller/nodes/interfaces"
	"github.com/flyteorg/flytepropeller/pkg/controller/nodes/task"

	"github.com/flyteorg/flytestdlib/logger"
	"github.com/flyteorg/flytestdlib/storage"

	"github.com/pkg/errors"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// computeCatalogReservationOwnerID constructs a unique identifier which includes the node's
// parent information, node ID, and retry attempt number. This is used to uniquely identify a
// task when using the cache reservation API to serialize cached executions.
func computeCatalogReservationOwnerID(nCtx interfaces.NodeExecutionContext) (string, error) {
	currentNodeUniqueID, err := common.GenerateUniqueID(nCtx.ExecutionContext().GetParentInfo(), nCtx.NodeID())
	if err != nil {
		return "", err
	}

	ownerID, err := encoding.FixedLengthUniqueIDForParts(task.IDMaxLength,
		[]string{nCtx.NodeExecutionMetadata().GetOwnerID().Name, currentNodeUniqueID, strconv.Itoa(int(nCtx.CurrentAttempt()))})
	if err != nil {
		return "", err
	}

	return ownerID, nil
}
```
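The owner ID folds the workflow owner, the node's unique ID, and the retry attempt number into a fixed-length string via encoding.FixedLengthUniqueIDForParts, so the ID stays within identifier length limits no matter how deeply nested the node is. The sketch below shows one plausible way such fixed-length hashing can work; it is an illustration, not the actual flyteplugins implementation.

```go
// Hypothetical illustration of fixed-length ID derivation. This is a sketch,
// not the real encoding.FixedLengthUniqueIDForParts.
package main

import (
	"crypto/sha256"
	"encoding/base32"
	"fmt"
	"strings"
)

// fixedLengthID hashes the joined parts and truncates the encoding to
// maxLength, so IDs stay bounded regardless of input size.
func fixedLengthID(maxLength int, parts []string) string {
	joined := strings.Join(parts, "-")
	if len(joined) <= maxLength {
		return joined
	}
	sum := sha256.Sum256([]byte(joined))
	enc := strings.ToLower(base32.StdEncoding.WithPadding(base32.NoPadding).EncodeToString(sum[:]))
	return enc[:maxLength]
}

func main() {
	// Each retry attempt yields a distinct owner ID, so a retried task
	// competes for the reservation as a new owner.
	fmt.Println(fixedLengthID(20, []string{"my-workflow-owner", "n0", "0"}))
	fmt.Println(fixedLengthID(20, []string{"my-workflow-owner", "n0", "1"}))
}
```

Because the attempt number is part of the input, each retry competes for the reservation as a distinct owner. The file continues below.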
```go
// updatePhaseCacheInfo adds the cache and catalog reservation metadata to the PhaseInfo. This
// ensures this information is reported in events and available within FlyteAdmin.
func updatePhaseCacheInfo(phaseInfo handler.PhaseInfo, cacheStatus *catalog.Status, reservationStatus *core.CatalogReservation_Status) handler.PhaseInfo {
	if cacheStatus == nil && reservationStatus == nil {
		return phaseInfo
	}

	info := phaseInfo.GetInfo()
	if info == nil {
		info = &handler.ExecutionInfo{}
	}

	if info.TaskNodeInfo == nil {
		info.TaskNodeInfo = &handler.TaskNodeInfo{}
	}

	if info.TaskNodeInfo.TaskNodeMetadata == nil {
		info.TaskNodeInfo.TaskNodeMetadata = &event.TaskNodeMetadata{}
	}

	if cacheStatus != nil {
		info.TaskNodeInfo.TaskNodeMetadata.CacheStatus = cacheStatus.GetCacheStatus()
		info.TaskNodeInfo.TaskNodeMetadata.CatalogKey = cacheStatus.GetMetadata()
	}

	if reservationStatus != nil {
		info.TaskNodeInfo.TaskNodeMetadata.ReservationStatus = *reservationStatus
	}

	return phaseInfo.WithInfo(info)
}
```
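updatePhaseCacheInfo walks a chain of optional structs, allocating each missing level before writing the leaf fields, which spares callers from pre-populating the whole PhaseInfo tree. A standalone sketch of the same nil-guard pattern, using hypothetical two-level types rather than the flytepropeller ones:

```go
// Sketch of the nil-guard initialization pattern with hypothetical types.
package main

import "fmt"

type Metadata struct{ CacheStatus string }
type Info struct{ Metadata *Metadata }

// withCacheStatus fills in each missing level of the chain before writing
// the leaf field, mirroring updatePhaseCacheInfo above.
func withCacheStatus(info *Info, status string) *Info {
	if info == nil {
		info = &Info{}
	}
	if info.Metadata == nil {
		info.Metadata = &Metadata{}
	}
	info.Metadata.CacheStatus = status
	return info
}

func main() {
	fmt.Println(withCacheStatus(nil, "CACHE_HIT").Metadata.CacheStatus) // CACHE_HIT
}
```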
```go
// CheckCatalogCache uses the handler and contexts to check if cached outputs for the current
// node exist. If they exist, this function also copies the outputs to this node.
func (n *nodeExecutor) CheckCatalogCache(ctx context.Context, nCtx interfaces.NodeExecutionContext, cacheHandler interfaces.CacheableNodeHandler) (catalog.Entry, error) {
	catalogKey, err := cacheHandler.GetCatalogKey(ctx, nCtx)
	if err != nil {
		return catalog.Entry{}, errors.Wrapf(err, "failed to initialize the catalogKey")
	}

	entry, err := n.catalog.Get(ctx, catalogKey)
	if err != nil {
		causeErr := errors.Cause(err)
		if taskStatus, ok := status.FromError(causeErr); ok && taskStatus.Code() == codes.NotFound {
			n.metrics.catalogMissCount.Inc(ctx)
			logger.Infof(ctx, "Catalog CacheMiss: Artifact not found in Catalog. Executing Task.")
			return catalog.NewCatalogEntry(nil, catalog.NewStatus(core.CatalogCacheStatus_CACHE_MISS, nil)), nil
		}

		n.metrics.catalogGetFailureCount.Inc(ctx)
		logger.Errorf(ctx, "Catalog Failure: memoization check failed. err: %v", err.Error())
		return catalog.Entry{}, errors.Wrapf(err, "Failed to check Catalog for previous results")
	}

	if entry.GetStatus().GetCacheStatus() != core.CatalogCacheStatus_CACHE_HIT {
		logger.Errorf(ctx, "No CacheHIT and no Error received. Illegal state, Cache State: %s", entry.GetStatus().GetCacheStatus().String())
		// TODO should this be an error?
		return entry, nil
	}

	logger.Infof(ctx, "Catalog CacheHit: for task [%s/%s/%s/%s]", catalogKey.Identifier.Project,
		catalogKey.Identifier.Domain, catalogKey.Identifier.Name, catalogKey.Identifier.Version)
	n.metrics.catalogHitCount.Inc(ctx)

	iface := catalogKey.TypedInterface
	if iface.Outputs != nil && len(iface.Outputs.Variables) > 0 {
		// copy cached outputs to node outputs
		o, ee, err := entry.GetOutputs().Read(ctx)
		if err != nil {
			logger.Errorf(ctx, "failed to read from catalog, err: %s", err.Error())
			return catalog.Entry{}, err
		} else if ee != nil {
			logger.Errorf(ctx, "got execution error from catalog output reader? This should not happen, err: %s", ee.String())
			return catalog.Entry{}, nodeserrors.Errorf(nodeserrors.IllegalStateError, nCtx.NodeID(), "execution error from a cache output, bad state: %s", ee.String())
		}

		outputFile := v1alpha1.GetOutputsFile(nCtx.NodeStatus().GetOutputDir())
		if err := nCtx.DataStore().WriteProtobuf(ctx, outputFile, storage.Options{}, o); err != nil {
			logger.Errorf(ctx, "failed to write cached value to datastore, err: %s", err.Error())
			return catalog.Entry{}, err
		}
	}

	return entry, nil
}
```
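The miss-versus-failure branch in CheckCatalogCache hinges on unwrapping the error with errors.Cause and then inspecting the gRPC status code: NotFound means a clean cache miss, anything else is a real catalog failure. A small runnable sketch of that classification, using the same pkg/errors and grpc status/codes packages:

```go
// Standalone sketch of the NotFound detection used above.
package main

import (
	"fmt"

	"github.com/pkg/errors"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// classify unwraps the cause and checks the gRPC code to tell a cache miss
// apart from a genuine catalog failure.
func classify(err error) string {
	if s, ok := status.FromError(errors.Cause(err)); ok && s.Code() == codes.NotFound {
		return "cache miss"
	}
	return "failure"
}

func main() {
	notFound := errors.Wrap(status.Error(codes.NotFound, "artifact not found"), "catalog get")
	fmt.Println(classify(notFound))                       // cache miss
	fmt.Println(classify(errors.New("connection reset"))) // failure
}
```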
```go
// GetOrExtendCatalogReservation attempts to acquire an artifact reservation if the task is
// cacheable and cache serializable. If the reservation already exists for this owner, the
// reservation is extended.
func (n *nodeExecutor) GetOrExtendCatalogReservation(ctx context.Context, nCtx interfaces.NodeExecutionContext,
	cacheHandler interfaces.CacheableNodeHandler, heartbeatInterval time.Duration) (catalog.ReservationEntry, error) {

	catalogKey, err := cacheHandler.GetCatalogKey(ctx, nCtx)
	if err != nil {
		return catalog.NewReservationEntryStatus(core.CatalogReservation_RESERVATION_DISABLED),
			errors.Wrapf(err, "failed to initialize the catalogKey")
	}

	ownerID, err := computeCatalogReservationOwnerID(nCtx)
	if err != nil {
		return catalog.NewReservationEntryStatus(core.CatalogReservation_RESERVATION_DISABLED),
			errors.Wrapf(err, "failed to initialize the cache reservation ownerID")
	}

	reservation, err := n.catalog.GetOrExtendReservation(ctx, catalogKey, ownerID, heartbeatInterval)
	if err != nil {
		n.metrics.reservationGetFailureCount.Inc(ctx)
		logger.Errorf(ctx, "Catalog Failure: reservation get or extend failed. err: %v", err.Error())
		return catalog.NewReservationEntryStatus(core.CatalogReservation_RESERVATION_FAILURE), err
	}

	var status core.CatalogReservation_Status
	if reservation.OwnerId == ownerID {
		status = core.CatalogReservation_RESERVATION_ACQUIRED
	} else {
		status = core.CatalogReservation_RESERVATION_EXISTS
	}

	n.metrics.reservationGetSuccessCount.Inc(ctx)
	return catalog.NewReservationEntry(reservation.ExpiresAt.AsTime(),
		reservation.HeartbeatInterval.AsDuration(), reservation.OwnerId, status), nil
}
```
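The acquired-versus-exists decision simply compares the returned reservation's OwnerId against our computed owner ID. The sketch below is a hypothetical in-memory version of the get-or-extend semantics this relies on; the real reservation store lives behind the catalog service, not in this file.

```go
// Hypothetical in-memory reservation store illustrating acquire-or-extend:
// the first owner to ask gets the reservation, others see the current owner
// until it expires. Not the real datacatalog implementation.
package main

import (
	"fmt"
	"time"
)

type reservation struct {
	ownerID   string
	expiresAt time.Time
}

type reservations map[string]*reservation

// getOrExtend grants the reservation if it is free or expired, extends it if
// the caller already owns it, and otherwise returns the existing owner.
func (r reservations) getOrExtend(key, ownerID string, heartbeat time.Duration) string {
	now := time.Now()
	cur, ok := r[key]
	if !ok || cur.expiresAt.Before(now) || cur.ownerID == ownerID {
		r[key] = &reservation{ownerID: ownerID, expiresAt: now.Add(heartbeat)}
		return ownerID
	}
	return cur.ownerID
}

func main() {
	r := reservations{}
	fmt.Println(r.getOrExtend("task-v1", "node-a-attempt-0", time.Minute)) // acquired
	fmt.Println(r.getOrExtend("task-v1", "node-b-attempt-0", time.Minute)) // exists: node-a still owns it
	fmt.Println(r.getOrExtend("task-v1", "node-a-attempt-0", time.Minute)) // extended
}
```

The file continues with ReleaseCatalogReservation and WriteCatalogCache below.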
```go
// ReleaseCatalogReservation attempts to release an artifact reservation if the task is
// cacheable and cache serializable. If the reservation does not exist for this owner (e.g. it
// never existed or has been acquired by another owner) this call is still successful.
func (n *nodeExecutor) ReleaseCatalogReservation(ctx context.Context, nCtx interfaces.NodeExecutionContext,
	cacheHandler interfaces.CacheableNodeHandler) (catalog.ReservationEntry, error) {

	catalogKey, err := cacheHandler.GetCatalogKey(ctx, nCtx)
	if err != nil {
		return catalog.NewReservationEntryStatus(core.CatalogReservation_RESERVATION_DISABLED),
			errors.Wrapf(err, "failed to initialize the catalogKey")
	}

	ownerID, err := computeCatalogReservationOwnerID(nCtx)
	if err != nil {
		return catalog.NewReservationEntryStatus(core.CatalogReservation_RESERVATION_DISABLED),
			errors.Wrapf(err, "failed to initialize the cache reservation ownerID")
	}

	err = n.catalog.ReleaseReservation(ctx, catalogKey, ownerID)
	if err != nil {
		n.metrics.reservationReleaseFailureCount.Inc(ctx)
		logger.Errorf(ctx, "Catalog Failure: release reservation failed. err: %v", err.Error())
		return catalog.NewReservationEntryStatus(core.CatalogReservation_RESERVATION_FAILURE), err
	}

	n.metrics.reservationReleaseSuccessCount.Inc(ctx)
	return catalog.NewReservationEntryStatus(core.CatalogReservation_RESERVATION_RELEASED), nil
}

// WriteCatalogCache relays the outputs of this node to the cache. This allows future executions
// to reuse this data to avoid recomputation.
func (n *nodeExecutor) WriteCatalogCache(ctx context.Context, nCtx interfaces.NodeExecutionContext, cacheHandler interfaces.CacheableNodeHandler) (catalog.Status, error) {
	catalogKey, err := cacheHandler.GetCatalogKey(ctx, nCtx)
	if err != nil {
		return catalog.NewStatus(core.CatalogCacheStatus_CACHE_DISABLED, nil), errors.Wrapf(err, "failed to initialize the catalogKey")
	}

	iface := catalogKey.TypedInterface
	if iface.Outputs != nil && len(iface.Outputs.Variables) == 0 {
		return catalog.NewStatus(core.CatalogCacheStatus_CACHE_DISABLED, nil), nil
	}

	logger.Infof(ctx, "Catalog CacheEnabled. recording execution [%s/%s/%s/%s]", catalogKey.Identifier.Project,
		catalogKey.Identifier.Domain, catalogKey.Identifier.Name, catalogKey.Identifier.Version)

	outputPaths := ioutils.NewReadOnlyOutputFilePaths(ctx, nCtx.DataStore(), nCtx.NodeStatus().GetOutputDir())
	outputReader := ioutils.NewRemoteFileOutputReader(ctx, nCtx.DataStore(), outputPaths, nCtx.MaxDatasetSizeBytes())
	metadata := catalog.Metadata{
		TaskExecutionIdentifier: task.GetTaskExecutionIdentifier(nCtx),
	}

	// ignores discovery write failures
	status, err := n.catalog.Put(ctx, catalogKey, outputReader, metadata)
	if err != nil {
		n.metrics.catalogPutFailureCount.Inc(ctx)
		logger.Errorf(ctx, "Failed to write results to catalog for Task [%v]. Error: %v", catalogKey.Identifier, err)
		return catalog.NewStatus(core.CatalogCacheStatus_CACHE_PUT_FAILURE, status.GetMetadata()), nil
	}

	n.metrics.catalogPutSuccessCount.Inc(ctx)
	logger.Infof(ctx, "Successfully cached results to catalog - Task [%v]", catalogKey.Identifier)
	return status, nil
}
```
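Note the deliberate design choice in WriteCatalogCache: a failed Put increments a failure metric and is reported as CACHE_PUT_FAILURE in the returned status, but the error itself is swallowed, since the task already produced valid outputs and should not fail because the cache write did. A tiny sketch of that behavior, with hypothetical simplified types:

```go
// Sketch of the "ignore discovery write failures" behavior: a failed cache
// write is downgraded to a status rather than failing the node. Hypothetical
// simplified types, not the real catalog API.
package main

import (
	"errors"
	"fmt"
)

type cacheStatus string

const (
	cachePopulated  cacheStatus = "CACHE_POPULATED"
	cachePutFailure cacheStatus = "CACHE_PUT_FAILURE"
)

func writeCache(put func() error) (cacheStatus, error) {
	if err := put(); err != nil {
		// the error is logged and swallowed; the node still succeeds
		return cachePutFailure, nil
	}
	return cachePopulated, nil
}

func main() {
	s, err := writeCache(func() error { return errors.New("datacatalog unavailable") })
	fmt.Println(s, err) // CACHE_PUT_FAILURE <nil>
}
```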
Review discussion:
Is there one potential problem here? Consider this case:

Round 1:
- node 1: cache hit
- node 2: cache hit
- node 3: system error

Round 2:
- node 1: cache hit
- node 2: cache miss -> event write failure?
Or maybe this will not happen, as when we actually get a system error I think we update the state? Is it possible to simulate this and test it? One other option is to allow admin to move the state from success to running!
For the system error on node 3, is this a Flyte retryable error where we attempt the task again, or a propeller error? In the former case, correct: we persist state and handle this. In the latter case, we may get failures in updating and ultimately abort the workflow. I think this is how it would work with the current TaskHandler caching too, right?