diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go
index bc3db25506e3..e085d565c621 100644
--- a/pkg/ccl/changefeedccl/changefeed_processors.go
+++ b/pkg/ccl/changefeedccl/changefeed_processors.go
@@ -218,11 +218,19 @@ func (ca *changeAggregator) Start(ctx context.Context) {
 	ca.kvFeedMemMon = kvFeedMemMon
 	var err error
+	// NB: the sink uses the pool's bound account rather than kvFeedMemMon.
+	// If the sink shared the kvFeedMemMon budget, the kvfeed could exhaust
+	// that budget (e.g. because of a down or slow sink). Then, once the sink
+	// becomes available again, we would proceed with the message but fail to
+	// allocate additional memory at the sink, since more events could have
+	// been added to the kvfeed buffer in the meantime. In short, the ingress
+	// rate into the kvfeed buffer differs from the egress rate out of the sink.
+	// TODO(yevgeniy): The real solution is to have the sink push back.
 	// TODO(yevgeniy): getSink is getting to be quite a kitchen sink -- refactor.
 	ca.sink, err = getSink(
 		ctx, ca.spec.Feed.SinkURI, ca.flowCtx.EvalCtx.NodeID.SQLInstanceID(),
 		ca.spec.Feed.Opts, ca.spec.Feed.Targets, ca.flowCtx.Cfg.Settings, timestampOracle,
 		ca.flowCtx.Cfg.ExternalStorageFromURI, ca.spec.User(),
-		kvFeedMemMon.MakeBoundAccount(),
+		pool.MakeBoundAccount(),
 	)
 	if err != nil {
 		err = MarkRetryableError(err)
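
To make the rationale in the new comment concrete, here is a minimal, self-contained Go sketch of the failure mode. The `budget` and `boundAccount` types below are hypothetical stand-ins for illustration only, not the real `pkg/util/mon` API: when producer and consumer charge the same budget, a producer that fills it while the consumer is stalled leaves the consumer unable to allocate even after it recovers; giving the sink its own pool avoids that.

```go
package main

import (
	"errors"
	"fmt"
)

// budget is a fixed-size memory budget shared by bound accounts.
// (Hypothetical stand-in for a memory monitor's budget.)
type budget struct {
	limit, used int64
}

// boundAccount charges allocations against a budget.
type boundAccount struct {
	b *budget
}

func (a *boundAccount) grow(n int64) error {
	if a.b.used+n > a.b.limit {
		return errors.New("memory budget exceeded")
	}
	a.b.used += n
	return nil
}

func main() {
	// Shared budget: while the sink is down or slow, the kvfeed buffers
	// events until the entire budget is consumed, so the sink's later
	// allocation for an already-dequeued message fails.
	shared := &budget{limit: 10}
	kvfeed := &boundAccount{b: shared}
	sink := &boundAccount{b: shared}
	_ = kvfeed.grow(10)                  // kvfeed fills the budget
	fmt.Println("shared:", sink.grow(1)) // memory budget exceeded

	// Separate budgets: the sink charges its own pool, so a full kvfeed
	// buffer no longer blocks emitting the message in hand.
	kvfeedMon := &budget{limit: 10}
	pool := &budget{limit: 10}
	kvfeed2 := &boundAccount{b: kvfeedMon}
	sink2 := &boundAccount{b: pool}
	_ = kvfeed2.grow(10)                    // kvfeed budget full again
	fmt.Println("separate:", sink2.grow(1)) // <nil>: sink allocation succeeds
}
```

Note that separate budgets only sidestep the interaction; as the TODO in the diff says, the real fix is for the sink to push back so ingress and egress rates are coupled.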