
Move caching to node executor for fast cache hits #485

Merged: 46 commits from feature/fast-cache-move-to-node into master on Aug 14, 2023
Commits (46 total; changes shown from 42 commits)
817ae4b
fast cache working-ish
hamersaw Sep 23, 2022
16d3fbc
processing downstream immediately on cache hit
hamersaw Sep 26, 2022
43f40e7
Merge branch 'master' into feature/fast-cache-move-to-node
hamersaw Oct 6, 2022
08eda31
moved cache write to node executor
hamersaw Oct 6, 2022
ac773ae
working cache and cache serialize
hamersaw Oct 8, 2022
c3464da
starting to clean up
hamersaw Oct 8, 2022
3f0b705
removed commented out code
hamersaw Oct 9, 2022
fcb9174
removed separate IsCacheable and IsCacheSerializable functions from C…
hamersaw Oct 9, 2022
46b525e
refactored reservation owner id to new function to remove duplication
hamersaw Oct 9, 2022
82a76c5
added cache metrics to the node executor
hamersaw Oct 9, 2022
1a33a4d
cleaned up node cache.go
hamersaw Oct 9, 2022
783858a
more cleanup
hamersaw Oct 9, 2022
43c6827
setting cache information in phase info so that it is available in ev…
hamersaw Oct 10, 2022
6310902
minor refactoring and bug fixes
hamersaw Oct 10, 2022
502d749
doing an outputs lookup on cache to ensure correctness during failures
hamersaw Oct 10, 2022
fe2d056
fix unit tests
hamersaw Oct 11, 2022
a3e1581
fixed lint issues
hamersaw Oct 11, 2022
8a49895
moved catalog package to the node level
hamersaw Oct 11, 2022
6471ce8
refactored task handler
hamersaw Oct 11, 2022
a0e4619
fixed catalog imports on unit tests
hamersaw Oct 11, 2022
439af82
started cache unit tests
hamersaw Oct 11, 2022
02e314c
added CheckCatalogCache unit tests
hamersaw Oct 11, 2022
4b8c241
unit tests for node cache file
hamersaw Oct 11, 2022
757d3b3
added node executor cache unit tests
hamersaw Oct 12, 2022
3dc1f14
fixed cache unit tests
hamersaw Oct 12, 2022
4093692
fixed lint issues
hamersaw Oct 12, 2022
270396a
transitioning to 'Succeeded' immediately on cache hit
hamersaw Oct 19, 2022
aa67293
Merge branch 'master' into feature/fast-cache-move-to-node
hamersaw Mar 29, 2023
0b65120
supporting cache overwrite
hamersaw Mar 30, 2023
cd72a0e
fixed lint issues
hamersaw Mar 30, 2023
dcaefdc
removed automatic downstream on cache hit
hamersaw Apr 3, 2023
ccd52fc
Merge branch 'master' into feature/fast-cache-move-to-node
hamersaw Apr 3, 2023
b940d00
Merge branch 'master' into feature/fast-cache-move-to-node
hamersaw Aug 1, 2023
5ea74d8
bumping boilerplate support tools to go 1.19 to fix generate
hamersaw Aug 3, 2023
cabfc58
fixed unit tests and linter
hamersaw Aug 3, 2023
95bf8e1
removed unnecessary async catalog client from nodeExecutor
hamersaw Aug 3, 2023
5d019bf
general refactoring
hamersaw Aug 4, 2023
e0f6715
fastcache working with arraynode
hamersaw Aug 7, 2023
f3ca83c
fixed unit tests - no longer checking for output existence on first e…
hamersaw Aug 7, 2023
15c46af
updating documentation TODOs
hamersaw Aug 7, 2023
fa721e6
Merge branch 'master' into feature/fast-cache-move-to-node
hamersaw Aug 7, 2023
9f8b7d9
updated arraynode fastcache to correctly report cache hits
hamersaw Aug 8, 2023
4347b6d
remove print statement
hamersaw Aug 9, 2023
9245f3e
fixed cache serialize
hamersaw Aug 14, 2023
c672b07
fixed unit tests
hamersaw Aug 14, 2023
5c7c3c7
Merge branch 'master' into feature/fast-cache-move-to-node
hamersaw Aug 14, 2023
15 changes: 7 additions & 8 deletions boilerplate/flyte/golang_support_tools/go.mod
@@ -1,6 +1,6 @@
module github.com/flyteorg/boilerplate

go 1.17
go 1.19

require (
github.com/EngHabu/mockery v0.0.0-20220405200825-3f76291311cf
@@ -163,16 +163,15 @@ require (
github.com/ultraware/whitespace v0.0.4 // indirect
github.com/uudashr/gocognit v1.0.1 // indirect
go.opencensus.io v0.22.6 // indirect
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 // indirect
golang.org/x/crypto v0.11.0 // indirect
golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5 // indirect
golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect
golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f // indirect
golang.org/x/mod v0.12.0 // indirect
golang.org/x/net v0.12.0 // indirect
golang.org/x/oauth2 v0.0.0-20210126194326-f9ce19ea3013 // indirect
golang.org/x/sys v0.0.0-20211019181941-9d821ace8654 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/sys v0.10.0 // indirect
golang.org/x/text v0.11.0 // indirect
golang.org/x/time v0.0.0-20201208040808-7e3f01d25324 // indirect
golang.org/x/tools v0.1.10 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
golang.org/x/tools v0.11.1 // indirect
google.golang.org/api v0.38.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20210126160654-44e461bb6506 // indirect
52 changes: 13 additions & 39 deletions boilerplate/flyte/golang_support_tools/go.sum

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/controller/controller.go
@@ -25,11 +25,11 @@ import (
"github.com/flyteorg/flytepropeller/pkg/controller/config"
"github.com/flyteorg/flytepropeller/pkg/controller/executors"
"github.com/flyteorg/flytepropeller/pkg/controller/nodes"
"github.com/flyteorg/flytepropeller/pkg/controller/nodes/catalog"
errors3 "github.com/flyteorg/flytepropeller/pkg/controller/nodes/errors"
"github.com/flyteorg/flytepropeller/pkg/controller/nodes/factory"
"github.com/flyteorg/flytepropeller/pkg/controller/nodes/recovery"
"github.com/flyteorg/flytepropeller/pkg/controller/nodes/subworkflow/launchplan"
"github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/catalog"
"github.com/flyteorg/flytepropeller/pkg/controller/workflow"
"github.com/flyteorg/flytepropeller/pkg/controller/workflowstore"
leader "github.com/flyteorg/flytepropeller/pkg/leaderelection"
25 changes: 18 additions & 7 deletions pkg/controller/nodes/array/handler.go
@@ -295,6 +295,18 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu

retryAttempt := subNodeStatus.GetAttempts()

// fastcache will not emit task events for cache hits. we need to manually detect a
// transition to `SUCCEEDED` and add an `ExternalResourceInfo` for it.
if cacheStatus == idlcore.CatalogCacheStatus_CACHE_HIT && len(arrayEventRecorder.TaskEvents()) == 0 {
Contributor:

Is there one potential problem here? Consider this case:

Round 1:
node 1 - cache hit
node 2 - cache hit
node 3 - system error
Round 2:
node 1 - cache hit
node 2 - cache miss
- event write failure?
Contributor:

Or maybe this will not happen, as when we actually get a system error I think we update the state? Is it possible to simulate this and test it?

One other option is to allow admin to move the state from success to running.

Contributor (Author):

For the system error on node 3, is this a Flyte retryable error where we attempt the task again, or a propeller error? In the former case, correct - we persist state and handle this. In the latter case, we may get failures in updating and ultimately abort the workflow. I think this is how it would work with the current TaskHandler caching too, right?

externalResources = append(externalResources, &event.ExternalResourceInfo{
ExternalId: buildSubNodeID(nCtx, i, retryAttempt),
Index: uint32(i),
RetryAttempt: retryAttempt,
Phase: idlcore.TaskExecution_SUCCEEDED,
CacheStatus: cacheStatus,
})
}

for _, taskExecutionEvent := range arrayEventRecorder.TaskEvents() {
for _, log := range taskExecutionEvent.Logs {
log.Name = fmt.Sprintf("%s-%d", log.Name, i)
@@ -476,6 +488,7 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu
arrayNodeState.TaskPhaseVersion = taskPhaseVersion + 1
}

fmt.Printf("HAMERSAW - sending event with taskPhase %+v and version %d\n", taskPhase, taskPhaseVersion)
hamersaw marked this conversation as resolved.
taskExecutionEvent, err := buildTaskExecutionEvent(ctx, nCtx, taskPhase, taskPhaseVersion, externalResources)
if err != nil {
return handler.UnknownTransition, err
@@ -543,19 +556,17 @@ func (a *arrayNodeHandler) buildArrayNodeContext(ctx context.Context, nCtx inter

inputReader := newStaticInputReader(nCtx.InputReader(), inputLiteralMap)

// if node has not yet started we automatically set to NodePhaseQueued to skip input resolution
if nodePhase == v1alpha1.NodePhaseNotYetStarted {
// TODO - to supprt fastcache we'll need to override the bindings to BindingScalars for the input resolution on the nCtx
// that way resolution is just reading a literal ... but does this still write a file then?!?
nodePhase = v1alpha1.NodePhaseQueued
}

// wrap node lookup
subNodeSpec := *arrayNode.GetSubNodeSpec()

subNodeID := fmt.Sprintf("%s-n%d", nCtx.NodeID(), subNodeIndex)
subNodeSpec.ID = subNodeID
subNodeSpec.Name = subNodeID
// mock the input bindings for the subNode to nil to bypass input resolution in the
// `nodeExecutor.preExecute` function. this is required because this function is the entrypoint
// for initial cache lookups. an alternative solution would be to mock the datastore to bypass
// writing the inputFile.
subNodeSpec.InputBindings = nil

// TODO - if we want to support more plugin types we need to figure out the best way to store plugin state
// currently just mocking based on node phase -> which works for all k8s plugins
2 changes: 1 addition & 1 deletion pkg/controller/nodes/array/handler_test.go
@@ -13,13 +13,13 @@ import (
"github.com/flyteorg/flytepropeller/pkg/controller/config"
execmocks "github.com/flyteorg/flytepropeller/pkg/controller/executors/mocks"
"github.com/flyteorg/flytepropeller/pkg/controller/nodes"
"github.com/flyteorg/flytepropeller/pkg/controller/nodes/catalog"
gatemocks "github.com/flyteorg/flytepropeller/pkg/controller/nodes/gate/mocks"
"github.com/flyteorg/flytepropeller/pkg/controller/nodes/handler"
"github.com/flyteorg/flytepropeller/pkg/controller/nodes/interfaces"
"github.com/flyteorg/flytepropeller/pkg/controller/nodes/interfaces/mocks"
recoverymocks "github.com/flyteorg/flytepropeller/pkg/controller/nodes/recovery/mocks"
"github.com/flyteorg/flytepropeller/pkg/controller/nodes/subworkflow/launchplan"
"github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/catalog"

"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core"
pluginmocks "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io/mocks"
8 changes: 8 additions & 0 deletions pkg/controller/nodes/array/node_lookup.go
@@ -14,6 +14,14 @@ type arrayNodeLookup struct {
subNodeStatus *v1alpha1.NodeStatus
}

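// ToNode short-circuits lookups for the wrapped subNode, which is synthesized by the array node
// and is not part of the underlying workflow graph; all other IDs fall through to the wrapped
// NodeLookup.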
func (a *arrayNodeLookup) ToNode(id v1alpha1.NodeID) ([]v1alpha1.NodeID, error) {
if id == a.subNodeID {
return nil, nil
}

return a.NodeLookup.ToNode(id)
}

func (a *arrayNodeLookup) GetNode(nodeID v1alpha1.NodeID) (v1alpha1.ExecutableNode, bool) {
if nodeID == a.subNodeID {
return a.subNodeSpec, true
234 changes: 234 additions & 0 deletions pkg/controller/nodes/cache.go
@@ -0,0 +1,234 @@
package nodes

import (
"context"
"strconv"
"time"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event"

"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/catalog"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/encoding"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/ioutils"

"github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/flyteorg/flytepropeller/pkg/controller/nodes/common"
nodeserrors "github.com/flyteorg/flytepropeller/pkg/controller/nodes/errors"
"github.com/flyteorg/flytepropeller/pkg/controller/nodes/handler"
"github.com/flyteorg/flytepropeller/pkg/controller/nodes/interfaces"
"github.com/flyteorg/flytepropeller/pkg/controller/nodes/task"

"github.com/flyteorg/flytestdlib/logger"
"github.com/flyteorg/flytestdlib/storage"

"github.com/pkg/errors"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// computeCatalogReservationOwnerID constructs a unique identifier which includes the node's
// parent information, node ID, and retry attempt number. This is used to uniquely identify a task
// when using the cache reservation API to serialize cached executions.
func computeCatalogReservationOwnerID(nCtx interfaces.NodeExecutionContext) (string, error) {
currentNodeUniqueID, err := common.GenerateUniqueID(nCtx.ExecutionContext().GetParentInfo(), nCtx.NodeID())
if err != nil {
return "", err
}

ownerID, err := encoding.FixedLengthUniqueIDForParts(task.IDMaxLength,
[]string{nCtx.NodeExecutionMetadata().GetOwnerID().Name, currentNodeUniqueID, strconv.Itoa(int(nCtx.CurrentAttempt()))})
if err != nil {
return "", err
}

return ownerID, nil
}

// updatePhaseCacheInfo adds the cache and catalog reservation metadata to the PhaseInfo. This
// ensures this information is reported in events and available within FlyteAdmin.
func updatePhaseCacheInfo(phaseInfo handler.PhaseInfo, cacheStatus *catalog.Status, reservationStatus *core.CatalogReservation_Status) handler.PhaseInfo {
if cacheStatus == nil && reservationStatus == nil {
return phaseInfo
}

info := phaseInfo.GetInfo()
if info == nil {
info = &handler.ExecutionInfo{}
}

if info.TaskNodeInfo == nil {
info.TaskNodeInfo = &handler.TaskNodeInfo{}
}

if info.TaskNodeInfo.TaskNodeMetadata == nil {
info.TaskNodeInfo.TaskNodeMetadata = &event.TaskNodeMetadata{}
}

if cacheStatus != nil {
info.TaskNodeInfo.TaskNodeMetadata.CacheStatus = cacheStatus.GetCacheStatus()
info.TaskNodeInfo.TaskNodeMetadata.CatalogKey = cacheStatus.GetMetadata()
}

if reservationStatus != nil {
info.TaskNodeInfo.TaskNodeMetadata.ReservationStatus = *reservationStatus
}

return phaseInfo.WithInfo(info)
}

// CheckCatalogCache uses the handler and contexts to check if cached outputs for the current node
// exist. If they exist, this function also copies the outputs to this node.
func (n *nodeExecutor) CheckCatalogCache(ctx context.Context, nCtx interfaces.NodeExecutionContext, cacheHandler interfaces.CacheableNodeHandler) (catalog.Entry, error) {
catalogKey, err := cacheHandler.GetCatalogKey(ctx, nCtx)
if err != nil {
return catalog.Entry{}, errors.Wrapf(err, "failed to initialize the catalogKey")
}

entry, err := n.catalog.Get(ctx, catalogKey)
if err != nil {
causeErr := errors.Cause(err)
if taskStatus, ok := status.FromError(causeErr); ok && taskStatus.Code() == codes.NotFound {
n.metrics.catalogMissCount.Inc(ctx)
logger.Infof(ctx, "Catalog CacheMiss: Artifact not found in Catalog. Executing Task.")
return catalog.NewCatalogEntry(nil, catalog.NewStatus(core.CatalogCacheStatus_CACHE_MISS, nil)), nil
}

n.metrics.catalogGetFailureCount.Inc(ctx)
logger.Errorf(ctx, "Catalog Failure: memoization check failed. err: %v", err.Error())
return catalog.Entry{}, errors.Wrapf(err, "Failed to check Catalog for previous results")
}

if entry.GetStatus().GetCacheStatus() != core.CatalogCacheStatus_CACHE_HIT {
logger.Errorf(ctx, "No CacheHIT and no Error received. Illegal state, Cache State: %s", entry.GetStatus().GetCacheStatus().String())
// TODO should this be an error?
return entry, nil
}

logger.Infof(ctx, "Catalog CacheHit: for task [%s/%s/%s/%s]", catalogKey.Identifier.Project,
catalogKey.Identifier.Domain, catalogKey.Identifier.Name, catalogKey.Identifier.Version)
n.metrics.catalogHitCount.Inc(ctx)

iface := catalogKey.TypedInterface
if iface.Outputs != nil && len(iface.Outputs.Variables) > 0 {
// copy cached outputs to node outputs
o, ee, err := entry.GetOutputs().Read(ctx)
if err != nil {
logger.Errorf(ctx, "failed to read from catalog, err: %s", err.Error())
return catalog.Entry{}, err
} else if ee != nil {
logger.Errorf(ctx, "got execution error from catalog output reader? This should not happen, err: %s", ee.String())
return catalog.Entry{}, nodeserrors.Errorf(nodeserrors.IllegalStateError, nCtx.NodeID(), "execution error from a cache output, bad state: %s", ee.String())
}

outputFile := v1alpha1.GetOutputsFile(nCtx.NodeStatus().GetOutputDir())
if err := nCtx.DataStore().WriteProtobuf(ctx, outputFile, storage.Options{}, o); err != nil {
logger.Errorf(ctx, "failed to write cached value to datastore, err: %s", err.Error())
return catalog.Entry{}, err
}
}

return entry, nil
}

// GetOrExtendCatalogReservation attempts to acquire an artifact reservation if the task is
// cachable and cache serializable. If the reservation already exists for this owner, the
// reservation is extended.
func (n *nodeExecutor) GetOrExtendCatalogReservation(ctx context.Context, nCtx interfaces.NodeExecutionContext,
cacheHandler interfaces.CacheableNodeHandler, heartbeatInterval time.Duration) (catalog.ReservationEntry, error) {

catalogKey, err := cacheHandler.GetCatalogKey(ctx, nCtx)
if err != nil {
return catalog.NewReservationEntryStatus(core.CatalogReservation_RESERVATION_DISABLED),
errors.Wrapf(err, "failed to initialize the catalogKey")
}

ownerID, err := computeCatalogReservationOwnerID(nCtx)
if err != nil {
return catalog.NewReservationEntryStatus(core.CatalogReservation_RESERVATION_DISABLED),
errors.Wrapf(err, "failed to initialize the cache reservation ownerID")
}

reservation, err := n.catalog.GetOrExtendReservation(ctx, catalogKey, ownerID, heartbeatInterval)
if err != nil {
n.metrics.reservationGetFailureCount.Inc(ctx)
logger.Errorf(ctx, "Catalog Failure: reservation get or extend failed. err: %v", err.Error())
return catalog.NewReservationEntryStatus(core.CatalogReservation_RESERVATION_FAILURE), err
}

var status core.CatalogReservation_Status
if reservation.OwnerId == ownerID {
status = core.CatalogReservation_RESERVATION_ACQUIRED
} else {
status = core.CatalogReservation_RESERVATION_EXISTS
}

n.metrics.reservationGetSuccessCount.Inc(ctx)
return catalog.NewReservationEntry(reservation.ExpiresAt.AsTime(),
reservation.HeartbeatInterval.AsDuration(), reservation.OwnerId, status), nil
}

// ReleaseCatalogReservation attempts to release an artifact reservation if the task is cachable
// and cache serializable. If the reservation does not exist for this owner (e.g. it never existed
// or has been acquired by another owner) this call is still successful.
func (n *nodeExecutor) ReleaseCatalogReservation(ctx context.Context, nCtx interfaces.NodeExecutionContext,
cacheHandler interfaces.CacheableNodeHandler) (catalog.ReservationEntry, error) {

catalogKey, err := cacheHandler.GetCatalogKey(ctx, nCtx)
if err != nil {
return catalog.NewReservationEntryStatus(core.CatalogReservation_RESERVATION_DISABLED),
errors.Wrapf(err, "failed to initialize the catalogKey")
}

ownerID, err := computeCatalogReservationOwnerID(nCtx)
if err != nil {
return catalog.NewReservationEntryStatus(core.CatalogReservation_RESERVATION_DISABLED),
errors.Wrapf(err, "failed to initialize the cache reservation ownerID")
}

err = n.catalog.ReleaseReservation(ctx, catalogKey, ownerID)
if err != nil {
n.metrics.reservationReleaseFailureCount.Inc(ctx)
logger.Errorf(ctx, "Catalog Failure: release reservation failed. err: %v", err.Error())
return catalog.NewReservationEntryStatus(core.CatalogReservation_RESERVATION_FAILURE), err
}

n.metrics.reservationReleaseSuccessCount.Inc(ctx)
return catalog.NewReservationEntryStatus(core.CatalogReservation_RESERVATION_RELEASED), nil
}

// WriteCatalogCache relays the outputs of this node to the cache. This allows future executions
// to reuse these data to avoid recomputation.
func (n *nodeExecutor) WriteCatalogCache(ctx context.Context, nCtx interfaces.NodeExecutionContext, cacheHandler interfaces.CacheableNodeHandler) (catalog.Status, error) {
catalogKey, err := cacheHandler.GetCatalogKey(ctx, nCtx)
if err != nil {
return catalog.NewStatus(core.CatalogCacheStatus_CACHE_DISABLED, nil), errors.Wrapf(err, "failed to initialize the catalogKey")
}

iface := catalogKey.TypedInterface
if iface.Outputs != nil && len(iface.Outputs.Variables) == 0 {
return catalog.NewStatus(core.CatalogCacheStatus_CACHE_DISABLED, nil), nil
}

logger.Infof(ctx, "Catalog CacheEnabled. recording execution [%s/%s/%s/%s]", catalogKey.Identifier.Project,
catalogKey.Identifier.Domain, catalogKey.Identifier.Name, catalogKey.Identifier.Version)

outputPaths := ioutils.NewReadOnlyOutputFilePaths(ctx, nCtx.DataStore(), nCtx.NodeStatus().GetOutputDir())
outputReader := ioutils.NewRemoteFileOutputReader(ctx, nCtx.DataStore(), outputPaths, nCtx.MaxDatasetSizeBytes())
metadata := catalog.Metadata{
TaskExecutionIdentifier: task.GetTaskExecutionIdentifier(nCtx),
}

// ignores discovery write failures
status, err := n.catalog.Put(ctx, catalogKey, outputReader, metadata)
if err != nil {
n.metrics.catalogPutFailureCount.Inc(ctx)
logger.Errorf(ctx, "Failed to write results to catalog for Task [%v]. Error: %v", catalogKey.Identifier, err)
return catalog.NewStatus(core.CatalogCacheStatus_CACHE_PUT_FAILURE, status.GetMetadata()), nil
}

n.metrics.catalogPutSuccessCount.Inc(ctx)
logger.Infof(ctx, "Successfully cached results to catalog - Task [%v]", catalogKey.Identifier)
return status, nil
}
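
Together, these four exported methods give the node executor a complete cache and cache-serialize round trip. The sketch below is illustrative only; it is not part of the diff, the real call sites live in the node executor's execute path, and it assumes catalog.ReservationEntry exposes its status via GetStatus() (entry.GetStatus() is shown in the file above).

// handleCacheableNode is an illustrative sketch in the same package; everything other than the
// four methods above is an assumption about how a caller could sequence them.
func handleCacheableNode(ctx context.Context, n *nodeExecutor, nCtx interfaces.NodeExecutionContext,
	cacheHandler interfaces.CacheableNodeHandler, heartbeatInterval time.Duration) error {

	// serialize cached executions: only the reservation owner proceeds to run the task
	reservation, err := n.GetOrExtendCatalogReservation(ctx, nCtx, cacheHandler, heartbeatInterval)
	if err != nil {
		return err
	}
	if reservation.GetStatus() != core.CatalogReservation_RESERVATION_ACQUIRED {
		// another attempt holds the reservation; re-evaluate later and rely on its cache write
		return nil
	}

	entry, err := n.CheckCatalogCache(ctx, nCtx, cacheHandler)
	if err != nil {
		return err
	}

	if entry.GetStatus().GetCacheStatus() != core.CatalogCacheStatus_CACHE_HIT {
		// cache miss: execute the task here, then persist its outputs for future executions
		if _, err := n.WriteCatalogCache(ctx, nCtx, cacheHandler); err != nil {
			return err
		}
	}

	// on a hit, CheckCatalogCache already copied the outputs into the node's output directory,
	// so either way the reservation can be released and the node can move toward success
	_, err = n.ReleaseCatalogReservation(ctx, nCtx, cacheHandler)
	return err
}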