Skip to content

Commit

Permalink
changefeedccl: Use separate budgets for sink and kvfeed.
Browse files Browse the repository at this point in the history
Use separate memory budget accounting for memory used in kvfeed
and the memory used by the sink for in-transit messages.

We need to use separate budgets because the rate of incoming messages
might be very different from the rate of egress messages.  As a result,
it's possible that we can re-fill a kvfeed buffer faster than we can
drain it; and as a result, the sink will not be able to request
additional memory for its inflight messages.

The follow on PRs will change the sink to support pushback mode, so
the above would not be necessary.

Release Notes: None
  • Loading branch information
Yevgeniy Miretskiy committed Jul 27, 2021
1 parent 42ce127 commit 1ad8ea0
Showing 1 changed file with 9 additions and 1 deletion.
10 changes: 9 additions & 1 deletion pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,9 +241,17 @@ func (ca *changeAggregator) Start(ctx context.Context) {
kvFeedMemMon.Start(ctx, pool, mon.BoundAccount{})
ca.kvFeedMemMon = kvFeedMemMon

// NB: sink uses pool bound account, and not kvFeedMemMon.
// This is because if we use shared kvFeedMemMon budget, it is possible that that budget
// will be exhausted by kvfeed (e.g. because of a down or slow sink); Then, when sink
// is no longer unavailable, we will proceed with the message, but once it gets to the sink,
// we won't be able to allocate additional memory because more events could have been added
// to KVFeed buffer. Basically, the problem is that the ingress rate of messages into kvfeed
// buffer is different from the eggress rate from the sink.
// TODO(yevgeniy): The real solution is to have the sink pushback.
ca.sink, err = getSink(
ctx, ca.flowCtx.Cfg, ca.spec.Feed, timestampOracle,
ca.spec.User(), kvFeedMemMon.MakeBoundAccount(), ca.spec.JobID)
ca.spec.User(), pool.MakeBoundAccount(), ca.spec.JobID)

if err != nil {
err = changefeedbase.MarkRetryableError(err)
Expand Down

0 comments on commit 1ad8ea0

Please sign in to comment.