From 3cff765477e310128348b9c97492073b890af33a Mon Sep 17 00:00:00 2001 From: Yevgeniy Miretskiy Date: Mon, 26 Jul 2021 22:02:05 -0400 Subject: [PATCH] changefeedccl: Use separate budgets for sink and kvfeed. Use separate memory budget accounting for memory used in kvfeed and the memory used by the sink for in-transit messages. We need to use separate budgets because the rate of incoming messages might be very different from the rate of egress messages. As a result, it's possible that we can re-fill a kvfeed buffer faster than we can drain it; and as a result, the sink will not be able to request additional memory for its inflight messages. The follow on PRs will change the sink to support pushback mode, so the above would not be necessary. Release Notes: None --- pkg/ccl/changefeedccl/changefeed_processors.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index bc3db25506e3..e085d565c621 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -218,11 +218,19 @@ func (ca *changeAggregator) Start(ctx context.Context) { ca.kvFeedMemMon = kvFeedMemMon var err error + // NB: sink uses pool bound account, and not kvFeedMemMon. + // This is because if we use shared kvFeedMemMon budget, it is possible that that budget + // will be exhausted by kvfeed (e.g. because of a down or slow sink); Then, when sink + // is no longer unavailable, we will proceed with the message, but once it gets to the sink, + // we won't be able to allocate additional memory because more events could have been added + // to KVFeed buffer. Basically, the problem is that the ingress rate of messages into kvfeed + // buffer is different from the eggress rate from the sink. + // TODO(yevgeniy): The real solution is to have the sink pushback. // TODO(yevgeniy): getSink is getting to be quite a kitchen sink -- refactor. ca.sink, err = getSink( ctx, ca.spec.Feed.SinkURI, ca.flowCtx.EvalCtx.NodeID.SQLInstanceID(), ca.spec.Feed.Opts, ca.spec.Feed.Targets, ca.flowCtx.Cfg.Settings, timestampOracle, ca.flowCtx.Cfg.ExternalStorageFromURI, ca.spec.User(), - kvFeedMemMon.MakeBoundAccount(), + pool.MakeBoundAccount(), ) if err != nil { err = MarkRetryableError(err)