diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 02cb833d065c..730712dd8d35 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -2,7 +2,6 @@ SettingTypeDefaultDescription changefeed.experimental_poll_intervalduration1spolling interval for the prototype changefeed implementation (WARNING: may compromise cluster stability or correctness; do not edit without supervision) -changefeed.push.enabledbooleantrueif set, changed are pushed instead of pulled. This requires the kv.rangefeed.enabled setting. See https://www.cockroachlabs.com/docs/v19.2/change-data-capture.html#enable-rangefeeds-to-reduce-latency cloudstorage.gs.default.keystringif set, JSON key to use during Google Cloud Storage operations cloudstorage.http.custom_castringcustom root CA (appended to system's default CAs) for verifying certificates when interacting with HTTPS storage cloudstorage.timeoutduration10m0sthe timeout for import/export storage operations diff --git a/pkg/ccl/changefeedccl/bench_test.go b/pkg/ccl/changefeedccl/bench_test.go index 697ba66d54ee..c4021651df86 100644 --- a/pkg/ccl/changefeedccl/bench_test.go +++ b/pkg/ccl/changefeedccl/bench_test.go @@ -42,6 +42,15 @@ func BenchmarkChangefeedTicks(b *testing.B) { defer leaktest.AfterTest(b)() defer log.Scope(b).Close(b) + // In PR #38211, we removed the polling based data watcher in changefeeds in + // favor of RangeFeed. This benchmark worked by writing a bunch of data at + // certain timestamps and manipulating clocks at runtime so the polling + // grabbed a little of it at a time. There's fundamentally no way for this to + // work with RangeFeed without a rewrite, but it's not being used for anything + // right now, so the rewrite isn't worth it. We should fix this if we need to + // start doing changefeed perf work at some point. + b.Skip(`broken in #38211`) + ctx := context.Background() s, sqlDBRaw, _ := serverutils.StartServer(b, base.TestServerArgs{UseDatabase: "d"}) defer s.Stopper().Stop(ctx) @@ -212,7 +221,7 @@ func createBenchmarkChangefeed( s.ClusterSettings(), details, spans, encoder, sink, rowsFn, TestingKnobs{}, metrics) ctx, cancel := context.WithCancel(ctx) - go func() { _ = poller.Run(ctx) }() + go func() { _ = poller.RunUsingRangefeeds(ctx) }() go func() { _ = thUpdater.PollTableDescs(ctx) }() errCh := make(chan error, 1) diff --git a/pkg/ccl/changefeedccl/cdctest/nemeses.go b/pkg/ccl/changefeedccl/cdctest/nemeses.go index 511e6f0fe9c2..1c6fc9370545 100644 --- a/pkg/ccl/changefeedccl/cdctest/nemeses.go +++ b/pkg/ccl/changefeedccl/cdctest/nemeses.go @@ -13,7 +13,6 @@ import ( gosql "database/sql" "math/rand" "strings" - "time" "github.com/cockroachdb/cockroach/pkg/util/fsm" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -43,17 +42,9 @@ func RunNemesis(f TestFeedFactory, db *gosql.DB) (Validator, error) { ctx := context.Background() rng, _ := randutil.NewPseudoRand() - var usingRangeFeed bool - if err := db.QueryRow( - `SHOW CLUSTER SETTING changefeed.push.enabled`, - ).Scan(&usingRangeFeed); err != nil { - return nil, err - } - ns := &nemeses{ - rowCount: 4, - db: db, - usingPoller: !usingRangeFeed, + rowCount: 4, + db: db, // eventMix does not have to add to 100 eventMix: map[fsm.Event]int{ // eventTransact opens an UPSERT transaction is there is not one open. 
If @@ -172,10 +163,9 @@ const ( ) type nemeses struct { - rowCount int - eventMix map[fsm.Event]int - mixTotal int - usingPoller bool + rowCount int + eventMix map[fsm.Event]int + mixTotal int v *CountValidator db *gosql.DB @@ -377,28 +367,6 @@ func transact(a fsm.Args) error { func noteFeedMessage(a fsm.Args) error { ns := a.Extended.(*nemeses) - // The poller works by continually selecting a timestamp to be the next - // high-water and polling for changes between the last high-water and the new - // one. It doesn't push any unresolved intents (it would enter the txnwaitq, - // which would see the txn as live and hence not try to push it), so if we - // have an open transaction, it's possible that the poller is stuck waiting on - // it to resolve, which would cause the below call to `Next` to deadlock. This - // breaks that deadlock. - if ns.usingPoller { - nextDone := make(chan struct{}) - defer close(nextDone) - go func() { - select { - case <-time.After(5 * time.Second): - log.Info(a.Ctx, "pushed open txn to break deadlock") - if err := push(a); err != nil { - panic(err) - } - case <-nextDone: - } - }() - } - m, err := ns.f.Next() if err != nil { return err diff --git a/pkg/ccl/changefeedccl/changefeed.go b/pkg/ccl/changefeedccl/changefeed.go index 74e2599eded4..93b667814ffe 100644 --- a/pkg/ccl/changefeedccl/changefeed.go +++ b/pkg/ccl/changefeedccl/changefeed.go @@ -12,7 +12,6 @@ import ( "context" "time" - "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -37,17 +36,6 @@ var changefeedPollInterval = func() *settings.DurationSetting { return s }() -// PushEnabled is a cluster setting that triggers all subsequently -// created/unpaused changefeeds to receive kv changes via RangeFeed push -// (instead of ExportRequest polling). -var PushEnabled = settings.RegisterBoolSetting( - "changefeed.push.enabled", - "if set, changed are pushed instead of pulled. This requires the "+ - "kv.rangefeed.enabled setting. 
See "+ - base.DocsURL(`change-data-capture.html#enable-rangefeeds-to-reduce-latency`), - true, -) - const ( jsonMetaSentinel = `__crdb__` ) diff --git a/pkg/ccl/changefeedccl/changefeed_dist.go b/pkg/ccl/changefeedccl/changefeed_dist.go index 5c2ac0f15ccf..74cb196b106f 100644 --- a/pkg/ccl/changefeedccl/changefeed_dist.go +++ b/pkg/ccl/changefeedccl/changefeed_dist.go @@ -13,7 +13,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/internal/client" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" - "github.com/cockroachdb/cockroach/pkg/server/telemetry" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/distsqlpb" "github.com/cockroachdb/cockroach/pkg/sql/distsqlplan" @@ -72,13 +71,6 @@ func distChangefeedFlow( return err } - execCfg := phs.ExecCfg() - if PushEnabled.Get(&execCfg.Settings.SV) { - telemetry.Count(`changefeed.run.push.enabled`) - } else { - telemetry.Count(`changefeed.run.push.disabled`) - } - spansTS := details.StatementTime var initialHighWater hlc.Timestamp if h := progress.GetHighWater(); h != nil && *h != (hlc.Timestamp{}) { @@ -88,6 +80,7 @@ func distChangefeedFlow( spansTS = initialHighWater } + execCfg := phs.ExecCfg() trackedSpans, err := fetchSpansForTargets(ctx, execCfg.DB, details.Targets, spansTS) if err != nil { return err diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index 6bd89072f7a5..4913042bfc74 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -190,12 +190,7 @@ func (ca *changeAggregator) Start(ctx context.Context) context.Context { ca.pollerDoneCh = make(chan struct{}) if err := ca.flowCtx.Stopper().RunAsyncTask(ctx, "changefeed-poller", func(ctx context.Context) { defer close(ca.pollerDoneCh) - var err error - if PushEnabled.Get(&ca.flowCtx.Settings.SV) { - err = ca.poller.RunUsingRangefeeds(ctx) - } else { - err = ca.poller.Run(ctx) - } + err := ca.poller.RunUsingRangefeeds(ctx) // Trying to call MoveToDraining here is racy (`MoveToDraining called in // state stateTrailingMeta`), so return the error via a channel. 
diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 9652cd0f4253..639439a4feb3 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -82,7 +82,6 @@ func TestChangefeedBasics(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) - t.Run(`poller`, pollerTest(sinklessTest, testFn)) t.Run(`cloudstorage`, cloudStorageTest(testFn)) // NB running TestChangefeedBasics, which includes a DELETE, with @@ -126,7 +125,6 @@ func TestChangefeedEnvelope(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) - t.Run(`poller`, pollerTest(sinklessTest, testFn)) } func TestChangefeedMultiTable(t *testing.T) { @@ -150,7 +148,6 @@ func TestChangefeedMultiTable(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) - t.Run(`poller`, pollerTest(sinklessTest, testFn)) } func TestChangefeedCursor(t *testing.T) { @@ -209,7 +206,6 @@ func TestChangefeedCursor(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) - t.Run(`poller`, pollerTest(sinklessTest, testFn)) } func TestChangefeedTimestamps(t *testing.T) { @@ -270,9 +266,6 @@ func TestChangefeedTimestamps(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) - // Most poller tests use sinklessTest, but this one uses enterpriseTest - // because we get an extra assertion out of it. - t.Run(`poller`, pollerTest(enterpriseTest, testFn)) } func TestChangefeedResolvedFrequency(t *testing.T) { @@ -303,7 +296,6 @@ func TestChangefeedResolvedFrequency(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) - t.Run(`poller`, pollerTest(sinklessTest, testFn)) } // Test how Changefeeds react to schema changes that do not require a backfill @@ -470,7 +462,6 @@ func TestChangefeedSchemaChangeNoBackfill(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) - t.Run(`poller`, pollerTest(sinklessTest, testFn)) } func TestChangefeedSchemaChangeNoAllowBackfill(t *testing.T) { @@ -540,7 +531,6 @@ func TestChangefeedSchemaChangeNoAllowBackfill(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) - t.Run(`poller`, pollerTest(sinklessTest, testFn)) } // Test schema changes that require a backfill when the backfill option is @@ -673,7 +663,6 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) - t.Run(`poller`, pollerTest(sinklessTest, testFn)) } // Regression test for #34314 @@ -696,7 +685,6 @@ func TestChangefeedAfterSchemaChangeBackfill(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) - t.Run(`poller`, pollerTest(sinklessTest, testFn)) } func TestChangefeedInterleaved(t *testing.T) { @@ -747,7 +735,6 @@ func TestChangefeedInterleaved(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) - t.Run(`poller`, pollerTest(sinklessTest, testFn)) } func TestChangefeedColumnFamily(t *testing.T) { @@ -781,7 +768,6 @@ func TestChangefeedColumnFamily(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) - t.Run(`poller`, pollerTest(sinklessTest, testFn)) } func TestChangefeedComputedColumn(t *testing.T) { 
@@ -810,7 +796,6 @@ func TestChangefeedComputedColumn(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) - t.Run(`poller`, pollerTest(sinklessTest, testFn)) } func TestChangefeedUpdatePrimaryKey(t *testing.T) { @@ -843,7 +828,6 @@ func TestChangefeedUpdatePrimaryKey(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) - t.Run(`poller`, pollerTest(sinklessTest, testFn)) } func TestChangefeedTruncateRenameDrop(t *testing.T) { @@ -896,7 +880,6 @@ func TestChangefeedTruncateRenameDrop(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) - t.Run(`poller`, pollerTest(sinklessTest, testFn)) } func TestChangefeedMonitoring(t *testing.T) { @@ -913,9 +896,6 @@ func TestChangefeedMonitoring(t *testing.T) { } sqlDB := sqlutils.MakeSQLRunner(db) - var usingRangeFeed bool - sqlDB.QueryRow(t, `SHOW CLUSTER SETTING changefeed.push.enabled`, &usingRangeFeed) - sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`) sqlDB.Exec(t, `INSERT INTO foo VALUES (1)`) @@ -967,14 +947,11 @@ func TestChangefeedMonitoring(t *testing.T) { if c := s.MustGetSQLCounter(`changefeed.max_behind_nanos`); c <= 0 { return errors.Errorf(`expected > 0 got %d`, c) } - if usingRangeFeed { - // Only RangeFeed-based changefeeds use this buffer. - if c := s.MustGetSQLCounter(`changefeed.buffer_entries.in`); c <= 0 { - return errors.Errorf(`expected > 0 got %d`, c) - } - if c := s.MustGetSQLCounter(`changefeed.buffer_entries.out`); c <= 0 { - return errors.Errorf(`expected > 0 got %d`, c) - } + if c := s.MustGetSQLCounter(`changefeed.buffer_entries.in`); c <= 0 { + return errors.Errorf(`expected > 0 got %d`, c) + } + if c := s.MustGetSQLCounter(`changefeed.buffer_entries.out`); c <= 0 { + return errors.Errorf(`expected > 0 got %d`, c) } return nil }) @@ -983,7 +960,8 @@ func TestChangefeedMonitoring(t *testing.T) { sqlDB.Exec(t, `INSERT INTO foo VALUES (2)`) const expectedLatency = 100 * time.Millisecond sqlDB.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = $1`, - (expectedLatency / 10).String()) + (expectedLatency / 3).String()) + sqlDB.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.close_fraction = 1.0`) testutils.SucceedsSoon(t, func() error { waitForBehindNanos := 2 * expectedLatency.Nanoseconds() @@ -995,6 +973,10 @@ func TestChangefeedMonitoring(t *testing.T) { }) // Unblocking the emit should bring the max_behind_nanos back down. + // Unfortunately, this is sensitive to how many closed timestamp updates are + // received. If we get them too fast, it takes longer to process them then + // they come in and we fall continually further behind. The target_duration + // and close_fraction settings above are tuned to try to avoid this. close(beforeEmitRowCh) _, _ = foo.Next() testutils.SucceedsSoon(t, func() error { @@ -1037,7 +1019,6 @@ func TestChangefeedMonitoring(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) - t.Run(`poller`, pollerTest(sinklessTest, testFn)) } func TestChangefeedRetryableError(t *testing.T) { @@ -1108,7 +1089,6 @@ func TestChangefeedRetryableError(t *testing.T) { // Only the enterprise version uses jobs. 
t.Run(`enterprise`, enterpriseTest(testFn)) - t.Run(`poller`, pollerTest(enterpriseTest, testFn)) } // TestChangefeedDataTTL ensures that changefeeds fail with an error in the case @@ -1194,7 +1174,6 @@ func TestChangefeedDataTTL(t *testing.T) { t.Run("sinkless", sinklessTest(testFn)) t.Run("enterprise", enterpriseTest(testFn)) - t.Run(`poller`, pollerTest(sinklessTest, testFn)) } // TestChangefeedSchemaTTL ensures that changefeeds fail with an error in the case @@ -1271,7 +1250,6 @@ func TestChangefeedSchemaTTL(t *testing.T) { t.Run("sinkless", sinklessTest(testFn)) t.Run("enterprise", enterpriseTest(testFn)) - t.Run(`poller`, pollerTest(sinklessTest, testFn)) } func TestChangefeedErrors(t *testing.T) { @@ -1500,7 +1478,6 @@ func TestChangefeedPermissions(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) - t.Run(`poller`, pollerTest(sinklessTest, testFn)) } func TestChangefeedDescription(t *testing.T) { @@ -1541,7 +1518,6 @@ func TestChangefeedDescription(t *testing.T) { // Only the enterprise version uses jobs. t.Run(`enterprise`, enterpriseTest(testFn)) - t.Run(`poller`, pollerTest(enterpriseTest, testFn)) } func TestChangefeedPauseUnpause(t *testing.T) { defer leaktest.AfterTest(t)() @@ -1584,7 +1560,6 @@ func TestChangefeedPauseUnpause(t *testing.T) { // Only the enterprise version uses jobs. t.Run(`enterprise`, enterpriseTest(testFn)) - t.Run(`poller`, pollerTest(enterpriseTest, testFn)) } func TestManyChangefeedsOneTable(t *testing.T) { @@ -1637,7 +1612,6 @@ func TestManyChangefeedsOneTable(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) - t.Run(`poller`, pollerTest(sinklessTest, testFn)) } func TestUnspecifiedPrimaryKey(t *testing.T) { @@ -1663,7 +1637,6 @@ func TestUnspecifiedPrimaryKey(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) - t.Run(`poller`, pollerTest(sinklessTest, testFn)) } // TestChangefeedNodeShutdown ensures that an enterprise changefeed continues @@ -1771,24 +1744,15 @@ func TestChangefeedTelemetry(t *testing.T) { expectedSink = `experimental-sql` } - var expectedPushEnabled string - if strings.Contains(t.Name(), `sinkless`) { - expectedPushEnabled = `enabled` - } else { - expectedPushEnabled = `disabled` - } - counts := telemetry.GetFeatureCounts(telemetry.Raw, telemetry.ResetCounts) require.Equal(t, int32(2), counts[`changefeed.create.sink.`+expectedSink]) require.Equal(t, int32(2), counts[`changefeed.create.format.json`]) require.Equal(t, int32(1), counts[`changefeed.create.num_tables.1`]) require.Equal(t, int32(1), counts[`changefeed.create.num_tables.2`]) - require.Equal(t, int32(2), counts[`changefeed.run.push.`+expectedPushEnabled]) } t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) - t.Run(`poller`, pollerTest(sinklessTest, testFn)) } func TestChangefeedMemBufferCapacity(t *testing.T) { @@ -1818,11 +1782,6 @@ func TestChangefeedMemBufferCapacity(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`) sqlDB.Exec(t, `INSERT INTO foo VALUES (0, 'small')`) - // Force rangefeed, even for the enterprise test. 
- sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.push.enabled = true`) - sqlDB.Exec(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true`) - sqlDB.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '1s'`) - foo := feed(t, f, `CREATE CHANGEFEED FOR foo`) defer closeFeed(t, foo) diff --git a/pkg/ccl/changefeedccl/encoder_test.go b/pkg/ccl/changefeedccl/encoder_test.go index ae5c0fa08e11..454d2ffbfc95 100644 --- a/pkg/ccl/changefeedccl/encoder_test.go +++ b/pkg/ccl/changefeedccl/encoder_test.go @@ -320,7 +320,6 @@ func TestAvroEncoder(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) - t.Run(`poller`, pollerTest(sinklessTest, testFn)) } func TestAvroMigrateToUnsupportedColumn(t *testing.T) { @@ -351,7 +350,6 @@ func TestAvroMigrateToUnsupportedColumn(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) - t.Run(`poller`, pollerTest(sinklessTest, testFn)) } func TestAvroLedger(t *testing.T) { @@ -389,5 +387,4 @@ func TestAvroLedger(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) - t.Run(`poller`, pollerTest(sinklessTest, testFn)) } diff --git a/pkg/ccl/changefeedccl/helpers_test.go b/pkg/ccl/changefeedccl/helpers_test.go index f9c0ee8f8352..9c14d281f4c0 100644 --- a/pkg/ccl/changefeedccl/helpers_test.go +++ b/pkg/ccl/changefeedccl/helpers_test.go @@ -254,10 +254,8 @@ func enterpriseTest(testFn func(*testing.T, *gosql.DB, cdctest.TestFeedFactory)) }) defer s.Stopper().Stop(ctx) sqlDB := sqlutils.MakeSQLRunner(db) - // TODO(dan): Switch this to RangeFeed, too. It seems wasteful right now - // because the RangeFeed version of the tests take longer due to - // closed_timestamp.target_duration's interaction with schema changes. 
- sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.push.enabled = false`) + sqlDB.Exec(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true`) + sqlDB.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '1s'`) sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.experimental_poll_interval = '10ms'`) sqlDB.Exec(t, `CREATE DATABASE d`) sink, cleanup := sqlutils.PGUrl(t, s.ServingAddr(), t.Name(), url.User(security.RootUser)) @@ -268,20 +266,6 @@ func enterpriseTest(testFn func(*testing.T, *gosql.DB, cdctest.TestFeedFactory)) } } -func pollerTest( - metaTestFn func(func(*testing.T, *gosql.DB, cdctest.TestFeedFactory)) func(*testing.T), - testFn func(*testing.T, *gosql.DB, cdctest.TestFeedFactory), -) func(*testing.T) { - return func(t *testing.T) { - metaTestFn(func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) { - sqlDB := sqlutils.MakeSQLRunner(db) - sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.push.enabled = false`) - sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.experimental_poll_interval = '10ms'`) - testFn(t, db, f) - })(t) - } -} - func cloudStorageTest( testFn func(*testing.T, *gosql.DB, cdctest.TestFeedFactory), ) func(*testing.T) { @@ -312,6 +296,7 @@ func cloudStorageTest( sqlDB := sqlutils.MakeSQLRunner(db) sqlDB.Exec(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true`) sqlDB.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '1s'`) + sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.experimental_poll_interval = '10ms'`) sqlDB.Exec(t, `CREATE DATABASE d`) f := cdctest.MakeCloudFeedFactory(s, db, dir, flushCh) diff --git a/pkg/ccl/changefeedccl/nemeses_test.go b/pkg/ccl/changefeedccl/nemeses_test.go index 95269457f6d7..25b3238ba658 100644 --- a/pkg/ccl/changefeedccl/nemeses_test.go +++ b/pkg/ccl/changefeedccl/nemeses_test.go @@ -34,6 +34,5 @@ func TestChangefeedNemeses(t *testing.T) { } t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) - t.Run(`poller`, pollerTest(sinklessTest, testFn)) t.Run(`cloudstorage`, cloudStorageTest(testFn)) } diff --git a/pkg/ccl/changefeedccl/poller.go b/pkg/ccl/changefeedccl/poller.go index 22ae0d91ec51..461b496e0785 100644 --- a/pkg/ccl/changefeedccl/poller.go +++ b/pkg/ccl/changefeedccl/poller.go @@ -113,83 +113,6 @@ func makePoller( return p } -// Run repeatedly polls and inserts changed kvs and resolved timestamps into a -// buffer. It blocks forever and is intended to be run in a goroutine. -// -// During each poll, a new high-water mark is chosen. The relevant spans for the -// configured tables are broken up by (possibly stale) range boundaries and -// every changed KV between the old and new high-water is fetched via -// ExportRequests. It backpressures sending the requests such that some maximum -// number are inflight or being inserted into the buffer. Finally, after each -// poll completes, a resolved timestamp notification is added to the buffer. -func (p *poller) Run(ctx context.Context) error { - for { - // Wait for polling interval - p.mu.Lock() - lastHighwater := p.mu.highWater - p.mu.Unlock() - - pollDuration := changefeedPollInterval.Get(&p.settings.SV) - pollDuration = pollDuration - timeutil.Since(timeutil.Unix(0, lastHighwater.WallTime)) - if pollDuration > 0 { - log.VEventf(ctx, 1, `sleeping for %s`, pollDuration) - select { - case <-ctx.Done(): - return ctx.Err() - case <-time.After(pollDuration): - } - } - - nextHighWater := p.clock.Now() - - tableMetadataStart := timeutil.Now() - // Ingest table descriptors up to the next prospective highwater. 
- if err := p.updateTableHistory(ctx, nextHighWater); err != nil { - return err - } - p.metrics.TableMetadataNanos.Inc(timeutil.Since(tableMetadataStart).Nanoseconds()) - - // Determine if we are at a scanBoundary, and trigger a full scan if needed. - isFullScan := false - p.mu.Lock() - if len(p.mu.scanBoundaries) > 0 { - if p.mu.scanBoundaries[0].Equal(lastHighwater) { - // Perform a full scan of the latest value of all keys as of the - // boundary timestamp and consume the boundary. - isFullScan = true - nextHighWater = lastHighwater - p.mu.scanBoundaries = p.mu.scanBoundaries[1:] - } else if p.mu.scanBoundaries[0].Less(nextHighWater) { - // If we aren't currently at a scan boundary, but the next highwater - // would bring us past the scan boundary, set nextHighWater to the - // scan boundary. This will cause us to capture all changes up to the - // scan boundary, then consume the boundary on the next iteration. - nextHighWater = p.mu.scanBoundaries[0] - } - } - p.mu.Unlock() - - if !isFullScan { - log.VEventf(ctx, 1, `changefeed poll (%s,%s]: %s`, - lastHighwater, nextHighWater, time.Duration(nextHighWater.WallTime-lastHighwater.WallTime)) - } else { - log.VEventf(ctx, 1, `changefeed poll full scan @ %s`, nextHighWater) - } - - spans, err := getSpansToProcess(ctx, p.db, p.spans) - if err != nil { - return err - } - - if err := p.exportSpansParallel(ctx, spans, lastHighwater, nextHighWater, isFullScan); err != nil { - return err - } - p.mu.Lock() - p.mu.highWater = nextHighWater - p.mu.Unlock() - } -} - // RunUsingRangeFeeds performs the same task as the normal Run method, but uses // the experimental Rangefeed system to capture changes rather than the // poll-and-export method. Note @@ -230,9 +153,9 @@ func (p *poller) rangefeedImpl(ctx context.Context) error { } p.mu.Unlock() if scanTime != (hlc.Timestamp{}) { - if err := p.exportSpansParallel( - ctx, spans, scanTime, scanTime, true, /* fullScan */ - ); err != nil { + // TODO(dan): Now that we no longer have the poller, we should stop using + // ExportRequest and start using normal Scans. + if err := p.exportSpansParallel(ctx, spans, scanTime); err != nil { return err } } @@ -423,7 +346,7 @@ func getSpansToProcess( } func (p *poller) exportSpansParallel( - ctx context.Context, spans []roachpb.Span, start, end hlc.Timestamp, isFullScan bool, + ctx context.Context, spans []roachpb.Span, ts hlc.Timestamp, ) error { // Export requests for the various watched spans are executed in parallel, // with a semaphore-enforced limit based on a cluster setting. 
@@ -448,7 +371,7 @@ func (p *poller) exportSpansParallel( g.GoCtx(func(ctx context.Context) error { defer func() { <-exportsSem }() - err := p.exportSpan(ctx, span, start, end, isFullScan) + err := p.exportSpan(ctx, span, ts) finished := atomic.AddInt64(&atomicFinished, 1) if log.V(2) { log.Infof(ctx, `exported %d of %d`, finished, len(spans)) @@ -462,39 +385,31 @@ func (p *poller) exportSpansParallel( return g.Wait() } -func (p *poller) exportSpan( - ctx context.Context, span roachpb.Span, start, end hlc.Timestamp, isFullScan bool, -) error { +func (p *poller) exportSpan(ctx context.Context, span roachpb.Span, ts hlc.Timestamp) error { sender := p.db.NonTransactionalSender() if log.V(2) { - log.Infof(ctx, `sending ExportRequest %s over (%s,%s]`, span, start, end) + log.Infof(ctx, `sending ExportRequest %s at %s`, span, ts) } - header := roachpb.Header{Timestamp: end} + header := roachpb.Header{Timestamp: ts} req := &roachpb.ExportRequest{ - RequestHeader: roachpb.RequestHeaderFromSpan(span), - StartTime: start, - MVCCFilter: roachpb.MVCCFilter_All, - ReturnSST: true, - OmitChecksum: true, - EnableTimeBoundIteratorOptimization: true, - } - if isFullScan { - req.MVCCFilter = roachpb.MVCCFilter_Latest - req.StartTime = hlc.Timestamp{} + RequestHeader: roachpb.RequestHeaderFromSpan(span), + MVCCFilter: roachpb.MVCCFilter_Latest, + ReturnSST: true, + OmitChecksum: true, } stopwatchStart := timeutil.Now() exported, pErr := client.SendWrappedWith(ctx, sender, header, req) exportDuration := timeutil.Since(stopwatchStart) if log.V(2) { - log.Infof(ctx, `finished ExportRequest %s over (%s,%s] took %s`, - span, start, end, exportDuration) + log.Infof(ctx, `finished ExportRequest %s at %s took %s`, + span, ts, exportDuration) } slowExportThreshold := 10 * changefeedPollInterval.Get(&p.settings.SV) if exportDuration > slowExportThreshold { - log.Infof(ctx, "finished ExportRequest %s over (%s,%s] took %s behind by %s", - span, start, end, exportDuration, timeutil.Since(end.GoTime())) + log.Infof(ctx, "finished ExportRequest %s at %s took %s behind by %s", + span, ts, exportDuration, timeutil.Since(ts.GoTime())) } if pErr != nil { @@ -505,17 +420,14 @@ func (p *poller) exportSpan( // When outputting a full scan, we want to use the schema at the scan // timestamp, not the schema at the value timestamp. 
- var schemaTimestamp hlc.Timestamp - if isFullScan { - schemaTimestamp = end - } + schemaTimestamp := ts stopwatchStart = timeutil.Now() for _, file := range exported.(*roachpb.ExportResponse).Files { if err := p.slurpSST(ctx, file.SST, schemaTimestamp); err != nil { return err } } - if err := p.buf.AddResolved(ctx, span, end); err != nil { + if err := p.buf.AddResolved(ctx, span, ts); err != nil { return err } diff --git a/pkg/ccl/changefeedccl/validations_test.go b/pkg/ccl/changefeedccl/validations_test.go index 4e3738ee7dae..6b5ab7b57afd 100644 --- a/pkg/ccl/changefeedccl/validations_test.go +++ b/pkg/ccl/changefeedccl/validations_test.go @@ -14,11 +14,9 @@ import ( "math/rand" "sync/atomic" "testing" - "time" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest" "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" - "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/workload/bank" @@ -26,94 +24,6 @@ import ( "github.com/stretchr/testify/require" ) -func TestValidations(t *testing.T) { - defer leaktest.AfterTest(t)() - defer utilccl.TestingEnableEnterprise()() - - testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) { - // Poller-based tests get checkpoints every 10 milliseconds, whereas - // rangefeed tests get on at most every 200 milliseconds, and there is - // also a short lead time for rangefeed-based feeds before the - // first checkpoint. This means that a much larger number of transfers - // happens, and the validator gets overwhelmed by fingerprints. - slowWrite := PushEnabled.Get(&f.Server().ClusterSettings().SV) - sqlDB := sqlutils.MakeSQLRunner(db) - - t.Run("bank", func(t *testing.T) { - ctx := context.Background() - const numRows, numRanges, payloadBytes, maxTransfer = 10, 10, 10, 999 - gen := bank.FromConfig(numRows, payloadBytes, numRanges) - if _, err := workloadsql.Setup(ctx, db, gen, 0, 0); err != nil { - t.Fatal(err) - } - - bankFeed := feed(t, f, `CREATE CHANGEFEED FOR bank WITH updated, resolved`) - defer closeFeed(t, bankFeed) - - var done int64 - g := ctxgroup.WithContext(ctx) - g.GoCtx(func(ctx context.Context) error { - for { - if atomic.LoadInt64(&done) > 0 { - return nil - } - - if err := randomBankTransfer(numRows, maxTransfer, db); err != nil { - return err - } - - if slowWrite { - time.Sleep(100 * time.Millisecond) - } - } - }) - - const requestedResolved = 7 - sqlDB.Exec(t, `CREATE TABLE fprint (id INT PRIMARY KEY, balance INT, payload STRING)`) - fprintV, err := cdctest.NewFingerprintValidator(db, `bank`, `fprint`, bankFeed.Partitions()) - require.NoError(t, err) - v := cdctest.MakeCountValidator(cdctest.Validators{ - cdctest.NewOrderValidator(`bank`), - fprintV, - }) - for { - m, err := bankFeed.Next() - if err != nil { - t.Fatal(err) - } else if len(m.Key) > 0 || len(m.Value) > 0 { - updated, _, err := cdctest.ParseJSONValueTimestamps(m.Value) - if err != nil { - t.Fatal(err) - } - v.NoteRow(m.Partition, string(m.Key), string(m.Value), updated) - } else if m.Resolved != nil { - _, resolved, err := cdctest.ParseJSONValueTimestamps(m.Resolved) - if err != nil { - t.Fatal(err) - } - if err := v.NoteResolved(m.Partition, resolved); err != nil { - t.Fatal(err) - } - if v.NumResolvedWithRows > requestedResolved { - atomic.StoreInt64(&done, 1) - break - } - } - } - for _, f := range v.Failures() { - t.Error(f) - } - - if err := g.Wait(); err != nil { - t.Errorf(`%+v`, err) - } - }) - 
} - t.Run(`sinkless`, sinklessTest(testFn)) - t.Run(`enterprise`, enterpriseTest(testFn)) - t.Run(`poller`, pollerTest(sinklessTest, testFn)) -} - func TestCatchupScanOrdering(t *testing.T) { defer leaktest.AfterTest(t)() defer utilccl.TestingEnableEnterprise()() @@ -184,7 +94,6 @@ func TestCatchupScanOrdering(t *testing.T) { } t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) - t.Run(`poller`, pollerTest(sinklessTest, testFn)) } // TODO(dan): This bit is copied from the bank workload. It's diff --git a/pkg/cmd/roachtest/cdc.go b/pkg/cmd/roachtest/cdc.go index 62892ba66dd6..aa7437dc6055 100644 --- a/pkg/cmd/roachtest/cdc.go +++ b/pkg/cmd/roachtest/cdc.go @@ -55,6 +55,12 @@ type cdcTestArgs struct { } func cdcBasicTest(ctx context.Context, t *test, c *cluster, args cdcTestArgs) { + // Skip the poller test on v19.2. After 19.2 is out, we should likely delete + // the test entirely. + if !args.rangefeed && t.registry.buildVersion.Compare(version.MustParse(`v19.1.0-0`)) > 0 { + t.Skip("no poller in >= v19.2.0", "") + } + crdbNodes := c.Range(1, c.nodes-1) workloadNode := c.Node(c.nodes) kafkaNode := c.Node(c.nodes) diff --git a/pkg/settings/registry.go b/pkg/settings/registry.go index 1d8feed8e742..6f5f6c303749 100644 --- a/pkg/settings/registry.go +++ b/pkg/settings/registry.go @@ -37,13 +37,14 @@ var retiredSettings = map[string]struct{}{ // removed as of 2.1. "kv.allocator.stat_based_rebalancing.enabled": {}, "kv.allocator.stat_rebalance_threshold": {}, - // removed as of 2.2. + // removed as of 19.1. "kv.raft_log.synchronize": {}, // removed as of 19.2. "schemachanger.bulk_index_backfill.enabled": {}, "rocksdb.ingest_backpressure.delay_l0_file": {}, // never used "server.heap_profile.system_memory_threshold_fraction": {}, "timeseries.storage.10s_resolution_ttl": {}, + "changefeed.push.enabled": {}, } // Register adds a setting to the registry. diff --git a/pkg/sql/distsqlrun/index_skip_table_reader.go b/pkg/sql/distsqlrun/index_skip_table_reader.go index 1ad79fa6f96e..3a33c63f914a 100644 --- a/pkg/sql/distsqlrun/index_skip_table_reader.go +++ b/pkg/sql/distsqlrun/index_skip_table_reader.go @@ -76,10 +76,8 @@ func newIndexSkipTableReader( // as of now, we don't support interleaved tables, so // error our if there is an index that is interleaved - for _, idx := range spec.Table.AllNonDropIndexes() { - if len(idx.InterleavedBy) > 0 { - return nil, errors.Errorf("Interleaved tables are not supported as of now.") - } + if spec.Table.IsInterleaved() { + return nil, errors.Errorf("Interleaved tables are not supported as of now.") } t := istrPool.Get().(*indexSkipTableReader) diff --git a/pkg/storage/replica_command.go b/pkg/storage/replica_command.go index ad2733128741..5f8a660d2ece 100644 --- a/pkg/storage/replica_command.go +++ b/pkg/storage/replica_command.go @@ -531,7 +531,7 @@ func (r *Replica) AdminMerge( if err != nil { return err } - if err := dbRightDescKV.Value.GetProto(&rightDesc); err != nil { + if err := dbRightDescKV.ValueProto(&rightDesc); err != nil { return err }
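[Editor's note, not part of the patch] For anyone updating deployments or tests that previously toggled changefeed.push.enabled: after this change the only switch that matters is kv.rangefeed.enabled. The sketch below mirrors what the updated enterpriseTest/cloudStorageTest helpers do, written against database/sql; the connection string, database, and table name are placeholders, and the closed-timestamp tuning is optional.

```go
package main

import (
	gosql "database/sql"
	"log"

	_ "github.com/lib/pq" // Postgres-wire driver; lib/pq is what the changefeed tests use
)

func main() {
	// Placeholder connection string; point it at your own cluster.
	db, err := gosql.Open("postgres", "postgresql://root@localhost:26257/d?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// With changefeed.push.enabled retired, every changefeed rides on
	// RangeFeed, so the kv-level switch must be on (the updated test helpers
	// set the same two settings).
	for _, stmt := range []string{
		`SET CLUSTER SETTING kv.rangefeed.enabled = true`,
		// Optional: tighten the closed timestamp interval so resolved
		// timestamps advance promptly.
		`SET CLUSTER SETTING kv.closed_timestamp.target_duration = '1s'`,
	} {
		if _, err := db.Exec(stmt); err != nil {
			log.Fatal(err)
		}
	}

	// A sinkless changefeed is then consumed directly over the SQL wire.
	rows, err := db.Query(`CREATE CHANGEFEED FOR TABLE foo WITH updated, resolved`)
	if err != nil {
		log.Fatal(err)
	}
	defer rows.Close()
	// ... read (table, key, value) rows from the feed here ...
}
```

Code that previously executed `SET CLUSTER SETTING changefeed.push.enabled = ...` can simply drop that statement; the name now lives in retiredSettings, so the setting is no longer registered.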