diff --git a/pkg/cmd/roachtest/tests/lease_preferences.go b/pkg/cmd/roachtest/tests/lease_preferences.go
index 460d1bd801df..9138dc96b7b9 100644
--- a/pkg/cmd/roachtest/tests/lease_preferences.go
+++ b/pkg/cmd/roachtest/tests/lease_preferences.go
@@ -252,6 +252,14 @@ func runLeasePreferences(
 	// enforcement.
 	require.NoError(t, WaitForReplication(ctx, t, conn, spec.replFactor))
 
+	// Set a lease preference for the liveness range to be on n5. This test
+	// would occasionally fail due to liveness heartbeat failures when the
+	// liveness lease is on a stopped node. This is not ideal behavior; see #108512.
+	configureZone(t, ctx, conn, "RANGE liveness", zoneConfig{
+		replicas:  spec.replFactor,
+		leaseNode: 5,
+	})
+
 	t.L().Printf("setting lease preferences: %s", spec.preferences)
 	setLeasePreferences(ctx, spec.preferences)
 	t.L().Printf("waiting for initial lease preference conformance")
diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go
index bd258d07bc35..948fcb892f1d 100644
--- a/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go
+++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go
@@ -20,6 +20,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
 	"github.com/cockroachdb/cockroach/pkg/util/uuid"
 	"github.com/cockroachdb/errors"
+	"github.com/cockroachdb/redact"
 )
 
 // savepoint captures the state in the TxnCoordSender necessary to restore that
@@ -121,15 +122,8 @@ func (tc *TxnCoordSender) RollbackToSavepoint(ctx context.Context, s kv.Savepoin
 	}
 	sp := s.(*savepoint)
-	err := tc.checkSavepointLocked(sp)
+	err := tc.checkSavepointLocked(sp, "rollback to")
 	if err != nil {
-		if errors.Is(err, errSavepointInvalidAfterTxnRestart) {
-			err = kvpb.NewTransactionRetryWithProtoRefreshError(
-				"cannot rollback to savepoint after a transaction restart",
-				tc.mu.txn.ID,
-				tc.mu.txn,
-			)
-		}
 		return err
 	}
 
@@ -165,15 +159,7 @@ func (tc *TxnCoordSender) ReleaseSavepoint(ctx context.Context, s kv.SavepointTo
 	}
 	sp := s.(*savepoint)
-	err := tc.checkSavepointLocked(sp)
-	if errors.Is(err, errSavepointInvalidAfterTxnRestart) {
-		err = kvpb.NewTransactionRetryWithProtoRefreshError(
-			"cannot release savepoint after a transaction restart",
-			tc.mu.txn.ID,
-			tc.mu.txn,
-		)
-	}
-	return err
+	return tc.checkSavepointLocked(sp, "release")
 }
 
 type errSavepointOperationInErrorTxn struct{}
@@ -193,23 +179,22 @@ func (tc *TxnCoordSender) assertNotFinalized() error {
 	return nil
 }
 
-var errSavepointInvalidAfterTxnRestart = errors.New("savepoint invalid after transaction restart")
-
 // checkSavepointLocked checks whether the provided savepoint is still valid.
-// Returns errSavepointInvalidAfterTxnRestart if the savepoint is not an
+// Returns a TransactionRetryWithProtoRefreshError if the savepoint is not an
 // "initial" one and the transaction has restarted since the savepoint was
 // created.
-func (tc *TxnCoordSender) checkSavepointLocked(s *savepoint) error {
+func (tc *TxnCoordSender) checkSavepointLocked(s *savepoint, op redact.SafeString) error {
 	// Only savepoints taken before any activity are allowed to be used after a
 	// transaction restart.
if s.Initial() { return nil } - if s.txnID != tc.mu.txn.ID { - return errSavepointInvalidAfterTxnRestart - } - if s.epoch != tc.mu.txn.Epoch { - return errSavepointInvalidAfterTxnRestart + if s.txnID != tc.mu.txn.ID || s.epoch != tc.mu.txn.Epoch { + return kvpb.NewTransactionRetryWithProtoRefreshError( + redact.Sprintf("cannot %s savepoint after a transaction restart", op), + s.txnID, + tc.mu.txn, + ) } if s.seqNum < 0 || s.seqNum > tc.interceptorAlloc.txnSeqNumAllocator.writeSeq { diff --git a/pkg/kv/kvclient/kvstreamer/requests_provider.go b/pkg/kv/kvclient/kvstreamer/requests_provider.go index 14efdd53ab6b..6f1baf680cb3 100644 --- a/pkg/kv/kvclient/kvstreamer/requests_provider.go +++ b/pkg/kv/kvclient/kvstreamer/requests_provider.go @@ -264,6 +264,9 @@ func (p *outOfOrderRequestsProvider) enqueue(requests []singleRangeBatch) { if len(p.requests) > 0 { panic(errors.AssertionFailedf("outOfOrderRequestsProvider has old requests in enqueue")) } + if len(requests) == 0 { + panic(errors.AssertionFailedf("outOfOrderRequestsProvider enqueuing zero requests")) + } p.requests = requests p.hasWork.Signal() } @@ -388,6 +391,9 @@ func (p *inOrderRequestsProvider) enqueue(requests []singleRangeBatch) { if len(p.requests) > 0 { panic(errors.AssertionFailedf("inOrderRequestsProvider has old requests in enqueue")) } + if len(requests) == 0 { + panic(errors.AssertionFailedf("inOrderRequestsProvider enqueuing zero requests")) + } p.requests = requests p.heapInit() p.hasWork.Signal() diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index b3dfc8a2ef7e..3eb1a7915007 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -224,6 +224,7 @@ go_library( "@com_github_cockroachdb_logtags//:logtags", "@com_github_cockroachdb_pebble//:pebble", "@com_github_cockroachdb_pebble//objstorage", + "@com_github_cockroachdb_pebble//objstorage/remote", "@com_github_cockroachdb_pebble//vfs", "@com_github_cockroachdb_redact//:redact", "@com_github_gogo_protobuf//proto", diff --git a/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go b/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go index 88501088f439..9b36881486af 100644 --- a/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go +++ b/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go @@ -1249,7 +1249,7 @@ func TestEvalAddSSTable(t *testing.T) { } else { require.NotNil(t, result.Replicated.AddSSTable) require.NoError(t, fs.WriteFile(engine, "sst", result.Replicated.AddSSTable.Data)) - require.NoError(t, engine.IngestExternalFiles(ctx, []string{"sst"})) + require.NoError(t, engine.IngestLocalFiles(ctx, []string{"sst"})) } var expect kvs @@ -1652,7 +1652,7 @@ func TestAddSSTableMVCCStats(t *testing.T) { require.NoError(t, err) require.NoError(t, fs.WriteFile(engine, "sst", sst)) - require.NoError(t, engine.IngestExternalFiles(ctx, []string{"sst"})) + require.NoError(t, engine.IngestLocalFiles(ctx, []string{"sst"})) statsEvaled := statsBefore statsEvaled.Add(*cArgs.Stats) diff --git a/pkg/kv/kvserver/replica_proposal.go b/pkg/kv/kvserver/replica_proposal.go index 4cf3df327f2e..c4d5a42fb106 100644 --- a/pkg/kv/kvserver/replica_proposal.go +++ b/pkg/kv/kvserver/replica_proposal.go @@ -37,6 +37,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" + "github.com/cockroachdb/pebble" + "github.com/cockroachdb/pebble/objstorage/remote" "github.com/cockroachdb/redact" "github.com/kr/pretty" "golang.org/x/time/rate" @@ -628,19 +630,32 @@ func 
addSSTablePreApply( sst.Span, sst.RemoteFileLoc, ) - // TODO(bilal): replace this with the real ingest. - /* - start := storage.EngineKey{Key: sst.Span.Key} - end := storage.EngineKey{Key: sst.Span.EndKey} - - externalFile := pebble.ExternalFile{ - Locator: shared.Locator(sst.RemoteFileLoc), - ObjName: sst.RemoteFilePath, - Size: sst.BackingFileSize, - SmallestUserKey: start.Encode(), - LargestUserKey: end.Encode(), - }*/ - log.Fatalf(ctx, "Unsupported IngestRemoteFile") + start := storage.EngineKey{Key: sst.Span.Key} + end := storage.EngineKey{Key: sst.Span.EndKey} + externalFile := pebble.ExternalFile{ + Locator: remote.Locator(sst.RemoteFileLoc), + ObjName: sst.RemoteFilePath, + Size: sst.BackingFileSize, + SmallestUserKey: start.Encode(), + LargestUserKey: end.Encode(), + } + tBegin := timeutil.Now() + defer func() { + if dur := timeutil.Since(tBegin); dur > addSSTPreApplyWarn.threshold && addSSTPreApplyWarn.ShouldLog() { + log.Infof(ctx, + "ingesting SST of size %s at index %d took %.2fs", + humanizeutil.IBytes(int64(len(sst.Data))), index, dur.Seconds(), + ) + } + }() + + _, ingestErr := env.eng.IngestExternalFiles(ctx, []pebble.ExternalFile{externalFile}) + if ingestErr != nil { + log.Fatalf(ctx, "while ingesting %s: %v", sst.RemoteFilePath, ingestErr) + } + // Adding without modification succeeded, no copy necessary. + log.Eventf(ctx, "ingested SSTable at index %d, term %d: external %s", index, term, sst.RemoteFilePath) + return false } checksum := util.CRC32(sst.Data) @@ -685,7 +700,7 @@ func addSSTablePreApply( } // Regular path - we made a hard link, so we can ingest the hard link now. - ingestErr := env.eng.IngestExternalFiles(ctx, []string{ingestPath}) + ingestErr := env.eng.IngestLocalFiles(ctx, []string{ingestPath}) if ingestErr != nil { log.Fatalf(ctx, "while ingesting %s: %v", ingestPath, ingestErr) } @@ -726,7 +741,7 @@ func ingestViaCopy( if err := kvserverbase.WriteFileSyncing(ctx, ingestPath, sst.Data, eng, 0600, st, limiter); err != nil { return errors.Wrapf(err, "while ingesting %s", ingestPath) } - if err := eng.IngestExternalFiles(ctx, []string{ingestPath}); err != nil { + if err := eng.IngestLocalFiles(ctx, []string{ingestPath}); err != nil { return errors.Wrapf(err, "while ingesting %s", ingestPath) } log.Eventf(ctx, "ingested SSTable at index %d, term %d: %s", index, term, ingestPath) diff --git a/pkg/kv/kvserver/replica_raftstorage.go b/pkg/kv/kvserver/replica_raftstorage.go index 4865725f64ea..bd269d450ac4 100644 --- a/pkg/kv/kvserver/replica_raftstorage.go +++ b/pkg/kv/kvserver/replica_raftstorage.go @@ -578,7 +578,7 @@ func (r *Replica) applySnapshot( // TODO: separate ingestions for log and statemachine engine. See: // // https://github.com/cockroachdb/cockroach/issues/93251 - r.store.TODOEngine().IngestExternalFilesWithStats(ctx, inSnap.SSTStorageScratch.SSTs()); err != nil { + r.store.TODOEngine().IngestLocalFilesWithStats(ctx, inSnap.SSTStorageScratch.SSTs()); err != nil { return errors.Wrapf(err, "while ingesting %s", inSnap.SSTStorageScratch.SSTs()) } if r.store.cfg.KVAdmissionController != nil { diff --git a/pkg/kv/kvserver/reports/reporter_test.go b/pkg/kv/kvserver/reports/reporter_test.go index e9e128ccba0f..fac03794a478 100644 --- a/pkg/kv/kvserver/reports/reporter_test.go +++ b/pkg/kv/kvserver/reports/reporter_test.go @@ -48,6 +48,8 @@ func TestConstraintConformanceReportIntegration(t *testing.T) { // don't make progress. skip.UnderStressRace(t) skip.UnderRace(t, "takes >1min under race") + // Similarly, skip the test under deadlock builds. 
+ skip.UnderDeadlock(t, "takes >1min under deadlock") ctx := context.Background() tc := serverutils.StartNewTestCluster(t, 5, base.TestClusterArgs{ diff --git a/pkg/sql/create_as_test.go b/pkg/sql/create_as_test.go index d1c277084edc..790a1ab1bf29 100644 --- a/pkg/sql/create_as_test.go +++ b/pkg/sql/create_as_test.go @@ -325,6 +325,7 @@ AND status != 'succeeded'` func TestFormat(t *testing.T) { defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) testCases := []struct { sql string diff --git a/pkg/sql/create_function_test.go b/pkg/sql/create_function_test.go index 315fe9668ffa..cfc1022f0224 100644 --- a/pkg/sql/create_function_test.go +++ b/pkg/sql/create_function_test.go @@ -151,6 +151,7 @@ SELECT nextval(105:::REGCLASS);`, func TestVersionGatingUDFInCheckConstraints(t *testing.T) { defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) t.Run("new_schema_changer_version_enabled", func(t *testing.T) { params, _ := createTestServerParams() diff --git a/pkg/sql/logictest/logic.go b/pkg/sql/logictest/logic.go index d7e0e76a9ec7..e4db4cf2045f 100644 --- a/pkg/sql/logictest/logic.go +++ b/pkg/sql/logictest/logic.go @@ -1494,6 +1494,7 @@ func (t *logicTest) newCluster( stats.DefaultRefreshInterval = time.Millisecond t.cluster = serverutils.StartNewTestCluster(t.rootT, cfg.NumNodes, params) + t.purgeZoneConfig() if cfg.UseFakeSpanResolver { // We need to update the DistSQL span resolver with the fake resolver. // Note that DistSQL was disabled in makeClusterSetting above, so we diff --git a/pkg/sql/multitenant_admin_function_test.go b/pkg/sql/multitenant_admin_function_test.go index 8c6a0389776d..143f6144e3e5 100644 --- a/pkg/sql/multitenant_admin_function_test.go +++ b/pkg/sql/multitenant_admin_function_test.go @@ -246,7 +246,7 @@ func (tc testCase) runTest( if numNodes == 0 { numNodes = 1 } - cfg.ServerArgs.DefaultTestTenant = base.TestTenantProbabilistic + cfg.ServerArgs.DefaultTestTenant = base.TestControlsTenantsExplicitly testCluster := serverutils.StartNewTestCluster(t, numNodes, cfg.TestClusterArgs) defer testCluster.Stopper().Stop(ctx) diff --git a/pkg/sql/schema_changer_test.go b/pkg/sql/schema_changer_test.go index f445ee3990c8..bc03f09f9685 100644 --- a/pkg/sql/schema_changer_test.go +++ b/pkg/sql/schema_changer_test.go @@ -7586,6 +7586,7 @@ CREATE TABLE t.test (pk INT PRIMARY KEY, v INT); // to conclude. If the locks were not dropped, a deadlock could occur. 
func TestConcurrentSchemaChangesDoNotDeadlock(t *testing.T) { defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) ctx := context.Background() s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{}) diff --git a/pkg/sql/schemachanger/BUILD.bazel b/pkg/sql/schemachanger/BUILD.bazel index 8f8658a425a2..51fee6bc6acb 100644 --- a/pkg/sql/schemachanger/BUILD.bazel +++ b/pkg/sql/schemachanger/BUILD.bazel @@ -39,13 +39,14 @@ go_test( "//pkg/ccl", "//pkg/jobs", "//pkg/jobs/jobspb", + "//pkg/keys", + "//pkg/kv", "//pkg/roachpb", "//pkg/security/securityassets", "//pkg/security/securitytest", "//pkg/server", "//pkg/sql", "//pkg/sql/catalog", - "//pkg/sql/catalog/descpb", "//pkg/sql/catalog/desctestutils", "//pkg/sql/execinfra", "//pkg/sql/pgwire/pgcode", @@ -54,24 +55,21 @@ go_test( "//pkg/sql/schemachanger/scop", "//pkg/sql/schemachanger/scplan", "//pkg/sql/schemachanger/sctest", # keep - "//pkg/sql/sqltestutils", + "//pkg/sql/sessiondatapb", "//pkg/testutils", "//pkg/testutils/serverutils", + "//pkg/testutils/skip", "//pkg/testutils/sqlutils", - "//pkg/util", "//pkg/util/ctxgroup", "//pkg/util/leaktest", "//pkg/util/log", "//pkg/util/protoutil", "//pkg/util/randutil", - "//pkg/util/syncutil", - "@com_github_cockroachdb_cockroach_go_v2//crdb", "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_errors//errorspb", "@com_github_lib_pq//:pq", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", - "@org_golang_x_sync//errgroup", ], ) diff --git a/pkg/sql/schemachanger/schemachanger_test.go b/pkg/sql/schemachanger/schemachanger_test.go index be5838e1acf9..1076357befc5 100644 --- a/pkg/sql/schemachanger/schemachanger_test.go +++ b/pkg/sql/schemachanger/schemachanger_test.go @@ -14,20 +14,22 @@ import ( "context" "encoding/hex" "fmt" + "math/rand" "regexp" "strings" "sync" "sync/atomic" "testing" + "time" - "github.com/cockroachdb/cockroach-go/v2/crdb" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" @@ -35,783 +37,22 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scexec" "github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scop" "github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scplan" - "github.com/cockroachdb/cockroach/pkg/sql/sqltestutils" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" - "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" - "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/errors" "github.com/cockroachdb/errors/errorspb" "github.com/lib/pq" "github.com/stretchr/testify/assert" 
"github.com/stretchr/testify/require" - "golang.org/x/sync/errgroup" ) -// TestConcurrentDeclarativeSchemaChanges tests that concurrent declarative -// schema changes operating on the same descriptors are performed serially. -func TestConcurrentDeclarativeSchemaChanges(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - ctx, cancel := context.WithCancel(context.Background()) - - var maxValue = 4000 - if util.RaceEnabled { - // Race builds are a lot slower, so use a smaller number of rows. - maxValue = 200 - } - - // Protects backfillNotification. - var mu syncutil.Mutex - var backfillNotification, continueNotification chan struct{} - // We have to have initBackfillNotification return the new - // channel rather than having later users read the original - // backfillNotification to make the race detector happy. - initBackfillNotification := func() (chan struct{}, chan struct{}) { - mu.Lock() - defer mu.Unlock() - backfillNotification = make(chan struct{}) - continueNotification = make(chan struct{}) - return backfillNotification, continueNotification - } - notifyBackfill := func() { - mu.Lock() - defer mu.Unlock() - if backfillNotification != nil { - close(backfillNotification) - backfillNotification = nil - } - if continueNotification != nil { - <-continueNotification - close(continueNotification) - continueNotification = nil - } - } - var alterPrimaryKeyBlockedCounter atomic.Uint32 - var params base.TestServerArgs - params.Knobs = base.TestingKnobs{ - SQLDeclarativeSchemaChanger: &scexec.TestingKnobs{ - WhileWaitingForConcurrentSchemaChanges: func(stmts []string) { - for _, stmt := range stmts { - if strings.Contains(stmt, "ALTER PRIMARY KEY") { - alterPrimaryKeyBlockedCounter.Add(1) - return - } - } - }, - }, - DistSQL: &execinfra.TestingKnobs{ - RunBeforeBackfillChunk: func(_ roachpb.Span) error { - notifyBackfill() - return nil - }, - }, - // Decrease the adopt loop interval so that retries happen quickly. - JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), - // Prevent the GC job from running so we ensure that all the keys - // which were written remain. - GCJob: &sql.GCJobTestingKnobs{RunBeforeResume: func(jobID jobspb.JobID) error { - <-ctx.Done() - return ctx.Err() - }}, - } - s, sqlDB, kvDB := serverutils.StartServer(t, params) - defer s.Stopper().Stop(ctx) - defer cancel() - codec := s.ApplicationLayer().Codec() - - if _, err := sqlDB.Exec(`CREATE DATABASE t`); err != nil { - t.Fatal(err) - } - - if _, err := sqlDB.Exec(`CREATE TABLE t.test (k INT NOT NULL, v INT)`); err != nil { - t.Fatal(err) - } - if err := sqltestutils.BulkInsertIntoTable(sqlDB, maxValue); err != nil { - t.Fatal(err) - } - - backfillNotif, continueNotif := initBackfillNotification() - var wg sync.WaitGroup - wg.Add(1) - go func() { - if _, err := sqlDB.Exec(`CREATE INDEX i ON t.test (v)`); err != nil { - t.Error(err) - } - wg.Done() - }() - - // Wait until the create index schema change job has started - // before kicking off the alter primary key. - <-backfillNotif - require.Zero(t, alterPrimaryKeyBlockedCounter.Load()) - wg.Add(1) - go func() { - if _, err := sqlDB.Exec(`ALTER TABLE t.test ALTER PRIMARY KEY USING COLUMNS (k)`); err != nil { - t.Error(err) - } - wg.Done() - }() - - // The ALTER PRIMARY KEY schema change must block. - testutils.SucceedsSoon(t, func() error { - if alterPrimaryKeyBlockedCounter.Load() == 0 { - return errors.New("waiting for concurrent schema change to block") - } - return nil - }) - - // Unblock the create index job. 
- continueNotif <- struct{}{} - wg.Wait() - - // The ALTER PRIMARY KEY schema change must have been blocked. - require.NotZero(t, alterPrimaryKeyBlockedCounter.Load()) - - // There should be 5 k/v pairs per row: - // * the original primary index keyed on rowid, - // * the first version of the secondary index on v with rowid as key suffix, - // * the intermediate version of the new primary index keyed on k but - // including rowid as a stored column, - // * the final version of the new primary index keyed on k and not including rowid, - // * the final version of the secondary index for v with k as the key suffix. - testutils.SucceedsSoon(t, func() error { - return sqltestutils.CheckTableKeyCount(ctx, kvDB, codec, 5, maxValue) - }) -} - -func TestSchemaChangeWaitsForOtherSchemaChanges(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - t.Run("wait for legacy schema changes", func(t *testing.T) { - // This test starts an legacy schema change job (job 1), and then starts - // another legacy schema change job (job 2) and a declarative schema change - // job (job 3) while job 1 is backfilling. Job 1 is resumed after job 2 - // has started running. - ctx := context.Background() - - var job1Backfill sync.Once - var job2Resume sync.Once - var job3Wait sync.Once - // Closed when we enter the RunBeforeBackfill knob of job 1. - job1BackfillNotification := make(chan struct{}) - // Closed when we're ready to continue with job 1. - job1ContinueNotification := make(chan struct{}) - // Closed when job 2 starts. - job2ResumeNotification := make(chan struct{}) - // Closed when job 3 starts waiting for concurrent schema changes to finish. - job3WaitNotification := make(chan struct{}) - var job1ID jobspb.JobID - - var getTableDescriptor func() catalog.TableDescriptor - var params base.TestServerArgs - params.Knobs = base.TestingKnobs{ - SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{ - RunBeforeResume: func(jobID jobspb.JobID) error { - // Only block in job 2. - if job1ID == 0 || jobID == job1ID { - job1ID = jobID - return nil - } - job2Resume.Do(func() { - close(job2ResumeNotification) - }) - return nil - }, - RunBeforeBackfill: func() error { - job1Backfill.Do(func() { - close(job1BackfillNotification) - <-job1ContinueNotification - }) - return nil - }, - }, - SQLDeclarativeSchemaChanger: &scexec.TestingKnobs{ - BeforeStage: func(p scplan.Plan, idx int) error { - // Assert that when job 3 is running, there are no mutations other - // than the ones associated with this schema change. - if p.Params.ExecutionPhase < scop.PostCommitPhase { - return nil - } - table := getTableDescriptor() - // There are 2 schema changes that should precede job 3. - // The declarative schema changer uses the same mutation ID for all - // its mutations. 
- for _, m := range table.AllMutations() { - assert.Equal(t, int(m.MutationID()), 3) - } - return nil - }, - BeforeWaitingForConcurrentSchemaChanges: func(_ []string) { - job3Wait.Do(func() { - close(job3WaitNotification) - }) - }, - }, - JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), - } - s, sqlDB, kvDB := serverutils.StartServer(t, params) - defer s.Stopper().Stop(ctx) - getTableDescriptor = func() catalog.TableDescriptor { - return desctestutils.TestingGetPublicTableDescriptor(kvDB, s.ApplicationLayer().Codec(), "db", "t") - } - - tdb := sqlutils.MakeSQLRunner(sqlDB) - tdb.Exec(t, `CREATE DATABASE db`) - tdb.Exec(t, `CREATE TABLE db.t (a INT PRIMARY KEY)`) - - g := ctxgroup.WithContext(ctx) - - // Start job 1: An index schema change, which does not use the new schema - // changer. - g.GoCtx(func(ctx context.Context) error { - _, err := sqlDB.ExecContext(ctx, `SET use_declarative_schema_changer='off'`) - assert.NoError(t, err) - _, err = sqlDB.ExecContext(ctx, `CREATE INDEX idx ON db.t(a)`) - assert.NoError(t, err) - return nil - }) - - <-job1BackfillNotification - - // Start job 3: A column schema change which uses the new schema changer. - // The transaction will not actually commit until job 1 has finished. - g.GoCtx(func(ctx context.Context) error { - conn, err := sqlDB.Conn(ctx) - if err != nil { - return err - } - _, err = conn.ExecContext(ctx, `SET use_declarative_schema_changer = 'unsafe'`) - assert.NoError(t, err) - _, err = conn.ExecContext(ctx, `ALTER TABLE db.t ADD COLUMN b INT DEFAULT 1`) - assert.NoError(t, err) - return nil - }) - - <-job3WaitNotification - - // Start job 2: Another index schema change which does not use the new - // schema changer. - g.GoCtx(func(ctx context.Context) error { - _, err := sqlDB.ExecContext(ctx, `SET use_declarative_schema_changer='off'`) - assert.NoError(t, err) - _, err = sqlDB.ExecContext(ctx, `CREATE INDEX idx2 ON db.t(a)`) - assert.NoError(t, err) - return nil - }) - - // Wait for job 2 to start. - <-job2ResumeNotification - - // Finally, let job 1 finish, which will unblock the - // others. - close(job1ContinueNotification) - require.NoError(t, g.Wait()) - - // Check that job 3 was created last. - tdb.CheckQueryResults(t, - fmt.Sprintf(`SELECT job_type, status, description FROM crdb_internal.jobs WHERE job_type = '%s' OR job_type = '%s' ORDER BY created`, - jobspb.TypeSchemaChange.String(), jobspb.TypeNewSchemaChange.String(), - ), - [][]string{ - {jobspb.TypeSchemaChange.String(), string(jobs.StatusSucceeded), `CREATE INDEX idx ON db.public.t (a)`}, - {jobspb.TypeSchemaChange.String(), string(jobs.StatusSucceeded), `CREATE INDEX idx2 ON db.public.t (a)`}, - {jobspb.TypeNewSchemaChange.String(), string(jobs.StatusSucceeded), `ALTER TABLE db.public.t ADD COLUMN b INT8 DEFAULT 1`}, - }, - ) - }) - - t.Run("wait for declarative schema changes for tables", func(t *testing.T) { - // This test starts a declarative schema change job (job 1), and then starts - // another declarative schema change job (job 2) while job 1 is backfilling. - ctx := context.Background() - - var job1Backfill sync.Once - var job2Wait sync.Once - // Closed when we enter the RunBeforeBackfill knob of job 1. - job1BackfillNotification := make(chan struct{}) - // Closed when we're ready to continue with job 1. - job1ContinueNotification := make(chan struct{}) - // Closed when job 2 starts waiting for concurrent schema changes to finish. 
- job2WaitNotification := make(chan struct{}) - - stmt1 := `ALTER TABLE db.t ADD COLUMN b INT8 DEFAULT 1` - stmt2 := `ALTER TABLE db.t ADD COLUMN c INT8 DEFAULT 2` - - var getTableDescriptor func() catalog.TableDescriptor - var params base.TestServerArgs - params.Knobs = base.TestingKnobs{ - SQLDeclarativeSchemaChanger: &scexec.TestingKnobs{ - BeforeStage: func(p scplan.Plan, idx int) error { - // Verify that we never queue mutations for job 2 before finishing job - // 1. - if p.Params.ExecutionPhase < scop.PostCommitPhase { - return nil - } - table := getTableDescriptor() - mutations := table.AllMutations() - if len(mutations) == 0 { - t.Errorf("unexpected empty mutations") - return errors.Errorf("test failure") - } - var idsSeen []descpb.MutationID - for _, m := range mutations { - if len(idsSeen) == 0 || m.MutationID() > idsSeen[len(idsSeen)-1] { - idsSeen = append(idsSeen, m.MutationID()) - } - } - highestID := idsSeen[len(idsSeen)-1] - assert.Truef(t, highestID <= 1, "unexpected mutation IDs %v", idsSeen) - // Block job 1 during the backfill. - s := p.Stages[idx] - stmt := p.TargetState.Statements[0].Statement - if stmt != stmt1 || s.Type() != scop.BackfillType { - return nil - } - for _, op := range s.EdgeOps { - if backfillOp, ok := op.(*scop.BackfillIndex); ok && backfillOp.IndexID == descpb.IndexID(2) { - job1Backfill.Do(func() { - close(job1BackfillNotification) - <-job1ContinueNotification - }) - } - } - - return nil - }, - BeforeWaitingForConcurrentSchemaChanges: func(stmts []string) { - if stmts[0] != stmt2 { - return - } - job2Wait.Do(func() { - close(job2WaitNotification) - }) - }, - }, - } - - s, sqlDB, kvDB := serverutils.StartServer(t, params) - defer s.Stopper().Stop(ctx) - getTableDescriptor = func() catalog.TableDescriptor { - return desctestutils.TestingGetPublicTableDescriptor(kvDB, s.ApplicationLayer().Codec(), "db", "t") - } - - tdb := sqlutils.MakeSQLRunner(sqlDB) - tdb.Exec(t, `CREATE DATABASE db`) - tdb.Exec(t, `CREATE TABLE db.t (a INT PRIMARY KEY)`) - - g := ctxgroup.WithContext(ctx) - - g.GoCtx(func(ctx context.Context) error { - conn, err := sqlDB.Conn(ctx) - if err != nil { - return err - } - _, err = conn.ExecContext(ctx, `SET use_declarative_schema_changer = 'unsafe'`) - assert.NoError(t, err) - _, err = conn.ExecContext(ctx, stmt1) - assert.NoError(t, err) - return nil - }) - - <-job1BackfillNotification - - g.GoCtx(func(ctx context.Context) error { - conn, err := sqlDB.Conn(ctx) - if err != nil { - return err - } - _, err = conn.ExecContext(ctx, `SET use_declarative_schema_changer = 'unsafe'`) - assert.NoError(t, err) - _, err = conn.ExecContext(ctx, stmt2) - assert.NoError(t, err) - return nil - }) - - <-job2WaitNotification - close(job1ContinueNotification) - require.NoError(t, g.Wait()) - - tdb.CheckQueryResults(t, - fmt.Sprintf(`SELECT job_type, status FROM crdb_internal.jobs WHERE job_type = '%s' OR job_type = '%s' ORDER BY created`, - jobspb.TypeSchemaChange.String(), jobspb.TypeNewSchemaChange.String(), - ), - [][]string{ - {jobspb.TypeNewSchemaChange.String(), string(jobs.StatusSucceeded)}, - {jobspb.TypeNewSchemaChange.String(), string(jobs.StatusSucceeded)}, - }, - ) - }) - - t.Run("wait for declarative schema changes for schema", func(t *testing.T) { - // This test starts a declarative schema change job (job 1), and then starts - // another declarative schema change job (job 2) involving dropping schemas. - // Both of these jobs will need to concurrently touch the database descriptor. 
- ctx := context.Background() - - var jobWaitForPostCommit sync.Once - var jobWaitBeforeWait sync.Once - // Closed when we're ready to continue with job 1. - job2StartExecution := make(chan struct{}) - job2ContinueNotification := make(chan struct{}) - completionCount := int32(0) - - stmt1 := `DROP SCHEMA db.s1` - stmt2 := `DROP SCHEMA db.s2` - var params base.TestServerArgs - params.Knobs = base.TestingKnobs{ - SQLDeclarativeSchemaChanger: &scexec.TestingKnobs{ - BeforeStage: func(p scplan.Plan, stageIdx int) error { - if p.Params.ExecutionPhase != scop.PostCommitPhase { - return nil - } - jobWaitForPostCommit.Do(func() { - job2StartExecution <- struct{}{} - job2ContinueNotification <- struct{}{} - }) - return nil - }, - BeforeWaitingForConcurrentSchemaChanges: func(stmts []string) { - if stmts[0] == stmt2 { - atomic.AddInt32(&completionCount, 1) - jobWaitBeforeWait.Do(func() { - <-job2ContinueNotification - }) - } - }, - }, - } - - s, sqlDB, _ := serverutils.StartServer(t, params) - defer s.Stopper().Stop(ctx) - - tdb := sqlutils.MakeSQLRunner(sqlDB) - tdb.Exec(t, "SET CLUSTER SETTING sql.schema.force_declarative_statements='+CREATE SCHEMA'") - tdb.Exec(t, `CREATE DATABASE db`) - tdb.Exec(t, `CREATE SCHEMA db.s1`) - tdb.Exec(t, `CREATE SCHEMA db.s2`) - - g := ctxgroup.WithContext(ctx) - - g.GoCtx(func(ctx context.Context) error { - conn, err := sqlDB.Conn(ctx) - if err != nil { - return err - } - _, err = conn.ExecContext(ctx, `SET use_declarative_schema_changer = 'unsafe'`) - assert.NoError(t, err) - _, err = conn.ExecContext(ctx, stmt1) - assert.NoError(t, err) - return nil - }) - - <-job2StartExecution - - g.GoCtx(func(ctx context.Context) error { - conn, err := sqlDB.Conn(ctx) - if err != nil { - return err - } - _, err = conn.ExecContext(ctx, `SET use_declarative_schema_changer = 'unsafe'`) - assert.NoError(t, err) - _, err = conn.ExecContext(ctx, stmt2) - assert.NoError(t, err) - return nil - }) - - require.NoError(t, g.Wait()) - - // Expect two NEW SCHEMA CHANGE jobs for stmt1 and stmt2. - // The CREATE DATABASE and two CREATE SCHEMAs don't create jobs. - tdb.CheckQueryResults(t, - fmt.Sprintf(`SELECT job_type, status FROM crdb_internal.jobs WHERE job_type = '%s' OR job_type = '%s' ORDER BY created`, - jobspb.TypeSchemaChange.String(), jobspb.TypeNewSchemaChange.String(), - ), - [][]string{ - {jobspb.TypeNewSchemaChange.String(), string(jobs.StatusSucceeded)}, - {jobspb.TypeNewSchemaChange.String(), string(jobs.StatusSucceeded)}, - }, - ) - // We should observe the schema change was tried at least twice. - require.GreaterOrEqual(t, atomic.LoadInt32(&completionCount), int32(1)) - }) - - t.Run("wait for declarative schema changes for type", func(t *testing.T) { - // This test starts a declarative schema change job (job 1), and then starts - // another declarative schema change job (job 2) involving dropping tables. - // Both of these jobs will need to concurrently touch type descriptors. - ctx := context.Background() - - var jobWaitForPostCommit sync.Once - var jobWaitBeforeWait sync.Once - // Closed when we're ready to continue with job 1. 
- job2StartExecution := make(chan struct{}) - job2ContinueNotification := make(chan struct{}) - completionCount := int32(0) - - stmt1 := `DROP TABLE db.t1` - stmt2 := `DROP TABLE db.t2` - var params base.TestServerArgs - params.Knobs = base.TestingKnobs{ - SQLDeclarativeSchemaChanger: &scexec.TestingKnobs{ - BeforeStage: func(p scplan.Plan, stageIdx int) error { - if p.Params.ExecutionPhase != scop.PostCommitPhase { - return nil - } - jobWaitForPostCommit.Do(func() { - job2StartExecution <- struct{}{} - job2ContinueNotification <- struct{}{} - }) - return nil - }, - BeforeWaitingForConcurrentSchemaChanges: func(stmts []string) { - if stmts[0] == stmt2 { - atomic.AddInt32(&completionCount, 1) - jobWaitBeforeWait.Do(func() { - <-job2ContinueNotification - }) - } - }, - }, - } - - s, sqlDB, _ := serverutils.StartServer(t, params) - defer s.Stopper().Stop(ctx) - - tdb := sqlutils.MakeSQLRunner(sqlDB) - tdb.Exec(t, `CREATE DATABASE db`) - tdb.Exec(t, `CREATE TYPE db.status AS ENUM ('open', 'closed', 'inactive');`) - tdb.Exec(t, `CREATE TABLE db.t1(t db.status)`) - tdb.Exec(t, `CREATE TABLE db.t2(t db.status)`) - - g := ctxgroup.WithContext(ctx) - - g.GoCtx(func(ctx context.Context) error { - conn, err := sqlDB.Conn(ctx) - if err != nil { - return err - } - _, err = conn.ExecContext(ctx, `SET use_declarative_schema_changer = 'unsafe'`) - assert.NoError(t, err) - _, err = conn.ExecContext(ctx, stmt1) - assert.NoError(t, err) - return nil - }) - - <-job2StartExecution - - g.GoCtx(func(ctx context.Context) error { - conn, err := sqlDB.Conn(ctx) - if err != nil { - return err - } - _, err = conn.ExecContext(ctx, `SET use_declarative_schema_changer = 'unsafe'`) - assert.NoError(t, err) - _, err = conn.ExecContext(ctx, stmt2) - assert.NoError(t, err) - return nil - }) - - require.NoError(t, g.Wait()) - - tdb.CheckQueryResults(t, - fmt.Sprintf(`SELECT job_type, status FROM crdb_internal.jobs WHERE job_type = '%s' OR job_type = '%s' ORDER BY created`, - jobspb.TypeSchemaChange.String(), jobspb.TypeNewSchemaChange.String(), - ), - [][]string{ - {jobspb.TypeNewSchemaChange.String(), string(jobs.StatusSucceeded)}, - {jobspb.TypeNewSchemaChange.String(), string(jobs.StatusSucceeded)}, - }, - ) - // We should observe the schema change was tried at least twice. - require.GreaterOrEqual(t, atomic.LoadInt32(&completionCount), int32(1)) - }) -} - -// TestConcurrentSchemaChangesWait ensures that when a schema change -// is run concurrently with a declarative schema change, that it waits for -// the declarative schema change to complete before proceeding. -// -// Each concurrent schema change is run both as a single statement, where the -// test expects an automatic retry, and as part of an explicit transaction -// which has returned rows, in order to ensure that an error with the proper -// error code is returned. -func TestConcurrentSchemaChangesWait(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - const defaultInitialStmt = `ALTER TABLE db.t ADD COLUMN b INT DEFAULT 1` - type concurrentWaitTest struct { - // initial statement run under the declarative schema changer, paused on - // the first post commit phase. - initial string - // concurrent statement run under the legacy schema changer - concurrent string - } - ctx := context.Background() - runConcurrentSchemaChangeCase := func(t *testing.T, stmts concurrentWaitTest, implicit bool) { - defer log.Scope(t).Close(t) - var doOnce sync.Once - // Closed when we enter the BeforeStage knob with a post commit or later - // phase. 
- beforePostCommitNotification := make(chan struct{}) - // Closed when we're ready to continue with the schema change. - continueNotification := make(chan struct{}) - // Sent on when we're waiting for the initial schema change. - waitingForConcurrent := make(chan struct{}) - - var getTableDescriptor func() catalog.TableDescriptor - var params base.TestServerArgs - params.Knobs = base.TestingKnobs{ - SQLDeclarativeSchemaChanger: &scexec.TestingKnobs{ - BeforeWaitingForConcurrentSchemaChanges: func(_ []string) { - waitingForConcurrent <- struct{}{} - }, - BeforeStage: func(p scplan.Plan, idx int) error { - // Verify that we never get a mutation ID not associated with the schema - // change that is running. - if p.Params.ExecutionPhase < scop.PostCommitPhase { - return nil - } - table := getTableDescriptor() - for _, m := range table.AllMutations() { - assert.LessOrEqual(t, int(m.MutationID()), 2) - } - s := p.Stages[idx] - if s.Phase < scop.PostCommitPhase { - return nil - } - doOnce.Do(func() { - close(beforePostCommitNotification) - <-continueNotification - }) - return nil - }, - }, - JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), - } - - s, sqlDB, kvDB := serverutils.StartServer(t, params) - defer s.Stopper().Stop(ctx) - getTableDescriptor = func() catalog.TableDescriptor { - return desctestutils.TestingGetPublicTableDescriptor(kvDB, s.ApplicationLayer().Codec(), "db", "t") - } - - initialSchemaChange := func() error { - conn, err := sqlDB.Conn(ctx) - if err != nil { - return err - } - _, err = conn.ExecContext(ctx, `SET use_declarative_schema_changer = 'unsafe'`) - assert.NoError(t, err) - for _, s := range strings.Split(stmts.initial, ";") { - _, err = conn.ExecContext(ctx, s) - assert.NoError(t, err) - } - return nil - } - concurrentSchemaChangeImplicit := func() error { - conn, err := sqlDB.Conn(ctx) - if err != nil { - return err - } - defer func() { _ = conn.Close() }() - for _, s := range append([]string{ - `SET use_declarative_schema_changer = 'off'`, - }, strings.Split(stmts.concurrent, ";")...) { - if _, err = conn.ExecContext(ctx, s); err != nil { - return err - } - } - return nil - } - concurrentSchemaChangeExplicit := func() error { - var sawRestart bool - defer func() { assert.True(t, sawRestart) }() - return crdb.Execute(func() (err error) { - conn, err := sqlDB.Conn(ctx) - if err != nil { - return err - } - defer func() { _ = conn.Close() }() - tx, err := conn.BeginTx(ctx, nil) - if err != nil { - return err - } - defer func() { - if err != nil { - var pqErr *pq.Error - sawRestart = sawRestart || - errors.As(err, &pqErr) && - string(pqErr.Code) == pgcode.SerializationFailure.String() - _ = tx.Rollback() - } - }() - // Execute something first to ensure that a restart is sent. 
-			if _, err := tx.Exec("SELECT * FROM db.other_t"); err != nil {
-				return err
-			}
-			for _, s := range strings.Split(stmts.concurrent, ";") {
-				if _, err := tx.ExecContext(ctx, s); err != nil {
-					return err
-				}
-			}
-			return tx.Commit()
-		})
-	}
-
-		tdb := sqlutils.MakeSQLRunner(sqlDB)
-		tdb.Exec(t, `CREATE DATABASE db`)
-		tdb.Exec(t, `CREATE TABLE db.other_t (a INT PRIMARY KEY)`)
-		tdb.Exec(t, `CREATE TABLE db.t (a INT PRIMARY KEY)`)
-		tdb.Exec(t, `CREATE USER testuser`)
-		tdb.Exec(t, `CREATE SCHEMA db.sc`)
-		tdb.Exec(t, `ALTER SCHEMA db.sc OWNER to testuser`)
-		tdb.Exec(t, `CREATE TABLE db.sc.t (a INT PRIMARY KEY)`)
-		tdb.Exec(t, `ALTER TABLE db.sc.t OWNER to testuser`)
-		var initialSchemaChangeGroup errgroup.Group
-		var concurrentSchemaChangeGroup errgroup.Group
-		initialSchemaChangeGroup.Go(initialSchemaChange)
-		<-beforePostCommitNotification
-		if implicit {
-			concurrentSchemaChangeGroup.Go(concurrentSchemaChangeImplicit)
-		} else {
-			concurrentSchemaChangeGroup.Go(concurrentSchemaChangeExplicit)
-		}
-		<-waitingForConcurrent
-		close(continueNotification)
-		require.NoError(t, initialSchemaChangeGroup.Wait())
-		require.NoError(t, concurrentSchemaChangeGroup.Wait())
-	}
-
-	stmts := []concurrentWaitTest{
-		{defaultInitialStmt, `ALTER TABLE db.t ADD COLUMN c INT DEFAULT 2`},
-		{defaultInitialStmt, `CREATE INDEX ON db.t(a)`},
-		{defaultInitialStmt, `ALTER TABLE db.t RENAME COLUMN a TO c`},
-		{defaultInitialStmt, `CREATE TABLE db.t2 (i INT PRIMARY KEY, a INT REFERENCES db.t)`},
-		{defaultInitialStmt, `CREATE VIEW db.v AS SELECT a FROM db.t`},
-		{defaultInitialStmt, `ALTER TABLE db.t RENAME TO db.new`},
-		{defaultInitialStmt, `TRUNCATE TABLE db.t`},
-		{defaultInitialStmt, `DROP TABLE db.t`},
-		{"USE db; DROP OWNED BY testuser;", `DROP DATABASE db`},
-	}
-	for i := range stmts {
-		stmt := stmts[i] // copy for closure
-		t.Run(stmt.concurrent, func(t *testing.T) {
-			testutils.RunTrueAndFalse(t, "implicit", func(t *testing.T, implicit bool) {
-				runConcurrentSchemaChangeCase(t, stmt, implicit)
-			})
-		})
-	}
-}
-
 func TestSchemaChangerJobRunningStatus(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
@@ -1166,3 +407,384 @@
 WHERE
 		})
 	}
 }
+
+// TestSchemaChangeWaitsForConcurrentSchemaChanges tests that if a schema
+// change on a table is issued when there is already an ongoing schema change
+// on that table, it will wait until that ongoing schema change finishes before
+// proceeding.
+func TestSchemaChangeWaitsForConcurrentSchemaChanges(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	tf := func(t *testing.T, modeFor1stStmt, modeFor2ndStmt sessiondatapb.NewSchemaChangerMode) {
+		ctx, cancel := context.WithCancel(context.Background())
+		createIndexChan := make(chan struct{})
+		addColChan := make(chan struct{})
+		var closeMainChanOnce, closeAlterPKChanOnce sync.Once
+
+		var params base.TestServerArgs
+		params.Knobs = base.TestingKnobs{
+			SQLDeclarativeSchemaChanger: &scexec.TestingKnobs{
+				// If the blocked schema changer is from the legacy schema changer, we
+				// let it hijack this knob (which was originally designed for the
+				// declarative schema changer) if `stmts` is nil.
+				WhileWaitingForConcurrentSchemaChanges: func(stmts []string) {
+					if (len(stmts) == 1 && strings.Contains(stmts[0], "ADD COLUMN")) ||
+						stmts == nil {
+						closeAlterPKChanOnce.Do(func() {
+							close(addColChan)
+						})
+					}
+				},
+			},
+			DistSQL: &execinfra.TestingKnobs{
+				RunBeforeBackfillChunk: func(_ roachpb.Span) error {
+					closeMainChanOnce.Do(func() {
+						close(createIndexChan)
+					})
+					<-addColChan // wait for AddCol to unblock me
+					return nil
+				},
+			},
+			// Decrease the adopt loop interval so that retries happen quickly.
+			JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
+			// Prevent the GC job from running so we ensure that all the keys which
+			// were written remain.
+			GCJob: &sql.GCJobTestingKnobs{RunBeforeResume: func(jobID jobspb.JobID) error {
+				<-ctx.Done()
+				return ctx.Err()
+			}},
+		}
+		s, sqlDB, kvDB := serverutils.StartServer(t, params)
+		defer s.Stopper().Stop(ctx)
+		defer cancel()
+		tdb := sqlutils.MakeSQLRunner(sqlDB)
+
+		tdb.Exec(t, "CREATE TABLE t (i INT PRIMARY KEY, j INT NOT NULL);")
+		tdb.Exec(t, "INSERT INTO t SELECT k, k+1 FROM generate_series(1,1000) AS tmp(k);")
+
+		// Execute the 1st DDL asynchronously and block until it's executing.
+		tdb.Exec(t, `SET use_declarative_schema_changer = $1`, modeFor1stStmt.String())
+		go func() {
+			tdb.Exec(t, `CREATE INDEX idx ON t (j);`)
+		}()
+		<-createIndexChan
+
+		// Execute the 2nd DDL synchronously. While it waits, it unblocks the 1st
+		// DDL, so both will eventually be able to proceed.
+		tdb.Exec(t, `SET use_declarative_schema_changer = $1`, modeFor2ndStmt.String())
+		tdb.Exec(t, `ALTER TABLE t ADD COLUMN k INT DEFAULT 30;`)
+
+		// There should be 2 k/v pairs per row:
+		// 1. the old primary index (i : j)
+		// 2. the new secondary index keyed on j with key suffix on i (j; i : ), from CREATE INDEX
+		// Additionally, if ADD COLUMN uses the declarative schema changer, there
+		// will be 1 more k/v pair for each row:
+		// 3. the new primary index (i : j, k), from ADD COLUMN
+		expectedKeyCount := 2000
+		if modeFor2ndStmt == sessiondatapb.UseNewSchemaChangerUnsafeAlways {
+			expectedKeyCount = 3000
+		}
+		requireTableKeyCount(ctx, t, s.ApplicationLayer().Codec(), kvDB,
+			"defaultdb", "t", expectedKeyCount)
+	}
+
+	t.Run("declarative-then-declarative", func(t *testing.T) {
+		tf(t, sessiondatapb.UseNewSchemaChangerUnsafeAlways, sessiondatapb.UseNewSchemaChangerUnsafeAlways)
+	})
+
+	t.Run("declarative-then-legacy", func(t *testing.T) {
+		tf(t, sessiondatapb.UseNewSchemaChangerUnsafeAlways, sessiondatapb.UseNewSchemaChangerOff)
+	})
+
+	t.Run("legacy-then-declarative", func(t *testing.T) {
+		tf(t, sessiondatapb.UseNewSchemaChangerOff, sessiondatapb.UseNewSchemaChangerUnsafeAlways)
+	})
+
+	// The legacy + legacy case is tested in TestLegacySchemaChangerWaitsForOtherSchemaChanges
+	// because the waiting occurs under a different code path.
+}
+
+// requireTableKeyCount ensures that `db`.`tbl` has `keyCount` kv-pairs in it.
+func requireTableKeyCount(
+	ctx context.Context,
+	t *testing.T,
+	codec keys.SQLCodec,
+	kvDB *kv.DB,
+	db string,
+	tbl string,
+	keyCount int,
+) {
+	tableDesc := desctestutils.TestingGetPublicTableDescriptor(kvDB, codec, db, tbl)
+	tablePrefix := codec.TablePrefix(uint32(tableDesc.GetID()))
+	tableEnd := tablePrefix.PrefixEnd()
+	kvs, err := kvDB.Scan(ctx, tablePrefix, tableEnd, 0)
+	require.NoError(t, err)
+	require.Equal(t, keyCount, len(kvs))
+}
+
+// TestConcurrentSchemaChanges is an integration-style test where we issue many
+// schema changes concurrently (renames, add/drop columns, and create/drop
+// indexes) for a period of time and assert that they all finish eventually and
+// we end up with expected names, columns, and indexes.
+func TestConcurrentSchemaChanges(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	skip.UnderShort(t, "this test is long running (>3 mins).")
+	skip.UnderStress(t, "test is already integration style and long running")
+	skip.UnderStressRace(t, "test is already integration style and long running")
+	skip.UnderRace(t, "the test knowingly has data race and has logic to account for that")
+
+	const testDuration = 3 * time.Minute
+	const renameDBInterval = 5 * time.Second
+	const renameSCInterval = 4 * time.Second
+	const renameTblInterval = 3 * time.Second
+	const addColInterval = 1 * time.Second
+	const dropColInterval = 1 * time.Second
+	const createIdxInterval = 1 * time.Second
+	const dropIdxInterval = 1 * time.Second
+
+	ctx, cancel := context.WithCancel(context.Background())
+	var params base.TestServerArgs
+	params.Knobs = base.TestingKnobs{
+		// Decrease the adopt loop interval so that retries happen quickly.
+		JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
+	}
+	s, sqlDB, kvDB := serverutils.StartServer(t, params)
+	defer s.Stopper().Stop(ctx)
+
+	tdb := sqlutils.MakeSQLRunner(sqlDB)
+	dbName, scName, tblName := "testdb", "testsc", "t"
+	allColToIndexes := make(map[string]map[string]struct{}) // colName -> indexes that use that column
+	allColToIndexes["col"] = map[string]struct{}{"t_pkey": {}}
+	allNonPublicIdxToKeyCols := make(map[string]map[string]struct{}) // indexName -> its key column(s)
+	tdb.Exec(t, fmt.Sprintf("CREATE DATABASE %v;", dbName))
+	tdb.Exec(t, fmt.Sprintf("CREATE SCHEMA %v.%v;", dbName, scName))
+	tdb.Exec(t, fmt.Sprintf("CREATE TABLE %v.%v.%v (col INT PRIMARY KEY);", dbName, scName, tblName))
+	tdb.Exec(t, fmt.Sprintf("INSERT INTO %v.%v.%v SELECT generate_series(1,100);", dbName, scName, tblName))
+
+	// repeatWorkWithInterval repeats `work` indefinitely every `workInterval`
+	// (with jitter) until `ctx` is cancelled.
+	workerErrChan := make(chan error)
+	var wg sync.WaitGroup
+	repeatWorkWithInterval := func(workerName string, workInterval time.Duration, work func() error) {
+		wg.Add(1)
+		defer wg.Done()
+		for {
+			// Jitter by +/-20%. Multiply in float64, since converting a small
+			// float directly to time.Duration would truncate it to 0 or 1.
+			jitteredInterval := time.Duration(float64(workInterval) * (0.8 + 0.4*rand.Float64()))
+			select {
+			case <-ctx.Done():
+				t.Logf("%v is signaled to finish work", workerName)
+				return
+			case <-time.After(jitteredInterval):
+				if err := work(); err != nil {
+					t.Logf("%v encounters error %v; signal to main routine and finish working", workerName, err.Error())
+					workerErrChan <- err
+					return
+				}
+			}
+		}
+	}
+
+	// validate performs a few quick validations after all schema changes are finished:
+	// 1. Database, schema, and table indeed end up with the tracked name.
+	// 2. Table indeed has the tracked columns.
+	// 3. Table indeed has the tracked indexes.
+	codec := s.ApplicationLayer().Codec()
+	validate := func() {
+		dbDesc := desctestutils.TestingGetDatabaseDescriptor(kvDB, codec, dbName)
+		desctestutils.TestingGetSchemaDescriptor(kvDB, codec, dbDesc.GetID(), scName)
+		tblDesc := desctestutils.TestingGetTableDescriptor(kvDB, codec, dbName, scName, tblName)
+		require.Equal(t, len(allColToIndexes), len(tblDesc.PublicColumns())) // both sides include `col`
+		for _, col := range tblDesc.PublicColumns() {
+			_, ok := allColToIndexes[col.GetName()]
+			require.True(t, ok, "column %v does not exist in allColToIndexes=%v", col.GetName(), allColToIndexes)
+		}
+		require.Equal(t, len(allNonPublicIdxToKeyCols), len(tblDesc.PublicNonPrimaryIndexes()))
+		for _, idx := range tblDesc.PublicNonPrimaryIndexes() {
+			_, ok := allNonPublicIdxToKeyCols[idx.GetName()]
+			require.True(t, ok, "index %v does not exist in allNonPublicIdxToKeyCols=%v", idx.GetName(), allNonPublicIdxToKeyCols)
+		}
+	}
+
+	// A goroutine that repeatedly renames database `testdb` randomly.
+	go repeatWorkWithInterval("rename-db-worker", renameDBInterval, func() error {
+		newDBName := fmt.Sprintf("testdb_%v", rand.Intn(1000))
+		if newDBName == dbName {
+			return nil
+		}
+		if _, err := sqlDB.Exec(fmt.Sprintf("ALTER DATABASE %v RENAME TO %v", dbName, newDBName)); err != nil {
+			return err
+		}
+		dbName = newDBName
+		t.Logf("RENAME DATABASE TO %v", newDBName)
+		return nil
+	})
+
+	// A goroutine that renames schema `testdb.testsc` randomly.
+	go repeatWorkWithInterval("rename-schema-worker", renameSCInterval, func() error {
+		newSCName := fmt.Sprintf("testsc_%v", rand.Intn(1000))
+		if scName == newSCName {
+			return nil
+		}
+		_, err := sqlDB.Exec(fmt.Sprintf("ALTER SCHEMA %v.%v RENAME TO %v", dbName, scName, newSCName))
+		if err == nil {
+			scName = newSCName
+			t.Logf("RENAME SCHEMA TO %v", newSCName)
+		} else if isPQErrWithCode(err, pgcode.UndefinedDatabase) {
+			err = nil // mute those errors as they're expected
+			t.Logf("Parent database is renamed; skipping this schema renaming.")
+		}
+		return err
+	})
+
+	// A goroutine that renames table `testdb.testsc.t` randomly.
+	go repeatWorkWithInterval("rename-tbl-worker", renameTblInterval, func() error {
+		newTblName := fmt.Sprintf("t_%v", rand.Intn(1000))
+		_, err := sqlDB.Exec(fmt.Sprintf(`ALTER TABLE %v.%v.%v RENAME TO %v`, dbName, scName, tblName, newTblName))
+		if err == nil {
+			tblName = newTblName
+			t.Logf("RENAME TABLE TO %v", newTblName)
+		} else if isPQErrWithCode(err, pgcode.UndefinedDatabase, pgcode.UndefinedSchema, pgcode.InvalidSchemaName) {
+			err = nil
+			t.Logf("Parent database or schema is renamed; skipping this table renaming.")
+		}
+		return err
+	})
+
+	// A goroutine that adds columns to `testdb.testsc.t` randomly.
+	go repeatWorkWithInterval("add-column-worker", addColInterval, func() error {
+		newColName := fmt.Sprintf("col_%v", rand.Intn(1000))
+		if _, ok := allColToIndexes[newColName]; ok {
+			return nil
+		}
+		tblName := tblName
+		_, err := sqlDB.Exec(fmt.Sprintf("ALTER TABLE %v.%v.%v ADD COLUMN %v INT DEFAULT %v", dbName, scName, tblName, newColName, rand.Intn(100)))
+		if err == nil {
+			allColToIndexes[newColName] = make(map[string]struct{})
+			t.Logf("ADD COLUMN %v TO TABLE %v", newColName, tblName)
+		} else if isPQErrWithCode(err, pgcode.UndefinedDatabase, pgcode.UndefinedSchema, pgcode.InvalidSchemaName, pgcode.UndefinedTable) {
+			err = nil
+			t.Logf("Parent database or schema or table is renamed; skipping this column addition.")
+		}
+		return err
+	})
+
+	// A goroutine that drops columns from `testdb.testsc.t` randomly.
+	go repeatWorkWithInterval("drop-column-worker", dropColInterval, func() error {
+		// Randomly pick a non-PK column to drop.
+		if len(allColToIndexes) == 1 {
+			return nil
+		}
+		var colName string
+		for col := range allColToIndexes {
+			if col != "col" {
+				colName = col
+				break
+			}
+		}
+
+		tblName := tblName
+		_, err := sqlDB.Exec(fmt.Sprintf("ALTER TABLE %v.%v.%v DROP COLUMN %v;", dbName, scName, tblName, colName))
+		if err == nil {
+			for indexName := range allColToIndexes[colName] {
+				delete(allNonPublicIdxToKeyCols, indexName)
+			}
+			delete(allColToIndexes, colName)
+			t.Logf("DROP COLUMN %v FROM TABLE %v", colName, tblName)
+		} else if isPQErrWithCode(err, pgcode.UndefinedDatabase, pgcode.UndefinedSchema, pgcode.InvalidSchemaName, pgcode.UndefinedTable) {
+			err = nil
+			t.Logf("Parent database or schema or table is renamed; skipping this column removal.")
+		}
+		return err
+	})
+
+	// A goroutine that creates a secondary index on a randomly selected column.
+	go repeatWorkWithInterval("create-index-worker", createIdxInterval, func() error {
+		newIndexName := fmt.Sprintf("idx_%v", rand.Intn(1000))
+		if _, ok := allNonPublicIdxToKeyCols[newIndexName]; ok {
+			return nil
+		}
+
+		// Randomly pick a non-PK column to create an index on.
+		if len(allColToIndexes) == 1 {
+			return nil
+		}
+		var colName string
+		for col := range allColToIndexes {
+			if col != "col" {
+				colName = col
+				break
+			}
+		}
+
+		tblName := tblName
+		_, err := sqlDB.Exec(fmt.Sprintf("CREATE INDEX %v ON %v.%v.%v (%v);", newIndexName, dbName, scName, tblName, colName))
+		if err == nil {
+			allNonPublicIdxToKeyCols[newIndexName] = map[string]struct{}{colName: {}}
+			allColToIndexes[colName][newIndexName] = struct{}{}
+			t.Logf("CREATE INDEX %v ON TABLE %v(%v)", newIndexName, tblName, colName)
+		} else if isPQErrWithCode(err, pgcode.UndefinedDatabase, pgcode.UndefinedSchema, pgcode.InvalidSchemaName, pgcode.UndefinedTable, pgcode.UndefinedColumn) {
+			err = nil
+			t.Logf("Parent database or schema or table is renamed or column is dropped; skipping this index creation.")
+		}
+		return err
+	})
+
+	// A goroutine that drops a secondary index randomly.
+	go repeatWorkWithInterval("drop-index-worker", dropIdxInterval, func() error {
+		// Randomly pick a non-public index to drop.
+		if len(allNonPublicIdxToKeyCols) == 0 {
+			return nil
+		}
+		var indexName string
+		var indexKeyCols map[string]struct{}
+		for idx, idxCols := range allNonPublicIdxToKeyCols {
+			indexName = idx
+			indexKeyCols = idxCols
+			break
+		}
+
+		tblName := tblName
+		_, err := sqlDB.Exec(fmt.Sprintf("DROP INDEX %v.%v.%v@%v;", dbName, scName, tblName, indexName))
+		if err == nil {
+			for indexKeyCol := range indexKeyCols {
+				delete(allColToIndexes[indexKeyCol], indexName)
+			}
+			delete(allNonPublicIdxToKeyCols, indexName)
+			t.Logf("DROP INDEX %v FROM TABLE %v", indexName, tblName)
+		} else if isPQErrWithCode(err, pgcode.UndefinedDatabase, pgcode.UndefinedSchema, pgcode.InvalidSchemaName, pgcode.UndefinedTable, pgcode.UndefinedObject) {
+			err = nil
+			t.Logf("Parent database or schema or table is renamed; skipping this index removal.")
+		}
+		return err
+	})
+
+	select {
+	case workerErr := <-workerErrChan:
+		t.Logf("main: a worker error %q is signaled; Inform all workers to stop.", workerErr.Error())
+		cancel()
+		wg.Wait()
+		t.Logf("main: all workers have stopped their work; Test Failure!")
+		t.Fatal(workerErr)
+	case <-time.After(testDuration):
+		t.Logf("main: time's up! Inform all workers to stop.")
+		cancel()
+		wg.Wait()
+		t.Logf("main: all workers have stopped. Validating descriptors states...")
+		validate()
+		t.Logf("main: validation succeeded! Test success!")
+	}
+}
+
+func isPQErrWithCode(err error, codes ...pgcode.Code) bool {
+	if pqErr := (*pq.Error)(nil); errors.As(err, &pqErr) {
+		for _, code := range codes {
+			if pgcode.MakeCode(string(pqErr.Code)) == code {
+				return true
+			}
+		}
+	}
+	return false
+}
diff --git a/pkg/sql/sequence_test.go b/pkg/sql/sequence_test.go
index 3e4ebf62b5a6..53c791adf00c 100644
--- a/pkg/sql/sequence_test.go
+++ b/pkg/sql/sequence_test.go
@@ -115,6 +115,7 @@ func BenchmarkUniqueRowID(b *testing.B) {
 // - ownership removal
 func TestSequenceOwnershipDependencies(t *testing.T) {
 	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
 
 	ctx := context.Background()
 	params := base.TestServerArgs{}
diff --git a/pkg/storage/bench_test.go b/pkg/storage/bench_test.go
index 345fec419e00..3fdcd1dcf39b 100644
--- a/pkg/storage/bench_test.go
+++ b/pkg/storage/bench_test.go
@@ -1944,7 +1944,7 @@ func BenchmarkMVCCScannerWithIntentsAndVersions(b *testing.B) {
 		}
 		require.NoError(b, writer.Close())
 		batch.Close()
-		require.NoError(b, eng.IngestExternalFiles(ctx, []string{sstFileName}))
+		require.NoError(b, eng.IngestLocalFiles(ctx, []string{sstFileName}))
 	}
 	for i := 0; i < b.N; i++ {
 		rw := eng.NewReadOnly(StandardDurability)
diff --git a/pkg/storage/engine.go b/pkg/storage/engine.go
index 97056747c635..8c6b59cdb19c 100644
--- a/pkg/storage/engine.go
+++ b/pkg/storage/engine.go
@@ -1024,18 +1024,22 @@ type Engine interface {
 	NewSnapshot() Reader
 	// Type returns engine type.
 	Type() enginepb.EngineType
-	// IngestExternalFiles atomically links a slice of files into the RocksDB
+	// IngestLocalFiles atomically links a slice of files into the Pebble
 	// log-structured merge-tree.
-	IngestExternalFiles(ctx context.Context, paths []string) error
-	// IngestExternalFilesWithStats is a variant of IngestExternalFiles that
+	IngestLocalFiles(ctx context.Context, paths []string) error
+	// IngestLocalFilesWithStats is a variant of IngestLocalFiles that
 	// additionally returns ingestion stats.
- IngestExternalFilesWithStats( + IngestLocalFilesWithStats( ctx context.Context, paths []string) (pebble.IngestOperationStats, error) - // IngestAndExciseExternalFiles is a variant of IngestExternalFilesWithStats + // IngestAndExciseFiles is a variant of IngestLocalFilesWithStats // that excises an ExciseSpan, and ingests either local or shared sstables or // both. - IngestAndExciseExternalFiles( + IngestAndExciseFiles( ctx context.Context, paths []string, shared []pebble.SharedSSTMeta, exciseSpan roachpb.Span) (pebble.IngestOperationStats, error) + // IngestExternalFiles is a variant of IngestLocalFiles that takes external + // files. These files can be referred to by multiple stores, but are not + // modified or deleted by the Engine doing the ingestion. + IngestExternalFiles(ctx context.Context, external []pebble.ExternalFile) (pebble.IngestOperationStats, error) // PreIngestDelay offers an engine the chance to backpressure ingestions. // When called, it may choose to block if the engine determines that it is in // or approaching a state where further ingestions may risk its health. diff --git a/pkg/storage/metamorphic/operations.go b/pkg/storage/metamorphic/operations.go index 9b6e83f1291b..9e10b24d514a 100644 --- a/pkg/storage/metamorphic/operations.go +++ b/pkg/storage/metamorphic/operations.go @@ -779,7 +779,7 @@ func (i ingestOp) run(ctx context.Context) string { } sstWriter.Close() - if err := i.m.engine.IngestExternalFiles(ctx, []string{sstPath}); err != nil { + if err := i.m.engine.IngestLocalFiles(ctx, []string{sstPath}); err != nil { return fmt.Sprintf("error = %s", err.Error()) } diff --git a/pkg/storage/mvcc_incremental_iterator_test.go b/pkg/storage/mvcc_incremental_iterator_test.go index 7e272f838518..64381b0685d3 100644 --- a/pkg/storage/mvcc_incremental_iterator_test.go +++ b/pkg/storage/mvcc_incremental_iterator_test.go @@ -1371,7 +1371,7 @@ func TestMVCCIncrementalIteratorIntentStraddlesSStables(t *testing.T) { if err := fs.WriteFile(db2, `ingest`, memFile.Data()); err != nil { t.Fatal(err) } - if err := db2.IngestExternalFiles(ctx, []string{`ingest`}); err != nil { + if err := db2.IngestLocalFiles(ctx, []string{`ingest`}); err != nil { t.Fatal(err) } } diff --git a/pkg/storage/pebble.go b/pkg/storage/pebble.go index 2ab380055ba3..3ff79aaf7c55 100644 --- a/pkg/storage/pebble.go +++ b/pkg/storage/pebble.go @@ -582,8 +582,7 @@ func DefaultPebbleOptions() *pebble.Options { MemTableStopWritesThreshold: 4, Merger: MVCCMerger, BlockPropertyCollectors: PebbleBlockPropertyCollectors, - // Minimum supported format. - FormatMajorVersion: MinimumSupportedFormatVersion, + FormatMajorVersion: MinimumSupportedFormatVersion, } // Automatically flush 10s after the first range tombstone is added to a // memtable. This ensures that we can reclaim space even when there's no @@ -2041,20 +2040,20 @@ func (p *Pebble) Type() enginepb.EngineType { return enginepb.EngineTypePebble } -// IngestExternalFiles implements the Engine interface. -func (p *Pebble) IngestExternalFiles(ctx context.Context, paths []string) error { +// IngestLocalFiles implements the Engine interface. +func (p *Pebble) IngestLocalFiles(ctx context.Context, paths []string) error { return p.db.Ingest(paths) } -// IngestExternalFilesWithStats implements the Engine interface. -func (p *Pebble) IngestExternalFilesWithStats( +// IngestLocalFilesWithStats implements the Engine interface. 
+func (p *Pebble) IngestLocalFilesWithStats( ctx context.Context, paths []string, ) (pebble.IngestOperationStats, error) { return p.db.IngestWithStats(paths) } -// IngestAndExciseExternalFiles implements the Engine interface. -func (p *Pebble) IngestAndExciseExternalFiles( +// IngestAndExciseFiles implements the Engine interface. +func (p *Pebble) IngestAndExciseFiles( ctx context.Context, paths []string, shared []pebble.SharedSSTMeta, exciseSpan roachpb.Span, ) (pebble.IngestOperationStats, error) { rawSpan := pebble.KeyRange{ @@ -2064,6 +2063,13 @@ func (p *Pebble) IngestAndExciseExternalFiles( return p.db.IngestAndExcise(paths, shared, rawSpan) } +// IngestExternalFiles implements the Engine interface. +func (p *Pebble) IngestExternalFiles( + ctx context.Context, external []pebble.ExternalFile, +) (pebble.IngestOperationStats, error) { + return p.db.IngestExternalFiles(external) +} + // PreIngestDelay implements the Engine interface. func (p *Pebble) PreIngestDelay(ctx context.Context) { preIngestDelay(ctx, p, p.settings) diff --git a/pkg/testutils/serverutils/test_cluster_shim.go b/pkg/testutils/serverutils/test_cluster_shim.go index 54cc5b4e3722..04b764d1202a 100644 --- a/pkg/testutils/serverutils/test_cluster_shim.go +++ b/pkg/testutils/serverutils/test_cluster_shim.go @@ -284,13 +284,9 @@ func StartNewTestCluster( ) TestClusterInterface { cluster := NewTestCluster(t, numNodes, args) cluster.Start(t) - for i := 0; i < cluster.NumServers(); i++ { - sysconfigProvider := cluster.Server(i).SystemConfigProvider() - sysconfig := sysconfigProvider.GetSystemConfig() - if sysconfig != nil { - sysconfig.PurgeZoneConfigCache() - } - } + // Note: do not add logic here. To customize cluster configuration, + // add testing knobs and check them in testcluster.NewTestCluster. + // Not all tests use StartNewTestCluster. return cluster }
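
For context, a minimal sketch of how a caller might drive the two ingestion paths exposed by the renamed Engine interface above, assuming an open storage.Engine. The function name, file path, locator, object name, size, and key bounds below are hypothetical placeholders, not taken from this patch.

package example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/storage"
	"github.com/cockroachdb/pebble"
	"github.com/cockroachdb/pebble/objstorage/remote"
)

// ingestSketch exercises both ingestion paths: a local SST that gets linked
// into the LSM, and an external file that is referenced in remote storage
// without being copied into the local store.
func ingestSketch(ctx context.Context, eng storage.Engine) error {
	// Local path (previously named IngestExternalFiles).
	if err := eng.IngestLocalFiles(ctx, []string{"/tmp/a.sst"}); err != nil { // hypothetical path
		return err
	}

	// External path: mirrors the pebble.ExternalFile construction in
	// addSSTablePreApply above.
	ext := pebble.ExternalFile{
		Locator:         remote.Locator("shared-store"), // hypothetical locator
		ObjName:         "000042.sst",                   // hypothetical object name
		Size:            64 << 20,                       // backing file size, in bytes
		SmallestUserKey: []byte("a"),                    // key bounds covered by the file
		LargestUserKey:  []byte("z"),
	}
	stats, err := eng.IngestExternalFiles(ctx, []pebble.ExternalFile{ext})
	if err != nil {
		return err
	}
	_ = stats // pebble.IngestOperationStats, e.g. for admission accounting
	return nil
}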
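Similarly, a short sketch of the client-visible effect of the savepoint change in txn_coord_sender_savepoints.go: rolling back to or releasing a non-initial savepoint after a transaction restart now returns a TransactionRetryWithProtoRefreshError built directly inside checkSavepointLocked, with the operation name ("rollback to" or "release") in the message. The helper below is hypothetical and only assumes the public kv.Txn savepoint methods.

package example2

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
	"github.com/cockroachdb/errors"
)

// tryRollback takes a savepoint, writes, and rolls back to the savepoint.
// If the transaction restarted in between, the rollback fails with a
// retryable error rather than an internal sentinel.
func tryRollback(ctx context.Context, txn *kv.Txn) error {
	sp, err := txn.CreateSavepoint(ctx)
	if err != nil {
		return err
	}
	if err := txn.Put(ctx, "k", "v"); err != nil {
		return err
	}
	if err := txn.RollbackToSavepoint(ctx, sp); err != nil {
		if errors.HasType(err, (*kvpb.TransactionRetryWithProtoRefreshError)(nil)) {
			// e.g. "cannot rollback to savepoint after a transaction restart".
			return err // surface to the caller's retry loop
		}
		return err
	}
	return nil
}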