From fbf6cf3010adc0feab05aa2da036311f331a69c8 Mon Sep 17 00:00:00 2001 From: Jayant Shrivastava Date: Mon, 21 Aug 2023 14:13:52 -0400 Subject: [PATCH] changefeedccl: restart when memory limit changes Previously, changing `changefeed.memory.per_changefeed_limit` would require restarting a changefeed for the setting to take effect. This change makes it so that when the changefeed coordinator detects a change in memory limits, it restarts all the aggregators using a retryable error. Release note: None Informs: https://github.com/cockroachdb/cockroach/issues/96953 Epic: None --- .../changefeedccl/changefeed_processors.go | 22 ++++++++++ pkg/ccl/changefeedccl/changefeed_test.go | 44 +++++++++++++++++++ pkg/ccl/changefeedccl/testing_knobs.go | 3 ++ 3 files changed, 69 insertions(+) diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index 102a7f855a94..4c56fab9d1f9 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -11,6 +11,7 @@ package changefeedccl import ( "context" "fmt" + "sync/atomic" "time" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcutils" @@ -334,6 +335,10 @@ func (ca *changeAggregator) Start(ctx context.Context) { // Generate expensive checkpoint only after we ran for a while. ca.lastSpanFlush = timeutil.Now() + + if ca.knobs.AggregatorStarted != nil { + ca.knobs.AggregatorStarted(limit) + } } func (ca *changeAggregator) startKVFeed( @@ -1192,8 +1197,19 @@ func (cf *changeFrontier) closeMetrics() { cf.metrics.mu.Unlock() } +// makeMemoryLimitWatcher returns an atomic bool which is set to true +// if the memory limit setting is changed. +func (cf *changeFrontier) makeMemoryLimitWatcher() *atomic.Bool { + var memoryLimitChanged atomic.Bool + changefeedbase.PerChangefeedMemLimit.SetOnChange(&cf.flowCtx.Cfg.Settings.SV, func(ctx context.Context) { + memoryLimitChanged.Store(true) + }) + return &memoryLimitChanged +} + // Next is part of the RowSource interface. func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) { + memLimitWatcher := cf.makeMemoryLimitWatcher() for cf.State == execinfra.StateRunning { if !cf.passthroughBuf.IsEmpty() { return cf.ProcessRowHelper(cf.passthroughBuf.Pop()), nil @@ -1260,6 +1276,12 @@ func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad cf.MoveToDraining(err) break } + + // We need to restart all aggregators if the memory limits change. + if memLimitWatcher.Load() { + cf.MoveToDraining(changefeedbase.MarkRetryableError(errors.New("memory limit changed"))) + break + } } return nil, cf.DrainHelper() } diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 08cdbc86a00e..5c0fd8e31188 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -8497,3 +8497,47 @@ func TestHighwaterDoesNotRegressOnRetry(t *testing.T) { } cdcTest(t, testFn, feedTestEnterpriseSinks) } + +// TestChangefeedRetryWithMemoryLimits asserts that an aggregator will restart +// with the new memory limit when the setting +// `changefeed.memory.per_changefeed_limit` is changed. +func TestChangefeedRetryWithMemoryLimits(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { + sqlDB := sqlutils.MakeSQLRunner(s.DB) + + knobs := s.TestingKnobs. + DistSQL.(*execinfra.TestingKnobs). + Changefeed.(*TestingKnobs) + + var memLimit atomic.Int64 + knobs.AggregatorStarted = func(m int64) { + memLimit.Store(m) + } + + sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.memory.per_changefeed_limit = '10MB'`) + + sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b INT)`) + foo := feed(t, f, `CREATE CHANGEFEED FOR foo`) + defer closeFeed(t, foo) + + sqlDB.Exec(t, `INSERT INTO foo VALUES (0, 0)`) + assertPayloads(t, foo, []string{ + `foo: [0]->{"after": {"a": 0, "b": 0}}`, + }) + + sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.memory.per_changefeed_limit = '1MB'`) + + testutils.SucceedsSoon(t, func() error { + limit := memLimit.Load() + if limit != 1000000 { + return errors.AssertionFailedf("expected memory limit to be %d, found %d", 1<<20, limit) + } + return nil + }) + } + + cdcTest(t, testFn) +} diff --git a/pkg/ccl/changefeedccl/testing_knobs.go b/pkg/ccl/changefeedccl/testing_knobs.go index 79fb3363a5b9..bda315896015 100644 --- a/pkg/ccl/changefeedccl/testing_knobs.go +++ b/pkg/ccl/changefeedccl/testing_knobs.go @@ -77,6 +77,9 @@ type TestingKnobs struct { // OnDrain returns the channel to select on to detect node drain OnDrain func() <-chan struct{} + + // AggregatorStarted is called when `(*changeAggregator) Start` completes. + AggregatorStarted func(memLimit int64) } // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.