Skip to content

Commit

Permalink
Merge pull request cockroachdb#68123 from miretskiy/backport21.1-68087
Browse files Browse the repository at this point in the history
release-21.1: changefeedccl: Use separate budgets for sink and kvfeed.
  • Loading branch information
miretskiy authored Jul 27, 2021
2 parents eb059d2 + 3cff765 commit f4f32a0
Showing 1 changed file with 9 additions and 1 deletion.
10 changes: 9 additions & 1 deletion pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,11 +218,19 @@ func (ca *changeAggregator) Start(ctx context.Context) {
ca.kvFeedMemMon = kvFeedMemMon

var err error
// NB: sink uses pool bound account, and not kvFeedMemMon.
// This is because if we use shared kvFeedMemMon budget, it is possible that that budget
// will be exhausted by kvfeed (e.g. because of a down or slow sink); Then, when sink
// is no longer unavailable, we will proceed with the message, but once it gets to the sink,
// we won't be able to allocate additional memory because more events could have been added
// to KVFeed buffer. Basically, the problem is that the ingress rate of messages into kvfeed
// buffer is different from the eggress rate from the sink.
// TODO(yevgeniy): The real solution is to have the sink pushback.
// TODO(yevgeniy): getSink is getting to be quite a kitchen sink -- refactor.
ca.sink, err = getSink(
ctx, ca.spec.Feed.SinkURI, ca.flowCtx.EvalCtx.NodeID.SQLInstanceID(), ca.spec.Feed.Opts, ca.spec.Feed.Targets,
ca.flowCtx.Cfg.Settings, timestampOracle, ca.flowCtx.Cfg.ExternalStorageFromURI, ca.spec.User(),
kvFeedMemMon.MakeBoundAccount(),
pool.MakeBoundAccount(),
)
if err != nil {
err = MarkRetryableError(err)
Expand Down

0 comments on commit f4f32a0

Please sign in to comment.