changefeedccl: restart when memory limit changes
Previously, changing `changefeed.memory.per_changefeed_limit` would require
restarting a changefeed for the setting to take effect.

With this change, when the changefeed coordinator detects a change to the
memory limit setting, it restarts all the aggregators using a retryable error
so that the new limit takes effect without a manual restart.

Release note: None
Informs: cockroachdb#96953
Epic: None
jayshrivastava committed Aug 21, 2023
1 parent 9ad8453 commit fbf6cf3
Showing 3 changed files with 69 additions and 0 deletions.
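The core of the change is a small watcher pattern: register an on-change callback on the cluster setting that flips an atomic flag, and have the coordinator's event loop check the flag on each iteration, surfacing a retryable error when it is set so the flow is torn down and re-planned with the new limit. Below is a minimal, standalone sketch of that pattern; the names `settingWatcher`, `onChange`, `errRetryable`, and `nextIteration` are illustrative stand-ins for the real `SetOnChange` hook and `changefeedbase.MarkRetryableError`, not CockroachDB APIs.

package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// errRetryable stands in for changefeedbase.MarkRetryableError: an error that
// the job-level retry loop treats as "tear down and re-plan the flow".
var errRetryable = errors.New("retryable: memory limit changed")

// settingWatcher flips an atomic flag whenever the watched setting changes.
type settingWatcher struct {
	changed atomic.Bool
}

// onChange is the callback wired to the setting; SetOnChange plays this role
// in the real code.
func (w *settingWatcher) onChange() { w.changed.Store(true) }

// nextIteration models one pass of the coordinator's event loop: it checks
// the flag and, if set, returns a retryable error so all aggregators restart
// and pick up the new limit.
func (w *settingWatcher) nextIteration() error {
	if w.changed.Load() {
		return errRetryable
	}
	// ... regular frontier processing would happen here ...
	return nil
}

func main() {
	w := &settingWatcher{}
	fmt.Println(w.nextIteration()) // <nil>: limit unchanged, keep processing

	// Simulate SET CLUSTER SETTING changefeed.memory.per_changefeed_limit = '1MB'.
	w.onChange()
	fmt.Println(w.nextIteration()) // retryable: memory limit changed
}

An atomic flag is used because the on-change callback can fire from a different goroutine than the one running the processor's Next loop, which is why the real code reaches for sync/atomic as well.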
22 changes: 22 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_processors.go
@@ -11,6 +11,7 @@ package changefeedccl
import (
"context"
"fmt"
"sync/atomic"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcutils"
@@ -334,6 +335,10 @@ func (ca *changeAggregator) Start(ctx context.Context) {

// Generate expensive checkpoint only after we ran for a while.
ca.lastSpanFlush = timeutil.Now()

if ca.knobs.AggregatorStarted != nil {
ca.knobs.AggregatorStarted(limit)
}
}

func (ca *changeAggregator) startKVFeed(
@@ -1192,8 +1197,19 @@ func (cf *changeFrontier) closeMetrics() {
cf.metrics.mu.Unlock()
}

// makeMemoryLimitWatcher returns an atomic bool that is set to true when the
// per-changefeed memory limit setting changes.
func (cf *changeFrontier) makeMemoryLimitWatcher() *atomic.Bool {
var memoryLimitChanged atomic.Bool
changefeedbase.PerChangefeedMemLimit.SetOnChange(&cf.flowCtx.Cfg.Settings.SV, func(ctx context.Context) {
memoryLimitChanged.Store(true)
})
return &memoryLimitChanged
}

// Next is part of the RowSource interface.
func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) {
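// Watch for changes to the per-changefeed memory limit setting so that the
// flow can be restarted with the new limit.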
memLimitWatcher := cf.makeMemoryLimitWatcher()
for cf.State == execinfra.StateRunning {
if !cf.passthroughBuf.IsEmpty() {
return cf.ProcessRowHelper(cf.passthroughBuf.Pop()), nil
@@ -1260,6 +1276,12 @@ func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad
cf.MoveToDraining(err)
break
}

// We need to restart all aggregators if the memory limit changes.
if memLimitWatcher.Load() {
cf.MoveToDraining(changefeedbase.MarkRetryableError(errors.New("memory limit changed")))
break
}
}
return nil, cf.DrainHelper()
}
44 changes: 44 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
@@ -8497,3 +8497,47 @@ func TestHighwaterDoesNotRegressOnRetry(t *testing.T) {
}
cdcTest(t, testFn, feedTestEnterpriseSinks)
}

// TestChangefeedRetryWithMemoryLimits asserts that an aggregator will restart
// with the new memory limit when the setting
// `changefeed.memory.per_changefeed_limit` is changed.
func TestChangefeedRetryWithMemoryLimits(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)

knobs := s.TestingKnobs.
DistSQL.(*execinfra.TestingKnobs).
Changefeed.(*TestingKnobs)

var memLimit atomic.Int64
knobs.AggregatorStarted = func(m int64) {
memLimit.Store(m)
}

sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.memory.per_changefeed_limit = '10MB'`)

sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b INT)`)
foo := feed(t, f, `CREATE CHANGEFEED FOR foo`)
defer closeFeed(t, foo)

sqlDB.Exec(t, `INSERT INTO foo VALUES (0, 0)`)
assertPayloads(t, foo, []string{
`foo: [0]->{"after": {"a": 0, "b": 0}}`,
})

sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.memory.per_changefeed_limit = '1MB'`)

testutils.SucceedsSoon(t, func() error {
limit := memLimit.Load()
if limit != 1000000 {
return errors.AssertionFailedf("expected memory limit to be %d, found %d", 1000000, limit)
}
return nil
})
}

cdcTest(t, testFn)
}
3 changes: 3 additions & 0 deletions pkg/ccl/changefeedccl/testing_knobs.go
@@ -77,6 +77,9 @@ type TestingKnobs struct {

// OnDrain returns the channel to select on to detect node drain
OnDrain func() <-chan struct{}

// AggregatorStarted is called when `(*changeAggregator) Start` completes.
AggregatorStarted func(memLimit int64)
}

// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
