From c7a195cb741b1c910c97e4dec51df3a1097917ff Mon Sep 17 00:00:00 2001 From: Daniel Harrison Date: Wed, 12 Jun 2019 14:41:49 -0700 Subject: [PATCH] changefeedccl: remove deprecated poller We switched the default to push-based rangefeeds in 19.1. This removes the old pull-based poller fallback entirely. Details of the removal: - The relevant code is removed - Several poller-related hacks are removed - The changefeed.run.push.enabled telemetry metric is removed - The changefeed.push.enabled cluster setting is removed - The poller subtest is removed from each changefeedccl test - The cdc/poller roachtest is skipped on 19.2+ - TestValidations is removed, it's redundant with the much better quality TestChangefeedNemeses Note that the table history still does some polling, but switching this to RangeFeed will cause an unacceptable increase in the commit-to-emit latency of rows. This bit of polling will be removed as part of #36289. This commit also leaves the structure of the changefeed code mostly unchanged. There is an opportunity for cleanup here, but this also will wait for after #36289. Closes #36914 Release note: None --- docs/generated/settings/settings.html | 1 - pkg/ccl/changefeedccl/bench_test.go | 11 +- pkg/ccl/changefeedccl/cdctest/nemeses.go | 42 +----- pkg/ccl/changefeedccl/changefeed.go | 12 -- pkg/ccl/changefeedccl/changefeed_dist.go | 9 +- .../changefeedccl/changefeed_processors.go | 7 +- pkg/ccl/changefeedccl/changefeed_test.go | 63 ++------- pkg/ccl/changefeedccl/encoder_test.go | 3 - pkg/ccl/changefeedccl/helpers_test.go | 21 +-- pkg/ccl/changefeedccl/nemeses_test.go | 1 - pkg/ccl/changefeedccl/poller.go | 124 +++--------------- pkg/ccl/changefeedccl/validations_test.go | 91 ------------- pkg/cmd/roachtest/cdc.go | 6 + pkg/settings/registry.go | 3 +- 14 files changed, 57 insertions(+), 337 deletions(-) diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 02cb833d065c..730712dd8d35 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -2,7 +2,6 @@ SettingTypeDefaultDescription changefeed.experimental_poll_intervalduration1spolling interval for the prototype changefeed implementation (WARNING: may compromise cluster stability or correctness; do not edit without supervision) -changefeed.push.enabledbooleantrueif set, changed are pushed instead of pulled. This requires the kv.rangefeed.enabled setting. See https://www.cockroachlabs.com/docs/v19.2/change-data-capture.html#enable-rangefeeds-to-reduce-latency cloudstorage.gs.default.keystringif set, JSON key to use during Google Cloud Storage operations cloudstorage.http.custom_castringcustom root CA (appended to system's default CAs) for verifying certificates when interacting with HTTPS storage cloudstorage.timeoutduration10m0sthe timeout for import/export storage operations diff --git a/pkg/ccl/changefeedccl/bench_test.go b/pkg/ccl/changefeedccl/bench_test.go index 697ba66d54ee..c4021651df86 100644 --- a/pkg/ccl/changefeedccl/bench_test.go +++ b/pkg/ccl/changefeedccl/bench_test.go @@ -42,6 +42,15 @@ func BenchmarkChangefeedTicks(b *testing.B) { defer leaktest.AfterTest(b)() defer log.Scope(b).Close(b) + // In PR #38211, we removed the polling based data watcher in changefeeds in + // favor of RangeFeed. This benchmark worked by writing a bunch of data at + // certain timestamps and manipulating clocks at runtime so the polling + // grabbed a little of it at a time. 
There's fundamentally no way for this to + // work with RangeFeed without a rewrite, but it's not being used for anything + // right now, so the rewrite isn't worth it. We should fix this if we need to + // start doing changefeed perf work at some point. + b.Skip(`broken in #38211`) + ctx := context.Background() s, sqlDBRaw, _ := serverutils.StartServer(b, base.TestServerArgs{UseDatabase: "d"}) defer s.Stopper().Stop(ctx) @@ -212,7 +221,7 @@ func createBenchmarkChangefeed( s.ClusterSettings(), details, spans, encoder, sink, rowsFn, TestingKnobs{}, metrics) ctx, cancel := context.WithCancel(ctx) - go func() { _ = poller.Run(ctx) }() + go func() { _ = poller.RunUsingRangefeeds(ctx) }() go func() { _ = thUpdater.PollTableDescs(ctx) }() errCh := make(chan error, 1) diff --git a/pkg/ccl/changefeedccl/cdctest/nemeses.go b/pkg/ccl/changefeedccl/cdctest/nemeses.go index 511e6f0fe9c2..1c6fc9370545 100644 --- a/pkg/ccl/changefeedccl/cdctest/nemeses.go +++ b/pkg/ccl/changefeedccl/cdctest/nemeses.go @@ -13,7 +13,6 @@ import ( gosql "database/sql" "math/rand" "strings" - "time" "github.com/cockroachdb/cockroach/pkg/util/fsm" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -43,17 +42,9 @@ func RunNemesis(f TestFeedFactory, db *gosql.DB) (Validator, error) { ctx := context.Background() rng, _ := randutil.NewPseudoRand() - var usingRangeFeed bool - if err := db.QueryRow( - `SHOW CLUSTER SETTING changefeed.push.enabled`, - ).Scan(&usingRangeFeed); err != nil { - return nil, err - } - ns := &nemeses{ - rowCount: 4, - db: db, - usingPoller: !usingRangeFeed, + rowCount: 4, + db: db, // eventMix does not have to add to 100 eventMix: map[fsm.Event]int{ // eventTransact opens an UPSERT transaction is there is not one open. If @@ -172,10 +163,9 @@ const ( ) type nemeses struct { - rowCount int - eventMix map[fsm.Event]int - mixTotal int - usingPoller bool + rowCount int + eventMix map[fsm.Event]int + mixTotal int v *CountValidator db *gosql.DB @@ -377,28 +367,6 @@ func transact(a fsm.Args) error { func noteFeedMessage(a fsm.Args) error { ns := a.Extended.(*nemeses) - // The poller works by continually selecting a timestamp to be the next - // high-water and polling for changes between the last high-water and the new - // one. It doesn't push any unresolved intents (it would enter the txnwaitq, - // which would see the txn as live and hence not try to push it), so if we - // have an open transaction, it's possible that the poller is stuck waiting on - // it to resolve, which would cause the below call to `Next` to deadlock. This - // breaks that deadlock. 
- if ns.usingPoller { - nextDone := make(chan struct{}) - defer close(nextDone) - go func() { - select { - case <-time.After(5 * time.Second): - log.Info(a.Ctx, "pushed open txn to break deadlock") - if err := push(a); err != nil { - panic(err) - } - case <-nextDone: - } - }() - } - m, err := ns.f.Next() if err != nil { return err diff --git a/pkg/ccl/changefeedccl/changefeed.go b/pkg/ccl/changefeedccl/changefeed.go index 74e2599eded4..93b667814ffe 100644 --- a/pkg/ccl/changefeedccl/changefeed.go +++ b/pkg/ccl/changefeedccl/changefeed.go @@ -12,7 +12,6 @@ import ( "context" "time" - "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -37,17 +36,6 @@ var changefeedPollInterval = func() *settings.DurationSetting { return s }() -// PushEnabled is a cluster setting that triggers all subsequently -// created/unpaused changefeeds to receive kv changes via RangeFeed push -// (instead of ExportRequest polling). -var PushEnabled = settings.RegisterBoolSetting( - "changefeed.push.enabled", - "if set, changed are pushed instead of pulled. This requires the "+ - "kv.rangefeed.enabled setting. See "+ - base.DocsURL(`change-data-capture.html#enable-rangefeeds-to-reduce-latency`), - true, -) - const ( jsonMetaSentinel = `__crdb__` ) diff --git a/pkg/ccl/changefeedccl/changefeed_dist.go b/pkg/ccl/changefeedccl/changefeed_dist.go index 5c2ac0f15ccf..74cb196b106f 100644 --- a/pkg/ccl/changefeedccl/changefeed_dist.go +++ b/pkg/ccl/changefeedccl/changefeed_dist.go @@ -13,7 +13,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/internal/client" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" - "github.com/cockroachdb/cockroach/pkg/server/telemetry" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/distsqlpb" "github.com/cockroachdb/cockroach/pkg/sql/distsqlplan" @@ -72,13 +71,6 @@ func distChangefeedFlow( return err } - execCfg := phs.ExecCfg() - if PushEnabled.Get(&execCfg.Settings.SV) { - telemetry.Count(`changefeed.run.push.enabled`) - } else { - telemetry.Count(`changefeed.run.push.disabled`) - } - spansTS := details.StatementTime var initialHighWater hlc.Timestamp if h := progress.GetHighWater(); h != nil && *h != (hlc.Timestamp{}) { @@ -88,6 +80,7 @@ func distChangefeedFlow( spansTS = initialHighWater } + execCfg := phs.ExecCfg() trackedSpans, err := fetchSpansForTargets(ctx, execCfg.DB, details.Targets, spansTS) if err != nil { return err diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index 6bd89072f7a5..4913042bfc74 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -190,12 +190,7 @@ func (ca *changeAggregator) Start(ctx context.Context) context.Context { ca.pollerDoneCh = make(chan struct{}) if err := ca.flowCtx.Stopper().RunAsyncTask(ctx, "changefeed-poller", func(ctx context.Context) { defer close(ca.pollerDoneCh) - var err error - if PushEnabled.Get(&ca.flowCtx.Settings.SV) { - err = ca.poller.RunUsingRangefeeds(ctx) - } else { - err = ca.poller.Run(ctx) - } + err := ca.poller.RunUsingRangefeeds(ctx) // Trying to call MoveToDraining here is racy (`MoveToDraining called in // state stateTrailingMeta`), so return the error via a channel. 
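For reference, the error-reporting pattern described in the comment above looks roughly like this in isolation. This is a minimal sketch, not the actual changefeedccl code: runPoller stands in for poller.RunUsingRangefeeds, and the channel names only loosely mirror pollerDoneCh.

package main

import (
	"context"
	"errors"
	"fmt"
)

// runPoller stands in for poller.RunUsingRangefeeds: it blocks until its
// context is canceled, then returns the cancellation error.
func runPoller(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())

	// Buffered so the background task can always deliver its terminal error,
	// even if nothing is reading yet; the caller inspects it later instead of
	// transitioning state from inside the async task.
	errCh := make(chan error, 1)
	doneCh := make(chan struct{})
	go func() {
		defer close(doneCh)
		errCh <- runPoller(ctx)
	}()

	cancel() // simulate the processor shutting down
	<-doneCh // wait for the task to exit, as closing pollerDoneCh signals
	if err := <-errCh; err != nil && !errors.Is(err, context.Canceled) {
		fmt.Println("poller failed:", err)
	} else {
		fmt.Println("poller stopped cleanly")
	}
}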
diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 9652cd0f4253..639439a4feb3 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -82,7 +82,6 @@ func TestChangefeedBasics(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) - t.Run(`poller`, pollerTest(sinklessTest, testFn)) t.Run(`cloudstorage`, cloudStorageTest(testFn)) // NB running TestChangefeedBasics, which includes a DELETE, with @@ -126,7 +125,6 @@ func TestChangefeedEnvelope(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) - t.Run(`poller`, pollerTest(sinklessTest, testFn)) } func TestChangefeedMultiTable(t *testing.T) { @@ -150,7 +148,6 @@ func TestChangefeedMultiTable(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) - t.Run(`poller`, pollerTest(sinklessTest, testFn)) } func TestChangefeedCursor(t *testing.T) { @@ -209,7 +206,6 @@ func TestChangefeedCursor(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) - t.Run(`poller`, pollerTest(sinklessTest, testFn)) } func TestChangefeedTimestamps(t *testing.T) { @@ -270,9 +266,6 @@ func TestChangefeedTimestamps(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) - // Most poller tests use sinklessTest, but this one uses enterpriseTest - // because we get an extra assertion out of it. - t.Run(`poller`, pollerTest(enterpriseTest, testFn)) } func TestChangefeedResolvedFrequency(t *testing.T) { @@ -303,7 +296,6 @@ func TestChangefeedResolvedFrequency(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) - t.Run(`poller`, pollerTest(sinklessTest, testFn)) } // Test how Changefeeds react to schema changes that do not require a backfill @@ -470,7 +462,6 @@ func TestChangefeedSchemaChangeNoBackfill(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) - t.Run(`poller`, pollerTest(sinklessTest, testFn)) } func TestChangefeedSchemaChangeNoAllowBackfill(t *testing.T) { @@ -540,7 +531,6 @@ func TestChangefeedSchemaChangeNoAllowBackfill(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) - t.Run(`poller`, pollerTest(sinklessTest, testFn)) } // Test schema changes that require a backfill when the backfill option is @@ -673,7 +663,6 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) - t.Run(`poller`, pollerTest(sinklessTest, testFn)) } // Regression test for #34314 @@ -696,7 +685,6 @@ func TestChangefeedAfterSchemaChangeBackfill(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) - t.Run(`poller`, pollerTest(sinklessTest, testFn)) } func TestChangefeedInterleaved(t *testing.T) { @@ -747,7 +735,6 @@ func TestChangefeedInterleaved(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) - t.Run(`poller`, pollerTest(sinklessTest, testFn)) } func TestChangefeedColumnFamily(t *testing.T) { @@ -781,7 +768,6 @@ func TestChangefeedColumnFamily(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) - t.Run(`poller`, pollerTest(sinklessTest, testFn)) } func TestChangefeedComputedColumn(t *testing.T) { 
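The hunks in this file all make the same mechanical change: the `poller` variant drops out of the list of wrappers each test registers with t.Run. A rough sketch of that wrapper pattern is below; testEnv and withEnv are simplified stand-ins for the real sinklessTest/enterpriseTest helpers, not the actual helpers_test.go code.

package sketch

import "testing"

// testEnv is a simplified stand-in for the sink and server plumbing that the
// real sinklessTest and enterpriseTest helpers set up.
type testEnv struct{ name string }

// withEnv adapts a shared test body into a subtest for one environment,
// loosely mirroring how helpers_test.go builds its wrappers.
func withEnv(name string, testFn func(*testing.T, testEnv)) func(*testing.T) {
	return func(t *testing.T) { testFn(t, testEnv{name: name}) }
}

func TestChangefeedSketch(t *testing.T) {
	testFn := func(t *testing.T, e testEnv) {
		t.Logf("exercising the changefeed against the %s environment", e.name)
	}
	t.Run(`sinkless`, withEnv(`sinkless`, testFn))
	t.Run(`enterprise`, withEnv(`enterprise`, testFn))
	// The former t.Run(`poller`, ...) variant is simply gone now that
	// rangefeeds are the only data source.
}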
@@ -810,7 +796,6 @@ func TestChangefeedComputedColumn(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) - t.Run(`poller`, pollerTest(sinklessTest, testFn)) } func TestChangefeedUpdatePrimaryKey(t *testing.T) { @@ -843,7 +828,6 @@ func TestChangefeedUpdatePrimaryKey(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) - t.Run(`poller`, pollerTest(sinklessTest, testFn)) } func TestChangefeedTruncateRenameDrop(t *testing.T) { @@ -896,7 +880,6 @@ func TestChangefeedTruncateRenameDrop(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) - t.Run(`poller`, pollerTest(sinklessTest, testFn)) } func TestChangefeedMonitoring(t *testing.T) { @@ -913,9 +896,6 @@ func TestChangefeedMonitoring(t *testing.T) { } sqlDB := sqlutils.MakeSQLRunner(db) - var usingRangeFeed bool - sqlDB.QueryRow(t, `SHOW CLUSTER SETTING changefeed.push.enabled`, &usingRangeFeed) - sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`) sqlDB.Exec(t, `INSERT INTO foo VALUES (1)`) @@ -967,14 +947,11 @@ func TestChangefeedMonitoring(t *testing.T) { if c := s.MustGetSQLCounter(`changefeed.max_behind_nanos`); c <= 0 { return errors.Errorf(`expected > 0 got %d`, c) } - if usingRangeFeed { - // Only RangeFeed-based changefeeds use this buffer. - if c := s.MustGetSQLCounter(`changefeed.buffer_entries.in`); c <= 0 { - return errors.Errorf(`expected > 0 got %d`, c) - } - if c := s.MustGetSQLCounter(`changefeed.buffer_entries.out`); c <= 0 { - return errors.Errorf(`expected > 0 got %d`, c) - } + if c := s.MustGetSQLCounter(`changefeed.buffer_entries.in`); c <= 0 { + return errors.Errorf(`expected > 0 got %d`, c) + } + if c := s.MustGetSQLCounter(`changefeed.buffer_entries.out`); c <= 0 { + return errors.Errorf(`expected > 0 got %d`, c) } return nil }) @@ -983,7 +960,8 @@ func TestChangefeedMonitoring(t *testing.T) { sqlDB.Exec(t, `INSERT INTO foo VALUES (2)`) const expectedLatency = 100 * time.Millisecond sqlDB.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = $1`, - (expectedLatency / 10).String()) + (expectedLatency / 3).String()) + sqlDB.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.close_fraction = 1.0`) testutils.SucceedsSoon(t, func() error { waitForBehindNanos := 2 * expectedLatency.Nanoseconds() @@ -995,6 +973,10 @@ func TestChangefeedMonitoring(t *testing.T) { }) // Unblocking the emit should bring the max_behind_nanos back down. + // Unfortunately, this is sensitive to how many closed timestamp updates are + // received. If we get them too fast, it takes longer to process them then + // they come in and we fall continually further behind. The target_duration + // and close_fraction settings above are tuned to try to avoid this. close(beforeEmitRowCh) _, _ = foo.Next() testutils.SucceedsSoon(t, func() error { @@ -1037,7 +1019,6 @@ func TestChangefeedMonitoring(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) - t.Run(`poller`, pollerTest(sinklessTest, testFn)) } func TestChangefeedRetryableError(t *testing.T) { @@ -1108,7 +1089,6 @@ func TestChangefeedRetryableError(t *testing.T) { // Only the enterprise version uses jobs. 
t.Run(`enterprise`, enterpriseTest(testFn)) - t.Run(`poller`, pollerTest(enterpriseTest, testFn)) } // TestChangefeedDataTTL ensures that changefeeds fail with an error in the case @@ -1194,7 +1174,6 @@ func TestChangefeedDataTTL(t *testing.T) { t.Run("sinkless", sinklessTest(testFn)) t.Run("enterprise", enterpriseTest(testFn)) - t.Run(`poller`, pollerTest(sinklessTest, testFn)) } // TestChangefeedSchemaTTL ensures that changefeeds fail with an error in the case @@ -1271,7 +1250,6 @@ func TestChangefeedSchemaTTL(t *testing.T) { t.Run("sinkless", sinklessTest(testFn)) t.Run("enterprise", enterpriseTest(testFn)) - t.Run(`poller`, pollerTest(sinklessTest, testFn)) } func TestChangefeedErrors(t *testing.T) { @@ -1500,7 +1478,6 @@ func TestChangefeedPermissions(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) - t.Run(`poller`, pollerTest(sinklessTest, testFn)) } func TestChangefeedDescription(t *testing.T) { @@ -1541,7 +1518,6 @@ func TestChangefeedDescription(t *testing.T) { // Only the enterprise version uses jobs. t.Run(`enterprise`, enterpriseTest(testFn)) - t.Run(`poller`, pollerTest(enterpriseTest, testFn)) } func TestChangefeedPauseUnpause(t *testing.T) { defer leaktest.AfterTest(t)() @@ -1584,7 +1560,6 @@ func TestChangefeedPauseUnpause(t *testing.T) { // Only the enterprise version uses jobs. t.Run(`enterprise`, enterpriseTest(testFn)) - t.Run(`poller`, pollerTest(enterpriseTest, testFn)) } func TestManyChangefeedsOneTable(t *testing.T) { @@ -1637,7 +1612,6 @@ func TestManyChangefeedsOneTable(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) - t.Run(`poller`, pollerTest(sinklessTest, testFn)) } func TestUnspecifiedPrimaryKey(t *testing.T) { @@ -1663,7 +1637,6 @@ func TestUnspecifiedPrimaryKey(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) - t.Run(`poller`, pollerTest(sinklessTest, testFn)) } // TestChangefeedNodeShutdown ensures that an enterprise changefeed continues @@ -1771,24 +1744,15 @@ func TestChangefeedTelemetry(t *testing.T) { expectedSink = `experimental-sql` } - var expectedPushEnabled string - if strings.Contains(t.Name(), `sinkless`) { - expectedPushEnabled = `enabled` - } else { - expectedPushEnabled = `disabled` - } - counts := telemetry.GetFeatureCounts(telemetry.Raw, telemetry.ResetCounts) require.Equal(t, int32(2), counts[`changefeed.create.sink.`+expectedSink]) require.Equal(t, int32(2), counts[`changefeed.create.format.json`]) require.Equal(t, int32(1), counts[`changefeed.create.num_tables.1`]) require.Equal(t, int32(1), counts[`changefeed.create.num_tables.2`]) - require.Equal(t, int32(2), counts[`changefeed.run.push.`+expectedPushEnabled]) } t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) - t.Run(`poller`, pollerTest(sinklessTest, testFn)) } func TestChangefeedMemBufferCapacity(t *testing.T) { @@ -1818,11 +1782,6 @@ func TestChangefeedMemBufferCapacity(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`) sqlDB.Exec(t, `INSERT INTO foo VALUES (0, 'small')`) - // Force rangefeed, even for the enterprise test. 
- sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.push.enabled = true`) - sqlDB.Exec(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true`) - sqlDB.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '1s'`) - foo := feed(t, f, `CREATE CHANGEFEED FOR foo`) defer closeFeed(t, foo) diff --git a/pkg/ccl/changefeedccl/encoder_test.go b/pkg/ccl/changefeedccl/encoder_test.go index ae5c0fa08e11..454d2ffbfc95 100644 --- a/pkg/ccl/changefeedccl/encoder_test.go +++ b/pkg/ccl/changefeedccl/encoder_test.go @@ -320,7 +320,6 @@ func TestAvroEncoder(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) - t.Run(`poller`, pollerTest(sinklessTest, testFn)) } func TestAvroMigrateToUnsupportedColumn(t *testing.T) { @@ -351,7 +350,6 @@ func TestAvroMigrateToUnsupportedColumn(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) - t.Run(`poller`, pollerTest(sinklessTest, testFn)) } func TestAvroLedger(t *testing.T) { @@ -389,5 +387,4 @@ func TestAvroLedger(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) - t.Run(`poller`, pollerTest(sinklessTest, testFn)) } diff --git a/pkg/ccl/changefeedccl/helpers_test.go b/pkg/ccl/changefeedccl/helpers_test.go index f9c0ee8f8352..9c14d281f4c0 100644 --- a/pkg/ccl/changefeedccl/helpers_test.go +++ b/pkg/ccl/changefeedccl/helpers_test.go @@ -254,10 +254,8 @@ func enterpriseTest(testFn func(*testing.T, *gosql.DB, cdctest.TestFeedFactory)) }) defer s.Stopper().Stop(ctx) sqlDB := sqlutils.MakeSQLRunner(db) - // TODO(dan): Switch this to RangeFeed, too. It seems wasteful right now - // because the RangeFeed version of the tests take longer due to - // closed_timestamp.target_duration's interaction with schema changes. 
- sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.push.enabled = false`) + sqlDB.Exec(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true`) + sqlDB.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '1s'`) sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.experimental_poll_interval = '10ms'`) sqlDB.Exec(t, `CREATE DATABASE d`) sink, cleanup := sqlutils.PGUrl(t, s.ServingAddr(), t.Name(), url.User(security.RootUser)) @@ -268,20 +266,6 @@ func enterpriseTest(testFn func(*testing.T, *gosql.DB, cdctest.TestFeedFactory)) } } -func pollerTest( - metaTestFn func(func(*testing.T, *gosql.DB, cdctest.TestFeedFactory)) func(*testing.T), - testFn func(*testing.T, *gosql.DB, cdctest.TestFeedFactory), -) func(*testing.T) { - return func(t *testing.T) { - metaTestFn(func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) { - sqlDB := sqlutils.MakeSQLRunner(db) - sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.push.enabled = false`) - sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.experimental_poll_interval = '10ms'`) - testFn(t, db, f) - })(t) - } -} - func cloudStorageTest( testFn func(*testing.T, *gosql.DB, cdctest.TestFeedFactory), ) func(*testing.T) { @@ -312,6 +296,7 @@ func cloudStorageTest( sqlDB := sqlutils.MakeSQLRunner(db) sqlDB.Exec(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true`) sqlDB.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '1s'`) + sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.experimental_poll_interval = '10ms'`) sqlDB.Exec(t, `CREATE DATABASE d`) f := cdctest.MakeCloudFeedFactory(s, db, dir, flushCh) diff --git a/pkg/ccl/changefeedccl/nemeses_test.go b/pkg/ccl/changefeedccl/nemeses_test.go index 95269457f6d7..25b3238ba658 100644 --- a/pkg/ccl/changefeedccl/nemeses_test.go +++ b/pkg/ccl/changefeedccl/nemeses_test.go @@ -34,6 +34,5 @@ func TestChangefeedNemeses(t *testing.T) { } t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) - t.Run(`poller`, pollerTest(sinklessTest, testFn)) t.Run(`cloudstorage`, cloudStorageTest(testFn)) } diff --git a/pkg/ccl/changefeedccl/poller.go b/pkg/ccl/changefeedccl/poller.go index 22ae0d91ec51..461b496e0785 100644 --- a/pkg/ccl/changefeedccl/poller.go +++ b/pkg/ccl/changefeedccl/poller.go @@ -113,83 +113,6 @@ func makePoller( return p } -// Run repeatedly polls and inserts changed kvs and resolved timestamps into a -// buffer. It blocks forever and is intended to be run in a goroutine. -// -// During each poll, a new high-water mark is chosen. The relevant spans for the -// configured tables are broken up by (possibly stale) range boundaries and -// every changed KV between the old and new high-water is fetched via -// ExportRequests. It backpressures sending the requests such that some maximum -// number are inflight or being inserted into the buffer. Finally, after each -// poll completes, a resolved timestamp notification is added to the buffer. -func (p *poller) Run(ctx context.Context) error { - for { - // Wait for polling interval - p.mu.Lock() - lastHighwater := p.mu.highWater - p.mu.Unlock() - - pollDuration := changefeedPollInterval.Get(&p.settings.SV) - pollDuration = pollDuration - timeutil.Since(timeutil.Unix(0, lastHighwater.WallTime)) - if pollDuration > 0 { - log.VEventf(ctx, 1, `sleeping for %s`, pollDuration) - select { - case <-ctx.Done(): - return ctx.Err() - case <-time.After(pollDuration): - } - } - - nextHighWater := p.clock.Now() - - tableMetadataStart := timeutil.Now() - // Ingest table descriptors up to the next prospective highwater. 
- if err := p.updateTableHistory(ctx, nextHighWater); err != nil { - return err - } - p.metrics.TableMetadataNanos.Inc(timeutil.Since(tableMetadataStart).Nanoseconds()) - - // Determine if we are at a scanBoundary, and trigger a full scan if needed. - isFullScan := false - p.mu.Lock() - if len(p.mu.scanBoundaries) > 0 { - if p.mu.scanBoundaries[0].Equal(lastHighwater) { - // Perform a full scan of the latest value of all keys as of the - // boundary timestamp and consume the boundary. - isFullScan = true - nextHighWater = lastHighwater - p.mu.scanBoundaries = p.mu.scanBoundaries[1:] - } else if p.mu.scanBoundaries[0].Less(nextHighWater) { - // If we aren't currently at a scan boundary, but the next highwater - // would bring us past the scan boundary, set nextHighWater to the - // scan boundary. This will cause us to capture all changes up to the - // scan boundary, then consume the boundary on the next iteration. - nextHighWater = p.mu.scanBoundaries[0] - } - } - p.mu.Unlock() - - if !isFullScan { - log.VEventf(ctx, 1, `changefeed poll (%s,%s]: %s`, - lastHighwater, nextHighWater, time.Duration(nextHighWater.WallTime-lastHighwater.WallTime)) - } else { - log.VEventf(ctx, 1, `changefeed poll full scan @ %s`, nextHighWater) - } - - spans, err := getSpansToProcess(ctx, p.db, p.spans) - if err != nil { - return err - } - - if err := p.exportSpansParallel(ctx, spans, lastHighwater, nextHighWater, isFullScan); err != nil { - return err - } - p.mu.Lock() - p.mu.highWater = nextHighWater - p.mu.Unlock() - } -} - // RunUsingRangeFeeds performs the same task as the normal Run method, but uses // the experimental Rangefeed system to capture changes rather than the // poll-and-export method. Note @@ -230,9 +153,9 @@ func (p *poller) rangefeedImpl(ctx context.Context) error { } p.mu.Unlock() if scanTime != (hlc.Timestamp{}) { - if err := p.exportSpansParallel( - ctx, spans, scanTime, scanTime, true, /* fullScan */ - ); err != nil { + // TODO(dan): Now that we no longer have the poller, we should stop using + // ExportRequest and start using normal Scans. + if err := p.exportSpansParallel(ctx, spans, scanTime); err != nil { return err } } @@ -423,7 +346,7 @@ func getSpansToProcess( } func (p *poller) exportSpansParallel( - ctx context.Context, spans []roachpb.Span, start, end hlc.Timestamp, isFullScan bool, + ctx context.Context, spans []roachpb.Span, ts hlc.Timestamp, ) error { // Export requests for the various watched spans are executed in parallel, // with a semaphore-enforced limit based on a cluster setting. 
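The surviving catch-up scan path keeps this concurrency shape: a semaphore caps how many exports are in flight while an error group collects the first failure. A minimal sketch of that pattern follows, assuming golang.org/x/sync/errgroup; maxConcurrentExports and exportOneSpan are illustrative placeholders for the cluster-setting-derived limit and the real ExportRequest call, not changefeedccl identifiers.

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// maxConcurrentExports stands in for the limit derived from a cluster setting.
const maxConcurrentExports = 4

// exportOneSpan stands in for sending an ExportRequest for a single span.
func exportOneSpan(ctx context.Context, span string) error {
	fmt.Println("exporting", span)
	return nil
}

func exportSpansParallel(ctx context.Context, spans []string) error {
	sem := make(chan struct{}, maxConcurrentExports)
	g, ctx := errgroup.WithContext(ctx)
	for _, span := range spans {
		span := span // capture the loop variable for the closure
		// Acquire a semaphore slot before launching, so at most
		// maxConcurrentExports requests are in flight at once.
		select {
		case sem <- struct{}{}:
		case <-ctx.Done():
			return ctx.Err()
		}
		g.Go(func() error {
			defer func() { <-sem }()
			return exportOneSpan(ctx, span)
		})
	}
	return g.Wait()
}

func main() {
	spans := []string{"/Table/53/1", "/Table/53/2", "/Table/53/3"}
	if err := exportSpansParallel(context.Background(), spans); err != nil {
		fmt.Println("export failed:", err)
	}
}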
@@ -448,7 +371,7 @@ func (p *poller) exportSpansParallel( g.GoCtx(func(ctx context.Context) error { defer func() { <-exportsSem }() - err := p.exportSpan(ctx, span, start, end, isFullScan) + err := p.exportSpan(ctx, span, ts) finished := atomic.AddInt64(&atomicFinished, 1) if log.V(2) { log.Infof(ctx, `exported %d of %d`, finished, len(spans)) @@ -462,39 +385,31 @@ func (p *poller) exportSpansParallel( return g.Wait() } -func (p *poller) exportSpan( - ctx context.Context, span roachpb.Span, start, end hlc.Timestamp, isFullScan bool, -) error { +func (p *poller) exportSpan(ctx context.Context, span roachpb.Span, ts hlc.Timestamp) error { sender := p.db.NonTransactionalSender() if log.V(2) { - log.Infof(ctx, `sending ExportRequest %s over (%s,%s]`, span, start, end) + log.Infof(ctx, `sending ExportRequest %s at %s`, span, ts) } - header := roachpb.Header{Timestamp: end} + header := roachpb.Header{Timestamp: ts} req := &roachpb.ExportRequest{ - RequestHeader: roachpb.RequestHeaderFromSpan(span), - StartTime: start, - MVCCFilter: roachpb.MVCCFilter_All, - ReturnSST: true, - OmitChecksum: true, - EnableTimeBoundIteratorOptimization: true, - } - if isFullScan { - req.MVCCFilter = roachpb.MVCCFilter_Latest - req.StartTime = hlc.Timestamp{} + RequestHeader: roachpb.RequestHeaderFromSpan(span), + MVCCFilter: roachpb.MVCCFilter_Latest, + ReturnSST: true, + OmitChecksum: true, } stopwatchStart := timeutil.Now() exported, pErr := client.SendWrappedWith(ctx, sender, header, req) exportDuration := timeutil.Since(stopwatchStart) if log.V(2) { - log.Infof(ctx, `finished ExportRequest %s over (%s,%s] took %s`, - span, start, end, exportDuration) + log.Infof(ctx, `finished ExportRequest %s at %s took %s`, + span, ts, exportDuration) } slowExportThreshold := 10 * changefeedPollInterval.Get(&p.settings.SV) if exportDuration > slowExportThreshold { - log.Infof(ctx, "finished ExportRequest %s over (%s,%s] took %s behind by %s", - span, start, end, exportDuration, timeutil.Since(end.GoTime())) + log.Infof(ctx, "finished ExportRequest %s at %s took %s behind by %s", + span, ts, exportDuration, timeutil.Since(ts.GoTime())) } if pErr != nil { @@ -505,17 +420,14 @@ func (p *poller) exportSpan( // When outputting a full scan, we want to use the schema at the scan // timestamp, not the schema at the value timestamp. 
- var schemaTimestamp hlc.Timestamp - if isFullScan { - schemaTimestamp = end - } + schemaTimestamp := ts stopwatchStart = timeutil.Now() for _, file := range exported.(*roachpb.ExportResponse).Files { if err := p.slurpSST(ctx, file.SST, schemaTimestamp); err != nil { return err } } - if err := p.buf.AddResolved(ctx, span, end); err != nil { + if err := p.buf.AddResolved(ctx, span, ts); err != nil { return err } diff --git a/pkg/ccl/changefeedccl/validations_test.go b/pkg/ccl/changefeedccl/validations_test.go index 4e3738ee7dae..6b5ab7b57afd 100644 --- a/pkg/ccl/changefeedccl/validations_test.go +++ b/pkg/ccl/changefeedccl/validations_test.go @@ -14,11 +14,9 @@ import ( "math/rand" "sync/atomic" "testing" - "time" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest" "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" - "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/workload/bank" @@ -26,94 +24,6 @@ import ( "github.com/stretchr/testify/require" ) -func TestValidations(t *testing.T) { - defer leaktest.AfterTest(t)() - defer utilccl.TestingEnableEnterprise()() - - testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) { - // Poller-based tests get checkpoints every 10 milliseconds, whereas - // rangefeed tests get on at most every 200 milliseconds, and there is - // also a short lead time for rangefeed-based feeds before the - // first checkpoint. This means that a much larger number of transfers - // happens, and the validator gets overwhelmed by fingerprints. - slowWrite := PushEnabled.Get(&f.Server().ClusterSettings().SV) - sqlDB := sqlutils.MakeSQLRunner(db) - - t.Run("bank", func(t *testing.T) { - ctx := context.Background() - const numRows, numRanges, payloadBytes, maxTransfer = 10, 10, 10, 999 - gen := bank.FromConfig(numRows, payloadBytes, numRanges) - if _, err := workloadsql.Setup(ctx, db, gen, 0, 0); err != nil { - t.Fatal(err) - } - - bankFeed := feed(t, f, `CREATE CHANGEFEED FOR bank WITH updated, resolved`) - defer closeFeed(t, bankFeed) - - var done int64 - g := ctxgroup.WithContext(ctx) - g.GoCtx(func(ctx context.Context) error { - for { - if atomic.LoadInt64(&done) > 0 { - return nil - } - - if err := randomBankTransfer(numRows, maxTransfer, db); err != nil { - return err - } - - if slowWrite { - time.Sleep(100 * time.Millisecond) - } - } - }) - - const requestedResolved = 7 - sqlDB.Exec(t, `CREATE TABLE fprint (id INT PRIMARY KEY, balance INT, payload STRING)`) - fprintV, err := cdctest.NewFingerprintValidator(db, `bank`, `fprint`, bankFeed.Partitions()) - require.NoError(t, err) - v := cdctest.MakeCountValidator(cdctest.Validators{ - cdctest.NewOrderValidator(`bank`), - fprintV, - }) - for { - m, err := bankFeed.Next() - if err != nil { - t.Fatal(err) - } else if len(m.Key) > 0 || len(m.Value) > 0 { - updated, _, err := cdctest.ParseJSONValueTimestamps(m.Value) - if err != nil { - t.Fatal(err) - } - v.NoteRow(m.Partition, string(m.Key), string(m.Value), updated) - } else if m.Resolved != nil { - _, resolved, err := cdctest.ParseJSONValueTimestamps(m.Resolved) - if err != nil { - t.Fatal(err) - } - if err := v.NoteResolved(m.Partition, resolved); err != nil { - t.Fatal(err) - } - if v.NumResolvedWithRows > requestedResolved { - atomic.StoreInt64(&done, 1) - break - } - } - } - for _, f := range v.Failures() { - t.Error(f) - } - - if err := g.Wait(); err != nil { - t.Errorf(`%+v`, err) - } - }) - 
} - t.Run(`sinkless`, sinklessTest(testFn)) - t.Run(`enterprise`, enterpriseTest(testFn)) - t.Run(`poller`, pollerTest(sinklessTest, testFn)) -} - func TestCatchupScanOrdering(t *testing.T) { defer leaktest.AfterTest(t)() defer utilccl.TestingEnableEnterprise()() @@ -184,7 +94,6 @@ func TestCatchupScanOrdering(t *testing.T) { } t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) - t.Run(`poller`, pollerTest(sinklessTest, testFn)) } // TODO(dan): This bit is copied from the bank workload. It's diff --git a/pkg/cmd/roachtest/cdc.go b/pkg/cmd/roachtest/cdc.go index 62892ba66dd6..aa7437dc6055 100644 --- a/pkg/cmd/roachtest/cdc.go +++ b/pkg/cmd/roachtest/cdc.go @@ -55,6 +55,12 @@ type cdcTestArgs struct { } func cdcBasicTest(ctx context.Context, t *test, c *cluster, args cdcTestArgs) { + // Skip the poller test on v19.2. After 19.2 is out, we should likely delete + // the test entirely. + if !args.rangefeed && t.registry.buildVersion.Compare(version.MustParse(`v19.1.0-0`)) > 0 { + t.Skip("no poller in >= v19.2.0", "") + } + crdbNodes := c.Range(1, c.nodes-1) workloadNode := c.Node(c.nodes) kafkaNode := c.Node(c.nodes) diff --git a/pkg/settings/registry.go b/pkg/settings/registry.go index 1d8feed8e742..6f5f6c303749 100644 --- a/pkg/settings/registry.go +++ b/pkg/settings/registry.go @@ -37,13 +37,14 @@ var retiredSettings = map[string]struct{}{ // removed as of 2.1. "kv.allocator.stat_based_rebalancing.enabled": {}, "kv.allocator.stat_rebalance_threshold": {}, - // removed as of 2.2. + // removed as of 19.1. "kv.raft_log.synchronize": {}, // removed as of 19.2. "schemachanger.bulk_index_backfill.enabled": {}, "rocksdb.ingest_backpressure.delay_l0_file": {}, // never used "server.heap_profile.system_memory_threshold_fraction": {}, "timeseries.storage.10s_resolution_ttl": {}, + "changefeed.push.enabled": {}, } // Register adds a setting to the registry.
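On the settings registry change: retiring a name rather than deleting it outright keeps the old key reserved so it cannot be accidentally re-registered later. A hedged sketch of that idea is below; the register function is an illustration of the concept only, not CockroachDB's actual settings.Register, which has its own handling of retired names.

package main

import "fmt"

// retiredSettings lists names that were once cluster settings and may not be
// reused, mirroring the map in pkg/settings/registry.go.
var retiredSettings = map[string]struct{}{
	"changefeed.push.enabled": {},
}

var registry = map[string]string{}

// register is an illustrative stand-in, not the real settings.Register.
func register(key, desc string) error {
	if _, ok := retiredSettings[key]; ok {
		return fmt.Errorf("cannot register %q: the name is retired", key)
	}
	if _, ok := registry[key]; ok {
		return fmt.Errorf("setting %q is already registered", key)
	}
	registry[key] = desc
	return nil
}

func main() {
	if err := register("changefeed.push.enabled", "deprecated"); err != nil {
		fmt.Println(err) // the retired name is rejected
	}
	if err := register("changefeed.experimental_poll_interval", "poll interval"); err != nil {
		fmt.Println(err)
	} else {
		fmt.Println("registered changefeed.experimental_poll_interval")
	}
}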