From f42c0f15d6aac8a91f04dd9115a1de529d31a2c4 Mon Sep 17 00:00:00 2001
From: Yevgeniy Miretskiy
Date: Wed, 29 Mar 2023 09:26:29 -0400
Subject: [PATCH 1/5] changefeedccl: Remove skipped tests that decayed over
 time

Fixes #32232
Remove TestChangefeedNodeShutdown. This test has been disabled since 2018;
other tests exist (e.g. `TestChangefeedHandlesDrainingNodes`) that verify
restart behavior.

Fixes #51842
Remove the BenchmarkChangefeedTicks benchmark. This benchmark has been
skipped since 2019. It could be revived, but it carried a lot of code in
service of questionable goals: the benchmark was unrepresentative (it relied
on dependency injection), too small to be meaningful (1000 rows), and would
most likely have been too noisy to be conclusive. We have added other
microbenchmarks over time, and we conduct large-scale testing, including
with roachtests.

Release note: None
---
 pkg/ccl/changefeedccl/BUILD.bazel        |   5 -
 pkg/ccl/changefeedccl/bench_test.go      | 381 ----------------------
 pkg/ccl/changefeedccl/changefeed_test.go |  61 ----
 3 files changed, 447 deletions(-)
 delete mode 100644 pkg/ccl/changefeedccl/bench_test.go

diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel
index 5fa136eff7a3..fbc127d33685 100644
--- a/pkg/ccl/changefeedccl/BUILD.bazel
+++ b/pkg/ccl/changefeedccl/BUILD.bazel
@@ -175,7 +175,6 @@ go_test(
     srcs = [
         "alter_changefeed_test.go",
         "avro_test.go",
-        "bench_test.go",
         "changefeed_test.go",
         "csv_test.go",
         "encoder_test.go",
@@ -208,8 +207,6 @@ go_test(
         "//pkg/ccl/changefeedccl/changefeedbase",
         "//pkg/ccl/changefeedccl/changefeedpb",
         "//pkg/ccl/changefeedccl/kvevent",
-        "//pkg/ccl/changefeedccl/kvfeed",
-        "//pkg/ccl/changefeedccl/schemafeed",
         "//pkg/ccl/kvccl/kvtenantccl",
         "//pkg/ccl/multiregionccl",
         "//pkg/ccl/multiregionccl/multiregionccltestutils",
@@ -218,7 +215,6 @@ go_test(
         "//pkg/ccl/utilccl",
         "//pkg/cloud",
         "//pkg/cloud/impl:cloudimpl",
-        "//pkg/gossip",
         "//pkg/internal/sqlsmith",
         "//pkg/jobs",
         "//pkg/jobs/jobspb",
@@ -301,7 +297,6 @@ go_test(
         "//pkg/util/timeutil",
         "//pkg/util/timeutil/pgdate",
         "//pkg/util/uuid",
-        "//pkg/workload",
         "//pkg/workload/bank",
         "//pkg/workload/ledger",
         "//pkg/workload/workloadsql",
diff --git a/pkg/ccl/changefeedccl/bench_test.go b/pkg/ccl/changefeedccl/bench_test.go
deleted file mode 100644
index 097765b8e35a..000000000000
--- a/pkg/ccl/changefeedccl/bench_test.go
+++ /dev/null
@@ -1,381 +0,0 @@
-// Copyright 2018 The Cockroach Authors.
-//
-// Licensed as a CockroachDB Enterprise file under the Cockroach Community
-// License (the "License"); you may not use this file except in compliance with
-// the License. 
You may obtain a copy of the License at -// -// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt - -package changefeedccl - -import ( - "bytes" - "context" - gosql "database/sql" - "fmt" - "math" - "sync" - "testing" - "time" - - "github.com/cockroachdb/cockroach/pkg/base" - "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" - "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent" - "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvfeed" - "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/schemafeed" - "github.com/cockroachdb/cockroach/pkg/gossip" - "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" - "github.com/cockroachdb/cockroach/pkg/keys" - "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/sql" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils" - "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" - "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" - "github.com/cockroachdb/cockroach/pkg/testutils/skip" - "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" - "github.com/cockroachdb/cockroach/pkg/util/hlc" - "github.com/cockroachdb/cockroach/pkg/util/leaktest" - "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/mon" - "github.com/cockroachdb/cockroach/pkg/util/span" - "github.com/cockroachdb/cockroach/pkg/util/syncutil" - "github.com/cockroachdb/cockroach/pkg/util/timeutil" - "github.com/cockroachdb/cockroach/pkg/workload" - "github.com/cockroachdb/cockroach/pkg/workload/bank" - "github.com/cockroachdb/errors" - "github.com/stretchr/testify/require" -) - -func BenchmarkChangefeedTicks(b *testing.B) { - defer leaktest.AfterTest(b)() - defer log.Scope(b).Close(b) - - // In PR #38211, we removed the polling based data watcher in changefeeds in - // favor of RangeFeed. This benchmark worked by writing a bunch of data at - // certain timestamps and manipulating clocks at runtime so the polling - // grabbed a little of it at a time. There's fundamentally no way for this to - // work with RangeFeed without a rewrite, but it's not being used for anything - // right now, so the rewrite isn't worth it. We should fix this if we need to - // start doing changefeed perf work at some point. - skip.WithIssue(b, 51842, `broken in #38211`) - - ctx := context.Background() - s, sqlDBRaw, _ := serverutils.StartServer(b, base.TestServerArgs{UseDatabase: "d"}) - defer s.Stopper().Stop(ctx) - sqlDB := sqlutils.MakeSQLRunner(sqlDBRaw) - sqlDB.Exec(b, `CREATE DATABASE d`) - sqlDB.Exec(b, `SET CLUSTER SETTING changefeed.experimental_poll_interval = '0ms'`) - - numRows := 1000 - if testing.Short() { - numRows = 100 - } - bankTable := bank.FromRows(numRows).Tables()[0] - timestamps, _, err := loadWorkloadBatches(sqlDBRaw, bankTable) - if err != nil { - b.Fatal(err) - } - - runBench := func(b *testing.B, feedClock *hlc.Clock) { - var sinkBytes int64 - b.ResetTimer() - for i := 0; i < b.N; i++ { - b.StartTimer() - sink, cancelFeed, err := createBenchmarkChangefeed(ctx, s, feedClock, `d`, `bank`) - require.NoError(b, err) - for rows := 0; rows < numRows; { - r, sb := sink.WaitForEmit() - rows += r - sinkBytes += sb - } - b.StopTimer() - if err := cancelFeed(); err != nil { - b.Errorf(`%+v`, err) - } - } - b.SetBytes(sinkBytes / int64(b.N)) - } - - b.Run(`InitialScan`, func(b *testing.B) { - // Use a clock that's immediately larger than any timestamp the data was - // loaded at to catch it all in the initial scan. 
- runBench(b, s.Clock()) - }) - - b.Run(`SteadyState`, func(b *testing.B) { - // TODO(dan): This advances the clock through the timestamps of the ingested - // data every time it's called, but that's a little unsatisfying. Instead, - // wait for each batch to come out of the feed before advancing the - // timestamp. - feedClock := hlc.NewClockForTesting(&mockClock{ts: timestamps}) - runBench(b, feedClock) - }) -} - -type mockClock struct { - ts []time.Time - nextIdx int -} - -var _ hlc.WallClock = &mockClock{} - -// Now implements the hlc.WallClock interface. -func (m *mockClock) Now() time.Time { - if m.nextIdx < len(m.ts) { - m.nextIdx++ - return m.ts[m.nextIdx-1] - } - return timeutil.Now() -} - -type benchSink struct { - testSink - syncutil.Mutex - cond *sync.Cond - emits int - emitBytes int64 -} - -func makeBenchSink() *benchSink { - s := &benchSink{} - s.cond = sync.NewCond(&s.Mutex) - return s -} - -func (s *benchSink) EmitRow( - ctx context.Context, - topic TopicDescriptor, - key, value []byte, - updated, mvcc hlc.Timestamp, - alloc kvevent.Alloc, -) error { - defer alloc.Release(ctx) - return s.emit(int64(len(key) + len(value))) -} -func (s *benchSink) EmitResolvedTimestamp(ctx context.Context, e Encoder, ts hlc.Timestamp) error { - var noTopic string - p, err := e.EncodeResolvedTimestamp(ctx, noTopic, ts) - if err != nil { - return err - } - return s.emit(int64(len(p))) -} -func (s *benchSink) Flush(_ context.Context) error { return nil } -func (s *benchSink) Close() error { return nil } -func (s *benchSink) Dial() error { return nil } - -func (s *benchSink) emit(bytes int64) error { - s.Lock() - defer s.Unlock() - s.emits++ - s.emitBytes += bytes - s.cond.Broadcast() - return nil -} - -// WaitForEmit blocks until at least one thing is emitted by the sink. It -// returns the number of emitted messages and bytes since the last WaitForEmit. -func (s *benchSink) WaitForEmit() (int, int64) { - s.Lock() - defer s.Unlock() - for s.emits == 0 { - s.cond.Wait() - } - emits, emitBytes := s.emits, s.emitBytes - s.emits, s.emitBytes = 0, 0 - return emits, emitBytes -} - -// createBenchmarkChangefeed starts a stripped down changefeed. It watches -// `database.table` and outputs to `sinkURI`. The given `feedClock` is only used -// for the internal ExportRequest polling, so a benchmark can write data with -// different timestamps beforehand and simulate the changefeed going through -// them in steps. -// -// The returned sink can be used to count emits and the closure handed back -// cancels the changefeed (blocking until it's shut down) and returns an error -// if the changefeed had failed before the closure was called. -// -// This intentionally skips the distsql and sink parts to keep the benchmark -// focused on the core changefeed work. 
-func createBenchmarkChangefeed( - ctx context.Context, - s serverutils.TestServerInterface, - feedClock *hlc.Clock, - database, table string, -) (*benchSink, func() error, error) { - tableDesc := desctestutils.TestingGetPublicTableDescriptor(s.DB(), keys.SystemSQLCodec, database, table) - spans := []roachpb.Span{tableDesc.PrimaryIndexSpan(keys.SystemSQLCodec)} - details := jobspb.ChangefeedDetails{ - Tables: jobspb.ChangefeedTargets{tableDesc.GetID(): jobspb.ChangefeedTargetTable{StatementTimeName: tableDesc.GetName()}}, - } - initialHighWater := hlc.Timestamp{} - encodingOpts := changefeedbase.EncodingOptions{Format: changefeedbase.OptFormatJSON, Envelope: changefeedbase.OptEnvelopeRow} - encoder, err := makeJSONEncoder(encodingOpts) - if err != nil { - return nil, nil, err - } - sink := makeBenchSink() - - settings := s.ClusterSettings() - metrics := MakeMetrics(base.DefaultHistogramWindowInterval()).(*Metrics) - buf := kvevent.MakeChanBuffer() - mm := mon.NewUnlimitedMonitor( - context.Background(), "test", mon.MemoryResource, - nil /* curCount */, nil /* maxHist */, math.MaxInt64, settings, - ) - needsInitialScan := initialHighWater.IsEmpty() - if needsInitialScan { - initialHighWater = details.StatementTime - } - kvfeedCfg := kvfeed.Config{ - Settings: settings, - DB: s.DB(), - Clock: feedClock, - Gossip: gossip.MakeOptionalGossip(s.GossipI().(*gossip.Gossip)), - Spans: spans, - Targets: AllTargets(details), - Writer: buf, - Metrics: &metrics.KVFeedMetrics, - MM: mm, - InitialHighWater: initialHighWater, - WithDiff: false, - NeedsInitialScan: needsInitialScan, - SchemaFeed: schemafeed.DoNothingSchemaFeed, - } - - sf, err := span.MakeFrontier(spans...) - if err != nil { - return nil, nil, err - } - execCfg := s.ExecutorConfig().(sql.ExecutorConfig) - eventConsumer, err := newKVEventToRowConsumer(ctx, &execCfg, sf, initialHighWater, - sink, encoder, makeChangefeedConfigFromJobDetails(details), execinfrapb.ChangeAggregatorSpec{}, - TestingKnobs{}, nil, nil, nil) - - if err != nil { - return nil, nil, err - } - tickFn := func(ctx context.Context) (jobspb.ResolvedSpan, error) { - event, err := buf.Get(ctx) - if err != nil { - return jobspb.ResolvedSpan{}, err - } - if event.Type() == kvevent.TypeKV { - if err := eventConsumer.ConsumeEvent(ctx, event); err != nil { - return jobspb.ResolvedSpan{}, err - } - } - return event.Resolved(), nil - } - - ctx, cancel := context.WithCancel(ctx) - go func() { _ = kvfeed.Run(ctx, kvfeedCfg) }() - - errCh := make(chan error, 1) - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - err := func() error { - sf, err := span.MakeFrontier(spans...) - if err != nil { - return err - } - for { - // This is basically the ChangeAggregator processor. - rs, err := tickFn(ctx) - if err != nil { - return err - } - // This is basically the ChangeFrontier processor, the resolved - // spans are normally sent using distsql, so we're missing a bit - // of overhead here. - advanced, err := sf.Forward(rs.Span, rs.Timestamp) - if err != nil { - return err - } - if advanced { - frontier := sf.Frontier() - if err := emitResolvedTimestamp(ctx, encoder, sink, frontier); err != nil { - return err - } - } - } - }() - errCh <- err - }() - cancelFn := func() error { - select { - case err := <-errCh: - return err - default: - } - cancel() - wg.Wait() - return nil - } - return sink, cancelFn, nil -} - -// loadWorkloadBatches inserts a workload.Table's row batches, each in one -// transaction. 
It returns the timestamps of these transactions and the byte -// size for use with b.SetBytes. -func loadWorkloadBatches(sqlDB *gosql.DB, table workload.Table) ([]time.Time, int64, error) { - if _, err := sqlDB.Exec(`CREATE TABLE "` + table.Name + `" ` + table.Schema); err != nil { - return nil, 0, err - } - - var now time.Time - var timestamps []time.Time - var benchBytes int64 - var numRows int - - var insertStmtBuf bytes.Buffer - var params []interface{} - for batchIdx := 0; batchIdx < table.InitialRows.NumBatches; batchIdx++ { - if _, err := sqlDB.Exec(`BEGIN`); err != nil { - return nil, 0, err - } - - params = params[:0] - insertStmtBuf.Reset() - insertStmtBuf.WriteString(`INSERT INTO "` + table.Name + `" VALUES `) - for _, row := range table.InitialRows.BatchRows(batchIdx) { - numRows++ - if len(params) != 0 { - insertStmtBuf.WriteString(`,`) - } - insertStmtBuf.WriteString(`(`) - for colIdx, datum := range row { - if colIdx != 0 { - insertStmtBuf.WriteString(`,`) - } - benchBytes += workload.ApproxDatumSize(datum) - params = append(params, datum) - fmt.Fprintf(&insertStmtBuf, `$%d`, len(params)) - } - insertStmtBuf.WriteString(`)`) - } - if _, err := sqlDB.Exec(insertStmtBuf.String(), params...); err != nil { - return nil, 0, err - } - - if err := sqlDB.QueryRow(`SELECT transaction_timestamp(); COMMIT;`).Scan(&now); err != nil { - return nil, 0, err - } - timestamps = append(timestamps, now) - } - - var totalRows int - if err := sqlDB.QueryRow( - `SELECT count(*) FROM "` + table.Name + `"`, - ).Scan(&totalRows); err != nil { - return nil, 0, err - } - if numRows != totalRows { - return nil, 0, errors.Errorf(`sanity check failed: expected %d rows got %d`, numRows, totalRows) - } - - return timestamps, benchBytes, nil -} diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index a75f6635f598..70409c2500df 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -5656,67 +5656,6 @@ func TestUnspecifiedPrimaryKey(t *testing.T) { cdcTest(t, testFn) } -// TestChangefeedNodeShutdown ensures that an enterprise changefeed continues -// running after the original job-coordinator node is shut down. -func TestChangefeedNodeShutdown(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - skip.WithIssue(t, 32232) - - knobs := base.TestingKnobs{ - DistSQL: &execinfra.TestingKnobs{Changefeed: &TestingKnobs{}}, - JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), - } - - tc := serverutils.StartNewTestCluster(t, 3, base.TestClusterArgs{ - ServerArgs: base.TestServerArgs{ - UseDatabase: "d", - Knobs: knobs, - }, - }) - defer tc.Stopper().Stop(context.Background()) - - db := tc.ServerConn(1) - serverutils.SetClusterSetting(t, tc, "changefeed.experimental_poll_interval", time.Millisecond) - sqlDB := sqlutils.MakeSQLRunner(db) - sqlDB.Exec(t, `CREATE DATABASE d`) - sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`) - sqlDB.Exec(t, `INSERT INTO foo VALUES (0, 'initial')`) - - // Create a factory which uses server 1 as the output of the Sink, but - // executes the CREATE CHANGEFEED statement on server 0. 
- sink, cleanup := sqlutils.PGUrl( - t, tc.Server(0).ServingSQLAddr(), t.Name(), url.User(username.RootUser)) - defer cleanup() - f := makeTableFeedFactory(tc.Server(1), tc.ServerConn(0), sink) - foo := feed(t, f, "CREATE CHANGEFEED FOR foo") - defer closeFeed(t, foo) - - sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'second')`) - assertPayloads(t, foo, []string{ - `foo: [0]->{"after": {"a": 0, "b": "initial"}}`, - `foo: [1]->{"after": {"a": 1, "b": "second"}}`, - }) - - // TODO(mrtracy): At this point we need to wait for a resolved timestamp, - // in order to ensure that there isn't a repeat when the job is picked up - // again. As an alternative, we could use a verifier instead of assertPayloads. - - // Wait for the high-water mark on the job to be updated after the initial - // scan, to make sure we don't get the initial scan data again. - - // Stop server 0, which is where the table feed connects. - tc.StopServer(0) - - sqlDB.Exec(t, `UPSERT INTO foo VALUES(0, 'updated')`) - sqlDB.Exec(t, `INSERT INTO foo VALUES (3, 'third')`) - - assertPayloads(t, foo, []string{ - `foo: [0]->{"after": {"a": 0, "b": "updated"}}`, - `foo: [3]->{"after": {"a": 3, "b": "third"}}`, - }) -} - func TestChangefeedTelemetry(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) From 2b40c5e115fc3f413f33cb3f093a6c09b2879331 Mon Sep 17 00:00:00 2001 From: Yevgeniy Miretskiy Date: Wed, 29 Mar 2023 11:51:37 -0400 Subject: [PATCH 2/5] changefeedccl: Unskip `TestChangefeedJobUpdateFailsIfNotClaimed` test Fixes #91548 Release note: None --- pkg/ccl/changefeedccl/BUILD.bazel | 2 -- pkg/ccl/changefeedccl/changefeed_test.go | 20 +++++++------------- 2 files changed, 7 insertions(+), 15 deletions(-) diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index fbc127d33685..d304ff773ccd 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -268,8 +268,6 @@ go_test( "//pkg/sql/sem/volatility", "//pkg/sql/sessiondata", "//pkg/sql/sessiondatapb", - "//pkg/sql/sqlliveness", - "//pkg/sql/sqlliveness/sqllivenesstestutils", "//pkg/sql/tests", "//pkg/sql/types", "//pkg/storage", diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 70409c2500df..09b05653a3a6 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -74,8 +74,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/randgen" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" - "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" - "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/sqllivenesstestutils" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/jobutils" @@ -4055,18 +4053,10 @@ func TestChangefeedJobUpdateFailsIfNotClaimed(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - skip.WithIssue(t, 91548) - // Set TestingKnobs to return a known session for easier // comparison. 
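// Two things change in the hunks below: the pinned sqlliveness session is
// dropped in favor of a regexp match on the error, and the fixed 5s deadline
// is stretched when the race detector is on. A minimal sketch of that
// race-aware timeout pattern as a reusable helper; the helper itself is
// hypothetical and not part of this patch:
//
//	func testTimeout(base time.Duration) time.Duration {
//		if util.RaceEnabled {
//			return base * 6 // race-instrumented builds run several times slower
//		}
//		return base
//	}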
- testSession := sqllivenesstestutils.NewAlwaysAliveSession("known-test-session") adoptionInterval := 20 * time.Minute sessionOverride := withKnobsFn(func(knobs *base.TestingKnobs) { - knobs.SQLLivenessKnobs = &sqlliveness.TestingKnobs{ - SessionOverride: func(_ context.Context) (sqlliveness.Session, error) { - return testSession, nil - }, - } // This is a hack to avoid the job adoption loop from // immediately re-adopting the job that is running. The job // adoption loop basically just sets the claim ID, which will @@ -4101,6 +4091,10 @@ func TestChangefeedJobUpdateFailsIfNotClaimed(t *testing.T) { // another node. sqlDB.Exec(t, `UPDATE system.jobs SET claim_session_id = NULL WHERE id = $1`, jobID) + timeout := 5 * time.Second + if util.RaceEnabled { + timeout = 30 * time.Second + } // Expect that the distflow fails since it can't // update the checkpoint. select { @@ -4108,9 +4102,9 @@ func TestChangefeedJobUpdateFailsIfNotClaimed(t *testing.T) { require.Error(t, err) // TODO(ssd): Replace this error in the jobs system with // an error type we can check against. - require.Contains(t, err.Error(), fmt.Sprintf("expected session \"%s\" but found NULL", testSession.ID().String())) - case <-time.After(5 * time.Second): - t.Fatal("expected distflow to fail but it hasn't after 5 seconds") + require.Regexp(t, "expected session .* but found NULL", err.Error()) + case <-time.After(timeout): + t.Fatal("expected distflow to fail") } } From ccea76ccc62dbfe2b00f8b2cb1be5d598b2b5eeb Mon Sep 17 00:00:00 2001 From: Rafi Shamim Date: Fri, 31 Mar 2023 14:39:38 -0400 Subject: [PATCH 3/5] upgrades: remove migration that waits for schema changes We can also remove some skipped tests, since they no longer apply. Release note: None --- pkg/clusterversion/cockroach_versions.go | 8 - pkg/upgrade/upgrades/BUILD.bazel | 3 - .../upgrades/schemachanger_elements_test.go | 10 + pkg/upgrade/upgrades/upgrades.go | 6 - .../upgrades/wait_for_schema_changes.go | 78 ----- .../upgrades/wait_for_schema_changes_test.go | 322 ------------------ 6 files changed, 10 insertions(+), 417 deletions(-) delete mode 100644 pkg/upgrade/upgrades/wait_for_schema_changes.go delete mode 100644 pkg/upgrade/upgrades/wait_for_schema_changes_test.go diff --git a/pkg/clusterversion/cockroach_versions.go b/pkg/clusterversion/cockroach_versions.go index c71285fa82c0..d402a77ca565 100644 --- a/pkg/clusterversion/cockroach_versions.go +++ b/pkg/clusterversion/cockroach_versions.go @@ -254,10 +254,6 @@ const ( TODODelete_V22_2SetRoleOptionsUserIDColumnNotNull // TODODelete_V22_2RangefeedUseOneStreamPerNode changes rangefeed implementation to use 1 RPC stream per node. TODODelete_V22_2RangefeedUseOneStreamPerNode - // TODODelete_V22_2NoNonMVCCAddSSTable adds a migration which waits for all - // schema changes to complete. After this point, no non-MVCC - // AddSSTable calls will be used outside of tenant streaming. - TODODelete_V22_2NoNonMVCCAddSSTable // TODODelete_V22_2TTLDistSQL uses DistSQL to distribute TTL SELECT/DELETE statements to // leaseholder nodes. 
TODODelete_V22_2TTLDistSQL @@ -678,10 +674,6 @@ var rawVersionsSingleton = keyedVersions{ Key: TODODelete_V22_2RangefeedUseOneStreamPerNode, Version: roachpb.Version{Major: 22, Minor: 1, Internal: 60}, }, - { - Key: TODODelete_V22_2NoNonMVCCAddSSTable, - Version: roachpb.Version{Major: 22, Minor: 1, Internal: 62}, - }, { Key: TODODelete_V22_2TTLDistSQL, Version: roachpb.Version{Major: 22, Minor: 1, Internal: 68}, diff --git a/pkg/upgrade/upgrades/BUILD.bazel b/pkg/upgrade/upgrades/BUILD.bazel index 61d215119020..cc8ea8a2f0d6 100644 --- a/pkg/upgrade/upgrades/BUILD.bazel +++ b/pkg/upgrade/upgrades/BUILD.bazel @@ -36,7 +36,6 @@ go_library( "tenant_table_migration.go", "upgrades.go", "wait_for_del_range_in_gc_job.go", - "wait_for_schema_changes.go", "web_sessions_table_user_id_migration.go", ], importpath = "github.com/cockroachdb/cockroach/pkg/upgrade/upgrades", @@ -123,7 +122,6 @@ go_test( "system_statistics_activity_test.go", "tenant_table_migration_test.go", "wait_for_del_range_in_gc_job_test.go", - "wait_for_schema_changes_test.go", "web_sessions_table_user_id_migration_test.go", ], args = ["-test.timeout=895s"], @@ -185,7 +183,6 @@ go_test( "//pkg/util/intsets", "//pkg/util/leaktest", "//pkg/util/log", - "//pkg/util/protoutil", "//pkg/util/syncutil", "//pkg/util/timeutil", "@com_github_cockroachdb_errors//:errors", diff --git a/pkg/upgrade/upgrades/schemachanger_elements_test.go b/pkg/upgrade/upgrades/schemachanger_elements_test.go index a4ef9502a815..641ce483213f 100644 --- a/pkg/upgrade/upgrades/schemachanger_elements_test.go +++ b/pkg/upgrade/upgrades/schemachanger_elements_test.go @@ -15,6 +15,7 @@ import ( gosql "database/sql" "sync/atomic" "testing" + "time" "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs" @@ -27,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" + "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -203,3 +205,11 @@ func TestUpgradeSchemaChangerElements(t *testing.T) { }) } } + +func shortInterval() *time.Duration { + shortInterval := 10 * time.Millisecond + if util.RaceEnabled { + shortInterval *= 5 + } + return &shortInterval +} diff --git a/pkg/upgrade/upgrades/upgrades.go b/pkg/upgrade/upgrades/upgrades.go index 8f8c0d1250a9..88e6794998b7 100644 --- a/pkg/upgrade/upgrades/upgrades.go +++ b/pkg/upgrade/upgrades/upgrades.go @@ -117,12 +117,6 @@ var upgrades = []upgradebase.Upgrade{ upgrade.NoPrecondition, ensureSQLSchemaTelemetrySchedule, ), - upgrade.NewTenantUpgrade( - "wait for all in-flight schema changes", - toCV(clusterversion.TODODelete_V22_2NoNonMVCCAddSSTable), - upgrade.NoPrecondition, - waitForAllSchemaChanges, - ), upgrade.NewTenantUpgrade("fix corrupt user-file related table descriptors", toCV(clusterversion.TODODelete_V22_2FixUserfileRelatedDescriptorCorruption), upgrade.NoPrecondition, diff --git a/pkg/upgrade/upgrades/wait_for_schema_changes.go b/pkg/upgrade/upgrades/wait_for_schema_changes.go deleted file mode 100644 index 04611c67142f..000000000000 --- a/pkg/upgrade/upgrades/wait_for_schema_changes.go +++ /dev/null @@ -1,78 +0,0 @@ -// Copyright 2022 The Cockroach Authors. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. 
-// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -package upgrades - -import ( - "context" - "fmt" - - "github.com/cockroachdb/cockroach/pkg/clusterversion" - "github.com/cockroachdb/cockroach/pkg/jobs" - "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" - "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" - "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" - "github.com/cockroachdb/cockroach/pkg/upgrade" - "github.com/cockroachdb/errors" -) - -// waitForAllSchemaChanges waits for all schema changes to enter a -// terminal or paused state. -// -// Because this is intended for the mvcc-bulk-ops transition, it does -// not care about schema changes created while this migration is -// running because any such schema changes must already be using the -// new mvcc bulk operations -// -// Note that we do not use SHOW JOBS WHEN COMPLETE here to avoid -// blocking forever on PAUSED jobs. Jobs using old index backfills -// will fail on Resume. -func waitForAllSchemaChanges( - ctx context.Context, _ clusterversion.ClusterVersion, d upgrade.TenantDeps, -) error { - - initialJobListQuery := fmt.Sprintf(` - -SELECT - job_id -FROM - [SHOW JOBS] -WHERE - job_type = 'SCHEMA CHANGE' - AND status NOT IN ('%s', '%s', '%s', '%s', '%s') -`, - string(jobs.StatusSucceeded), - string(jobs.StatusFailed), - string(jobs.StatusCanceled), - string(jobs.StatusRevertFailed), - string(jobs.StatusPaused)) - rows, err := d.InternalExecutor.QueryBufferedEx(ctx, - "query-non-terminal-schema-changers", - nil, /* txn */ - sessiondata.NodeUserSessionDataOverride, - initialJobListQuery) - if err != nil { - return err - } - - jobList := make([]jobspb.JobID, len(rows)) - for i, datums := range rows { - if len(datums) != 1 { - return errors.AssertionFailedf("unexpected number of columns: %d (expected 1)", len(datums)) - } - d := datums[0] - id, ok := d.(*tree.DInt) - if !ok { - return errors.AssertionFailedf("unexpected type for id column: %T (expected DInt)", d) - } - jobList[i] = jobspb.JobID(*id) - } - return d.JobRegistry.WaitForJobsIgnoringJobErrors(ctx, jobList) -} diff --git a/pkg/upgrade/upgrades/wait_for_schema_changes_test.go b/pkg/upgrade/upgrades/wait_for_schema_changes_test.go deleted file mode 100644 index ec784a33b3f7..000000000000 --- a/pkg/upgrade/upgrades/wait_for_schema_changes_test.go +++ /dev/null @@ -1,322 +0,0 @@ -// Copyright 2022 The Cockroach Authors. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. 
- -package upgrades_test - -import ( - "context" - gosql "database/sql" - "fmt" - "sync/atomic" - "testing" - "time" - - "github.com/cockroachdb/cockroach/pkg/clusterversion" - "github.com/cockroachdb/cockroach/pkg/jobs" - "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" - "github.com/cockroachdb/cockroach/pkg/security/username" - "github.com/cockroachdb/cockroach/pkg/server" - "github.com/cockroachdb/cockroach/pkg/sql" - "github.com/cockroachdb/cockroach/pkg/sql/tests" - "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" - "github.com/cockroachdb/cockroach/pkg/testutils/skip" - "github.com/cockroachdb/cockroach/pkg/util" - "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" - "github.com/cockroachdb/cockroach/pkg/util/leaktest" - "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/protoutil" - "github.com/cockroachdb/errors" - "github.com/stretchr/testify/require" -) - -func TestWaitForSchemaChangeMigration(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - skip.WithIssue(t, 95530, "bump minBinary to 22.2. Skip 22.2 mixed-version tests for future cleanup") - - ctx := context.Background() - testCases := []struct { - name string - setup func(t *testing.T, sqlDB *gosql.DB) error - run func(t *testing.T, sqlDB *gosql.DB) error - }{ - { - "running schema change that succeeds waits for success", - nil, - func(t *testing.T, sqlDB *gosql.DB) error { - _, err := sqlDB.Exec("CREATE INDEX ON t (b)") - return err - }, - }, - { - "running schema change that fails waits for failure", - nil, - func(t *testing.T, sqlDB *gosql.DB) error { - if _, err := sqlDB.Exec("INSERT INTO t VALUES (1, 1), (2, 1)"); err != nil { - return err - } - - if _, err := sqlDB.Exec("CREATE UNIQUE INDEX ON t (b)"); err == nil { - return errors.New("expected failure to create unique index but error was nil") - } - return nil - }, - }, - { - "running schema change that pauses should not block upgrade", - nil, - func(t *testing.T, sqlDB *gosql.DB) error { - if _, err := sqlDB.Exec("SET CLUSTER SETTING jobs.debug.pausepoints = 'indexbackfill.before_flow'"); err != nil { - return err - } - if _, err := sqlDB.Exec("CREATE INDEX ON t (b)"); err == nil { - return errors.New("expected failure because of pausepoint but error was nil") - } - return nil - }, - }, - { - "previously successful schema change does block migration", - func(t *testing.T, sqlDB *gosql.DB) error { - _, err := sqlDB.Exec("CREATE UNIQUE INDEX ON t (b)") - return err - }, - nil, - }, - { - "previously failed schema change does block migration", - func(t *testing.T, sqlDB *gosql.DB) error { - if _, err := sqlDB.Exec("INSERT INTO t VALUES (1, 1), (2, 1)"); err != nil { - return err - } - if _, err := sqlDB.Exec("CREATE UNIQUE INDEX ON t (b)"); err == nil { - return errors.New("expected failure to create unique index but error was nil") - } - return nil - }, - nil, - }, - { - "previously paused schema should not block upgrade", - func(t *testing.T, sqlDB *gosql.DB) error { - if _, err := sqlDB.Exec("SET CLUSTER SETTING jobs.debug.pausepoints = 'indexbackfill.before_flow'"); err != nil { - return err - } - if _, err := sqlDB.Exec("CREATE INDEX ON t (b)"); err == nil { - return errors.New("expected failure because of pausepoint but error was nil") - } - return nil - }, - nil, - }, - } - - for _, tc := range testCases { - tc := tc - t.Run(tc.name, func(t *testing.T) { - params, _ := tests.CreateTestServerParams() - params.Knobs.Server = &server.TestingKnobs{ - 
DisableAutomaticVersionUpgrade: make(chan struct{}), - BinaryVersionOverride: clusterversion.ByKey(clusterversion.TODODelete_V22_2NoNonMVCCAddSSTable - 1), - } - - var ( - scStartedChan chan struct{} - scAllowResumeChan chan struct{} - secondWaitChan chan struct{} - ) - - params.Knobs.SQLSchemaChanger = &sql.SchemaChangerTestingKnobs{ - RunBeforeResume: func(_ jobspb.JobID) error { - if scStartedChan != nil { - close(scStartedChan) - } - if scAllowResumeChan != nil { - <-scAllowResumeChan - } - return nil - }, - } - jobKnobs := jobs.NewTestingKnobsWithShortIntervals() - jobKnobs.IntervalOverrides.WaitForJobsInitialDelay = shortInterval() - jobKnobs.IntervalOverrides.WaitForJobsMaxDelay = shortInterval() - - var waitCount int32 - jobKnobs.BeforeWaitForJobsQuery = func(_ []jobspb.JobID) { - if secondWaitChan != nil { - if atomic.AddInt32(&waitCount, 1) == 2 { - close(secondWaitChan) - } - } - } - params.Knobs.JobsTestingKnobs = jobKnobs - - s, sqlDB, _ := serverutils.StartServer(t, params) - defer s.Stopper().Stop(ctx) - _, err := sqlDB.Exec("CREATE TABLE t (pk INT PRIMARY KEY, b INT)") - require.NoError(t, err) - if tc.setup != nil { - require.NoError(t, tc.setup(t, sqlDB)) - } - - ctx := context.Background() - g := ctxgroup.WithContext(ctx) - - if tc.run != nil { - scStartedChan = make(chan struct{}) - scAllowResumeChan = make(chan struct{}) - secondWaitChan = make(chan struct{}) - g.GoCtx(func(ctx context.Context) error { - return tc.run(t, sqlDB) - }) - } - g.GoCtx(func(ctx context.Context) error { - if scStartedChan != nil { - <-scStartedChan - } - _, err = sqlDB.Exec("SET CLUSTER SETTING version = crdb_internal.node_executable_version()") - return err - }) - if tc.run != nil { - <-scStartedChan - <-secondWaitChan - close(scAllowResumeChan) - } - require.NoError(t, g.Wait()) - }) - } -} - -func TestWaitForSchemaChangeMigrationSynthetic(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - skip.WithIssue(t, 95530, "bump minBinary to 22.2. 
Skip 22.2 mixed-version tests for future cleanup") - - ctx := context.Background() - - upsertJob := func(sqlDB *gosql.DB, typ string, status string) error { - var details jobspb.Details - switch typ { - case "SCHEMA CHANGE": - details = jobspb.SchemaChangeDetails{} - case "AUTO CREATE STATS": - details = jobspb.CreateStatsDetails{Name: "__auto__"} - default: - return errors.Newf("job type not support in this test: %s", typ) - } - - payload, err := protoutil.Marshal(&jobspb.Payload{ - UsernameProto: username.RootUserName().EncodeProto(), - Details: jobspb.WrapPayloadDetails(details), - }) - if err != nil { - return err - } - - _, err = sqlDB.Exec(`UPSERT INTO system.jobs (id, status, payload) VALUES ($1, $2, $3)`, - 1, status, payload, - ) - return err - } - - terminalStates := []jobs.Status{ - jobs.StatusSucceeded, - jobs.StatusFailed, - jobs.StatusCanceled, - jobs.StatusRevertFailed, - jobs.StatusPaused, - } - nonTerminalStates := []jobs.Status{ - jobs.StatusPending, - jobs.StatusRunning, - jobs.StatusReverting, - jobs.StatusCancelRequested, - jobs.StatusPauseRequested, - } - - testMigrate := func(jobType string, startingState string, nextState string) { - name := fmt.Sprintf("%s_%s", jobType, startingState) - if nextState != "" { - name = fmt.Sprintf("%s_%s", name, nextState) - } - - t.Run(name, func(t *testing.T) { - params, _ := tests.CreateTestServerParams() - params.Knobs.Server = &server.TestingKnobs{ - DisableAutomaticVersionUpgrade: make(chan struct{}), - BinaryVersionOverride: clusterversion.ByKey(clusterversion.TODODelete_V22_2NoNonMVCCAddSSTable - 1), - } - - var waitCount int32 - var secondWaitChan chan struct{} - params.Knobs.JobsTestingKnobs = &jobs.TestingKnobs{ - BeforeWaitForJobsQuery: func(_ []jobspb.JobID) { - if secondWaitChan != nil { - if atomic.AddInt32(&waitCount, 1) == 2 { - close(secondWaitChan) - } - } - }, - IntervalOverrides: jobs.TestingIntervalOverrides{ - WaitForJobsInitialDelay: shortInterval(), - WaitForJobsMaxDelay: shortInterval(), - }, - } - s, sqlDB, _ := serverutils.StartServer(t, params) - defer s.Stopper().Stop(ctx) - - require.NoError(t, upsertJob(sqlDB, jobType, startingState)) - - // This test expects all of the cases will eventually - // pass the migration. If not, we timeout. 
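// Note: the 20-second ceiling below is a liveness guard rather than a
// correctness check. Per waitForAllSchemaChanges above, the migration's job
// query deliberately excludes PAUSED jobs, which is why the paused cases in
// this test are expected to pass the migration rather than block on it.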
- ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) - defer cancel() - g := ctxgroup.WithContext(ctx) - if nextState != "" { - secondWaitChan = make(chan struct{}) - } - - g.GoCtx(func(ctx context.Context) error { - _, err := sqlDB.Exec("SET CLUSTER SETTING version = crdb_internal.node_executable_version()") - return err - }) - - if nextState != "" { - <-secondWaitChan - require.NoError(t, upsertJob(sqlDB, jobType, nextState)) - } - require.NoError(t, g.Wait()) - }) - } - - for _, state := range nonTerminalStates { - testMigrate("AUTO CREATE STATS", string(state), "") - - } - for _, state := range terminalStates { - testMigrate("AUTO CREATE STATS", string(state), "") - testMigrate("SCHEMA CHANGE", string(state), "") - } - for _, startingState := range nonTerminalStates { - for _, endingState := range terminalStates { - testMigrate("SCHEMA CHANGE", string(startingState), string(endingState)) - } - } -} - -func shortInterval() *time.Duration { - shortInterval := 10 * time.Millisecond - if util.RaceEnabled { - shortInterval *= 5 - } - return &shortInterval -} From a503765e05dc4cb2633b00258151f7c5b3ae7aba Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Mon, 3 Apr 2023 18:13:21 +0200 Subject: [PATCH 4/5] server,testutils: add some extra logging for TestStatusEngineStatsJson Release note: None --- pkg/server/status_test.go | 2 ++ pkg/testutils/serverutils/test_server_shim.go | 9 +++++++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/pkg/server/status_test.go b/pkg/server/status_test.go index 672b420b2a35..130f6185552a 100644 --- a/pkg/server/status_test.go +++ b/pkg/server/status_test.go @@ -277,6 +277,8 @@ func TestStatusEngineStatsJson(t *testing.T) { } defer s.Stopper().Stop(context.Background()) + t.Logf("using admin URL %s", s.AdminURL()) + var engineStats serverpb.EngineStatsResponse // Using SucceedsSoon because we have seen in the wild that // occasionally requests don't go through with error "transport: diff --git a/pkg/testutils/serverutils/test_server_shim.go b/pkg/testutils/serverutils/test_server_shim.go index f464fe338b92..6d1b907b9952 100644 --- a/pkg/testutils/serverutils/test_server_shim.go +++ b/pkg/testutils/serverutils/test_server_shim.go @@ -38,6 +38,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/envutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/httputil" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/uuid" @@ -515,7 +516,9 @@ func GetJSONProtoWithAdminOption( if err != nil { return err } - return httputil.GetJSON(httpClient, ts.AdminURL()+path, response) + fullURL := ts.AdminURL() + path + log.Infof(context.Background(), "test retrieving protobuf over HTTP: %s", fullURL) + return httputil.GetJSON(httpClient, fullURL, response) } // PostJSONProto uses the supplied client to POST the URL specified by the parameters @@ -534,5 +537,7 @@ func PostJSONProtoWithAdminOption( if err != nil { return err } - return httputil.PostJSON(httpClient, ts.AdminURL()+path, request, response) + fullURL := ts.AdminURL() + path + log.Infof(context.Background(), "test retrieving protobuf over HTTP: %s", fullURL) + return httputil.PostJSON(httpClient, fullURL, request, response) } From 2189cb090db8c4c8cb8ec5850135301c01f867eb Mon Sep 17 00:00:00 2001 From: Rafi Shamim Date: Fri, 31 Mar 2023 14:53:04 -0400 Subject: [PATCH 5/5] upgrades: unskip 
TestIsAtLeastVersionBuiltin

Release note: None
---
 pkg/upgrade/upgrades/builtins_test.go | 11 ++++-------
 1 file changed, 4 insertions(+), 7 deletions(-)

diff --git a/pkg/upgrade/upgrades/builtins_test.go b/pkg/upgrade/upgrades/builtins_test.go
index 7b11f2edb338..a8ed69b3b65c 100644
--- a/pkg/upgrade/upgrades/builtins_test.go
+++ b/pkg/upgrade/upgrades/builtins_test.go
@@ -17,7 +17,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/base"
 	"github.com/cockroachdb/cockroach/pkg/clusterversion"
 	"github.com/cockroachdb/cockroach/pkg/server"
-	"github.com/cockroachdb/cockroach/pkg/testutils/skip"
 	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
@@ -28,14 +27,12 @@ func TestIsAtLeastVersionBuiltin(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
-	skip.WithIssue(t, 95530, "bump minBinary to 22.2. Skip 22.2 mixed-version tests for future cleanup")
-
 	clusterArgs := base.TestClusterArgs{
 		ServerArgs: base.TestServerArgs{
 			Knobs: base.TestingKnobs{
 				Server: &server.TestingKnobs{
 					DisableAutomaticVersionUpgrade: make(chan struct{}),
-					BinaryVersionOverride:          clusterversion.ByKey(clusterversion.TODODelete_V22_1),
+					BinaryVersionOverride:          clusterversion.ByKey(clusterversion.V22_2),
 				},
 			},
 		},
 
 	ctx := context.Background()
@@ -49,9 +46,9 @@ func TestIsAtLeastVersionBuiltin(t *testing.T) {
 	)
 	defer tc.Stopper().Stop(ctx)
 
-	v := clusterversion.ByKey(clusterversion.TODODelete_V22_2Start).String()
-	// Check that the builtin returns false when comparing against 22.1-2
-	// version because we are still on 22.1-0.
+	v := clusterversion.ByKey(clusterversion.V23_1Start).String()
+	// Check that the builtin returns false when comparing against the new
+	// version because we are still on the bootstrap version.
 	sqlDB.CheckQueryResults(t, "SELECT crdb_internal.is_at_least_version('"+v+"')", [][]string{{"false"}})
 
 	// Run the upgrade.
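For reference, the builtin exercised by this test is callable from any SQL
session. A minimal sketch of using it as an application-side feature gate
follows; the package, helper name, and version string are illustrative and
not part of this patch:

	package featuregate // illustrative

	import (
		"context"
		gosql "database/sql"
	)

	// supportsNewSyntax reports whether the cluster's active version is at
	// least 23.1 (an assumed gate), using the builtin the test above checks.
	func supportsNewSyntax(ctx context.Context, db *gosql.DB) (bool, error) {
		var ok bool
		err := db.QueryRowContext(ctx,
			`SELECT crdb_internal.is_at_least_version('23.1')`).Scan(&ok)
		return ok, err
	}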