From 5975d2dacc5abf041d8e9f725bf4e823f73f8635 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Wed, 9 Aug 2023 10:45:05 -0400 Subject: [PATCH 01/10] kv: skip TestConstraintConformanceReportIntegration under deadlock Fixes #108430. This commit avoids flakiness in `TestConstraintConformanceReportIntegration` by skipping the test under deadlock builds. It has been observed to run slowly and flake under stress, and we see the same kinds of behavior under deadlock builds. Release notes: None --- pkg/kv/kvserver/reports/reporter_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/kv/kvserver/reports/reporter_test.go b/pkg/kv/kvserver/reports/reporter_test.go index e9e128ccba0f..fac03794a478 100644 --- a/pkg/kv/kvserver/reports/reporter_test.go +++ b/pkg/kv/kvserver/reports/reporter_test.go @@ -48,6 +48,8 @@ func TestConstraintConformanceReportIntegration(t *testing.T) { // don't make progress. skip.UnderStressRace(t) skip.UnderRace(t, "takes >1min under race") + // Similarly, skip the test under deadlock builds. + skip.UnderDeadlock(t, "takes >1min under deadlock") ctx := context.Background() tc := serverutils.StartNewTestCluster(t, 5, base.TestClusterArgs{ From 6f3652a1cf7347036029e5ecbe456cb481d9db33 Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Wed, 9 Aug 2023 02:10:10 +0200 Subject: [PATCH 02/10] serverutils: remove ad-hoc code from StartNewTestCluster This function is a convenience alias for NewTestCluster+Start. This should not contain custom logic specific to certain tests. Any custom logic should be conditional on testing knobs and put inside `(*testcluster.TestCluster).Start()` instead. (The code removed here was mistakenly added in the wrong place in 70f85cd5671382a6c3843427b6deb870b3ee176f). Release note: None --- pkg/sql/logictest/logic.go | 1 + pkg/testutils/serverutils/test_cluster_shim.go | 10 +++------- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/pkg/sql/logictest/logic.go b/pkg/sql/logictest/logic.go index d7e0e76a9ec7..e4db4cf2045f 100644 --- a/pkg/sql/logictest/logic.go +++ b/pkg/sql/logictest/logic.go @@ -1494,6 +1494,7 @@ func (t *logicTest) newCluster( stats.DefaultRefreshInterval = time.Millisecond t.cluster = serverutils.StartNewTestCluster(t.rootT, cfg.NumNodes, params) + t.purgeZoneConfig() if cfg.UseFakeSpanResolver { // We need to update the DistSQL span resolver with the fake resolver. // Note that DistSQL was disabled in makeClusterSetting above, so we diff --git a/pkg/testutils/serverutils/test_cluster_shim.go b/pkg/testutils/serverutils/test_cluster_shim.go index 54cc5b4e3722..04b764d1202a 100644 --- a/pkg/testutils/serverutils/test_cluster_shim.go +++ b/pkg/testutils/serverutils/test_cluster_shim.go @@ -284,13 +284,9 @@ func StartNewTestCluster( ) TestClusterInterface { cluster := NewTestCluster(t, numNodes, args) cluster.Start(t) - for i := 0; i < cluster.NumServers(); i++ { - sysconfigProvider := cluster.Server(i).SystemConfigProvider() - sysconfig := sysconfigProvider.GetSystemConfig() - if sysconfig != nil { - sysconfig.PurgeZoneConfigCache() - } - } + // Note: do not add logic here. To customize cluster configuration, + // add testing knobs and check them in testcluster.NewTestCluster. + // Not all tests use StartNewTestCluster. return cluster } From 7f6f9c37021ae1e26bbd6eef403cda860ee3040b Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Wed, 9 Aug 2023 19:07:28 -0400 Subject: [PATCH 03/10] kv: remove errSavepointInvalidAfterTxnRestart This commit simplifies logic in `checkSavepointLocked`. 
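In short, the per-call-site error translation that previously followed
every check (quoted from the removed code below):

    err := tc.checkSavepointLocked(sp)
    if errors.Is(err, errSavepointInvalidAfterTxnRestart) {
        err = kvpb.NewTransactionRetryWithProtoRefreshError(
            "cannot rollback to savepoint after a transaction restart",
            tc.mu.txn.ID,
            tc.mu.txn,
        )
    }

collapses into a single construction inside checkSavepointLocked itself,
parameterized on the operation name (a condensed view of the diff below,
not new behavior):

    return kvpb.NewTransactionRetryWithProtoRefreshError(
        redact.Sprintf("cannot %s savepoint after a transaction restart", op),
        s.txnID,
        tc.mu.txn,
    )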
Epic: None Release note: None --- .../kvcoord/txn_coord_sender_savepoints.go | 37 ++++++------------- 1 file changed, 11 insertions(+), 26 deletions(-) diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go index bd258d07bc35..948fcb892f1d 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" + "github.com/cockroachdb/redact" ) // savepoint captures the state in the TxnCoordSender necessary to restore that @@ -121,15 +122,8 @@ func (tc *TxnCoordSender) RollbackToSavepoint(ctx context.Context, s kv.Savepoin } sp := s.(*savepoint) - err := tc.checkSavepointLocked(sp) + err := tc.checkSavepointLocked(sp, "rollback to") if err != nil { - if errors.Is(err, errSavepointInvalidAfterTxnRestart) { - err = kvpb.NewTransactionRetryWithProtoRefreshError( - "cannot rollback to savepoint after a transaction restart", - tc.mu.txn.ID, - tc.mu.txn, - ) - } return err } @@ -165,15 +159,7 @@ func (tc *TxnCoordSender) ReleaseSavepoint(ctx context.Context, s kv.SavepointTo } sp := s.(*savepoint) - err := tc.checkSavepointLocked(sp) - if errors.Is(err, errSavepointInvalidAfterTxnRestart) { - err = kvpb.NewTransactionRetryWithProtoRefreshError( - "cannot release savepoint after a transaction restart", - tc.mu.txn.ID, - tc.mu.txn, - ) - } - return err + return tc.checkSavepointLocked(sp, "release") } type errSavepointOperationInErrorTxn struct{} @@ -193,23 +179,22 @@ func (tc *TxnCoordSender) assertNotFinalized() error { return nil } -var errSavepointInvalidAfterTxnRestart = errors.New("savepoint invalid after transaction restart") - // checkSavepointLocked checks whether the provided savepoint is still valid. -// Returns errSavepointInvalidAfterTxnRestart if the savepoint is not an +// Returns a TransactionRetryWithProtoRefreshError if the savepoint is not an // "initial" one and the transaction has restarted since the savepoint was // created. -func (tc *TxnCoordSender) checkSavepointLocked(s *savepoint) error { +func (tc *TxnCoordSender) checkSavepointLocked(s *savepoint, op redact.SafeString) error { // Only savepoints taken before any activity are allowed to be used after a // transaction restart. 
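+	// (An "initial" savepoint is one taken before the transaction performed
+	// any operations; it is not tied to the current txn ID or epoch, so it
+	// remains valid across transaction restarts.)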
if s.Initial() {
 		return nil
 	}
-	if s.txnID != tc.mu.txn.ID {
-		return errSavepointInvalidAfterTxnRestart
-	}
-	if s.epoch != tc.mu.txn.Epoch {
-		return errSavepointInvalidAfterTxnRestart
+	if s.txnID != tc.mu.txn.ID || s.epoch != tc.mu.txn.Epoch {
+		return kvpb.NewTransactionRetryWithProtoRefreshError(
+			redact.Sprintf("cannot %s savepoint after a transaction restart", op),
+			s.txnID,
+			tc.mu.txn,
+		)
 	}

 	if s.seqNum < 0 || s.seqNum > tc.interceptorAlloc.txnSeqNumAllocator.writeSeq {

From c6310af8100e985cc2626ce3a8f5dc74bf01e2b1 Mon Sep 17 00:00:00 2001
From: Yahor Yuzefovich
Date: Wed, 9 Aug 2023 19:54:50 -0700
Subject: [PATCH 04/10] sql: add some log scopes in tests

Release note: None
---
 pkg/sql/create_as_test.go       | 1 +
 pkg/sql/create_function_test.go | 1 +
 pkg/sql/schema_changer_test.go  | 1 +
 pkg/sql/sequence_test.go        | 1 +
 4 files changed, 4 insertions(+)

diff --git a/pkg/sql/create_as_test.go b/pkg/sql/create_as_test.go
index d1c277084edc..790a1ab1bf29 100644
--- a/pkg/sql/create_as_test.go
+++ b/pkg/sql/create_as_test.go
@@ -325,6 +325,7 @@ AND status != 'succeeded'`

 func TestFormat(t *testing.T) {
 	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)

 	testCases := []struct {
 		sql string
diff --git a/pkg/sql/create_function_test.go b/pkg/sql/create_function_test.go
index 315fe9668ffa..cfc1022f0224 100644
--- a/pkg/sql/create_function_test.go
+++ b/pkg/sql/create_function_test.go
@@ -151,6 +151,7 @@ SELECT nextval(105:::REGCLASS);`,

 func TestVersionGatingUDFInCheckConstraints(t *testing.T) {
 	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)

 	t.Run("new_schema_changer_version_enabled", func(t *testing.T) {
 		params, _ := createTestServerParams()
diff --git a/pkg/sql/schema_changer_test.go b/pkg/sql/schema_changer_test.go
index f445ee3990c8..bc03f09f9685 100644
--- a/pkg/sql/schema_changer_test.go
+++ b/pkg/sql/schema_changer_test.go
@@ -7586,6 +7586,7 @@ CREATE TABLE t.test (pk INT PRIMARY KEY, v INT);
 // to conclude. If the locks were not dropped, a deadlock could occur.
 func TestConcurrentSchemaChangesDoNotDeadlock(t *testing.T) {
 	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)

 	ctx := context.Background()
 	s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{})
diff --git a/pkg/sql/sequence_test.go b/pkg/sql/sequence_test.go
index 3e4ebf62b5a6..53c791adf00c 100644
--- a/pkg/sql/sequence_test.go
+++ b/pkg/sql/sequence_test.go
@@ -115,6 +115,7 @@ func BenchmarkUniqueRowID(b *testing.B) {
 // - ownership removal
 func TestSequenceOwnershipDependencies(t *testing.T) {
 	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)

 	ctx := context.Background()
 	params := base.TestServerArgs{}

From b66139451b75e3930deea5bd7909ceb4881f0428 Mon Sep 17 00:00:00 2001
From: Yahor Yuzefovich
Date: Wed, 9 Aug 2023 20:47:06 -0700
Subject: [PATCH 05/10] sql: don't start default test tenant in MT admin
 function tests

These tests themselves start multiple tenants, so there is no need to
create a default test tenant. Doing so is also confusing: the default
test tenant and the first explicitly created test tenant share the same
TenantID, effectively giving us two SQL pods with the same
configuration.

Starting the default test tenant was enabled recently in
c8996612ac823b1e65028a8b4eb8e3f8d778138e when we enabled the CCL
license, and we have seen at least one confusing failure that is
possibly related to this. Starting the default test tenant was
originally added in cfa437560c93dd07fae2ac804c757842eca5d369, but I
don't see a good reason for it.
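For reference, the resulting pattern in runTest (lines quoted from the
diff below): tests that start tenants themselves now opt out of the
default test tenant explicitly.

    cfg.ServerArgs.DefaultTestTenant = base.TestControlsTenantsExplicitly
    testCluster := serverutils.StartNewTestCluster(t, numNodes, cfg.TestClusterArgs)
    defer testCluster.Stopper().Stop(ctx)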
Release note: None
---
 pkg/sql/multitenant_admin_function_test.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/sql/multitenant_admin_function_test.go b/pkg/sql/multitenant_admin_function_test.go
index 8c6a0389776d..143f6144e3e5 100644
--- a/pkg/sql/multitenant_admin_function_test.go
+++ b/pkg/sql/multitenant_admin_function_test.go
@@ -246,7 +246,7 @@ func (tc testCase) runTest(
 	if numNodes == 0 {
 		numNodes = 1
 	}
-	cfg.ServerArgs.DefaultTestTenant = base.TestTenantProbabilistic
+	cfg.ServerArgs.DefaultTestTenant = base.TestControlsTenantsExplicitly
 	testCluster := serverutils.StartNewTestCluster(t, numNodes, cfg.TestClusterArgs)
 	defer testCluster.Stopper().Stop(ctx)

From 1e3cefe321237c0af98b6a0b10de33a2106367ab Mon Sep 17 00:00:00 2001
From: Michael Erickson
Date: Wed, 9 Aug 2023 23:30:32 -0700
Subject: [PATCH 06/10] kvstreamer: add more assertions to
 RequestsProvider.enqueue

If we ever enqueue zero-length requests, it could cause a deadlock
where the `workerCoordinator` is waiting for more requests and the
enqueuer is waiting for results. Add assertions that we never do this.

Release note: None
---
 pkg/kv/kvclient/kvstreamer/requests_provider.go | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/pkg/kv/kvclient/kvstreamer/requests_provider.go b/pkg/kv/kvclient/kvstreamer/requests_provider.go
index 14efdd53ab6b..6f1baf680cb3 100644
--- a/pkg/kv/kvclient/kvstreamer/requests_provider.go
+++ b/pkg/kv/kvclient/kvstreamer/requests_provider.go
@@ -264,6 +264,9 @@ func (p *outOfOrderRequestsProvider) enqueue(requests []singleRangeBatch) {
 	if len(p.requests) > 0 {
 		panic(errors.AssertionFailedf("outOfOrderRequestsProvider has old requests in enqueue"))
 	}
+	if len(requests) == 0 {
+		panic(errors.AssertionFailedf("outOfOrderRequestsProvider enqueuing zero requests"))
+	}
 	p.requests = requests
 	p.hasWork.Signal()
 }
@@ -388,6 +391,9 @@ func (p *inOrderRequestsProvider) enqueue(requests []singleRangeBatch) {
 	if len(p.requests) > 0 {
 		panic(errors.AssertionFailedf("inOrderRequestsProvider has old requests in enqueue"))
 	}
+	if len(requests) == 0 {
+		panic(errors.AssertionFailedf("inOrderRequestsProvider enqueuing zero requests"))
+	}
 	p.requests = requests
 	p.heapInit()
 	p.hasWork.Signal()

From c56dcaa9049eb705adb12b2c5ed2b4445458e0aa Mon Sep 17 00:00:00 2001
From: Xiang Gu
Date: Wed, 9 Aug 2023 11:39:54 -0400
Subject: [PATCH 07/10] schemachanger: Clean up redundant tests and refactor
 them

This commit recognizes that there were previously three redundant tests
covering concurrent schema changer behavior, so it deletes them and
rewrites them as one simpler test.
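The replacement test coordinates the two DDLs with a pair of channels;
a condensed view of the testing-knob wiring from the new test below
(the Once guards and error handling are as in the diff):

    DistSQL: &execinfra.TestingKnobs{
        RunBeforeBackfillChunk: func(_ roachpb.Span) error {
            closeMainChanOnce.Do(func() {
                close(createIndexChan) // signal: 1st DDL reached its backfill
            })
            <-addColChan // wait: 2nd DDL has been observed waiting behind it
            return nil
        },
    },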
Release note: None --- pkg/sql/schemachanger/BUILD.bazel | 11 +- pkg/sql/schemachanger/schemachanger_test.go | 885 +++----------------- 2 files changed, 121 insertions(+), 775 deletions(-) diff --git a/pkg/sql/schemachanger/BUILD.bazel b/pkg/sql/schemachanger/BUILD.bazel index 8f8658a425a2..6ca5cc59dc94 100644 --- a/pkg/sql/schemachanger/BUILD.bazel +++ b/pkg/sql/schemachanger/BUILD.bazel @@ -39,39 +39,34 @@ go_test( "//pkg/ccl", "//pkg/jobs", "//pkg/jobs/jobspb", + "//pkg/keys", + "//pkg/kv", "//pkg/roachpb", "//pkg/security/securityassets", "//pkg/security/securitytest", "//pkg/server", "//pkg/sql", "//pkg/sql/catalog", - "//pkg/sql/catalog/descpb", "//pkg/sql/catalog/desctestutils", "//pkg/sql/execinfra", - "//pkg/sql/pgwire/pgcode", "//pkg/sql/rowenc", "//pkg/sql/schemachanger/scexec", "//pkg/sql/schemachanger/scop", "//pkg/sql/schemachanger/scplan", "//pkg/sql/schemachanger/sctest", # keep - "//pkg/sql/sqltestutils", + "//pkg/sql/sessiondatapb", "//pkg/testutils", "//pkg/testutils/serverutils", "//pkg/testutils/sqlutils", - "//pkg/util", "//pkg/util/ctxgroup", "//pkg/util/leaktest", "//pkg/util/log", "//pkg/util/protoutil", "//pkg/util/randutil", - "//pkg/util/syncutil", - "@com_github_cockroachdb_cockroach_go_v2//crdb", "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_errors//errorspb", - "@com_github_lib_pq//:pq", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", - "@org_golang_x_sync//errgroup", ], ) diff --git a/pkg/sql/schemachanger/schemachanger_test.go b/pkg/sql/schemachanger/schemachanger_test.go index be5838e1acf9..5c4f73ef5003 100644 --- a/pkg/sql/schemachanger/schemachanger_test.go +++ b/pkg/sql/schemachanger/schemachanger_test.go @@ -20,798 +20,34 @@ import ( "sync/atomic" "testing" - "github.com/cockroachdb/cockroach-go/v2/crdb" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" - "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scexec" "github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scop" "github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scplan" - "github.com/cockroachdb/cockroach/pkg/sql/sqltestutils" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" - "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" - "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/errors" "github.com/cockroachdb/errors/errorspb" - "github.com/lib/pq" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "golang.org/x/sync/errgroup" ) -// TestConcurrentDeclarativeSchemaChanges tests that concurrent declarative -// 
schema changes operating on the same descriptors are performed serially. -func TestConcurrentDeclarativeSchemaChanges(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - ctx, cancel := context.WithCancel(context.Background()) - - var maxValue = 4000 - if util.RaceEnabled { - // Race builds are a lot slower, so use a smaller number of rows. - maxValue = 200 - } - - // Protects backfillNotification. - var mu syncutil.Mutex - var backfillNotification, continueNotification chan struct{} - // We have to have initBackfillNotification return the new - // channel rather than having later users read the original - // backfillNotification to make the race detector happy. - initBackfillNotification := func() (chan struct{}, chan struct{}) { - mu.Lock() - defer mu.Unlock() - backfillNotification = make(chan struct{}) - continueNotification = make(chan struct{}) - return backfillNotification, continueNotification - } - notifyBackfill := func() { - mu.Lock() - defer mu.Unlock() - if backfillNotification != nil { - close(backfillNotification) - backfillNotification = nil - } - if continueNotification != nil { - <-continueNotification - close(continueNotification) - continueNotification = nil - } - } - var alterPrimaryKeyBlockedCounter atomic.Uint32 - var params base.TestServerArgs - params.Knobs = base.TestingKnobs{ - SQLDeclarativeSchemaChanger: &scexec.TestingKnobs{ - WhileWaitingForConcurrentSchemaChanges: func(stmts []string) { - for _, stmt := range stmts { - if strings.Contains(stmt, "ALTER PRIMARY KEY") { - alterPrimaryKeyBlockedCounter.Add(1) - return - } - } - }, - }, - DistSQL: &execinfra.TestingKnobs{ - RunBeforeBackfillChunk: func(_ roachpb.Span) error { - notifyBackfill() - return nil - }, - }, - // Decrease the adopt loop interval so that retries happen quickly. - JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), - // Prevent the GC job from running so we ensure that all the keys - // which were written remain. - GCJob: &sql.GCJobTestingKnobs{RunBeforeResume: func(jobID jobspb.JobID) error { - <-ctx.Done() - return ctx.Err() - }}, - } - s, sqlDB, kvDB := serverutils.StartServer(t, params) - defer s.Stopper().Stop(ctx) - defer cancel() - codec := s.ApplicationLayer().Codec() - - if _, err := sqlDB.Exec(`CREATE DATABASE t`); err != nil { - t.Fatal(err) - } - - if _, err := sqlDB.Exec(`CREATE TABLE t.test (k INT NOT NULL, v INT)`); err != nil { - t.Fatal(err) - } - if err := sqltestutils.BulkInsertIntoTable(sqlDB, maxValue); err != nil { - t.Fatal(err) - } - - backfillNotif, continueNotif := initBackfillNotification() - var wg sync.WaitGroup - wg.Add(1) - go func() { - if _, err := sqlDB.Exec(`CREATE INDEX i ON t.test (v)`); err != nil { - t.Error(err) - } - wg.Done() - }() - - // Wait until the create index schema change job has started - // before kicking off the alter primary key. - <-backfillNotif - require.Zero(t, alterPrimaryKeyBlockedCounter.Load()) - wg.Add(1) - go func() { - if _, err := sqlDB.Exec(`ALTER TABLE t.test ALTER PRIMARY KEY USING COLUMNS (k)`); err != nil { - t.Error(err) - } - wg.Done() - }() - - // The ALTER PRIMARY KEY schema change must block. - testutils.SucceedsSoon(t, func() error { - if alterPrimaryKeyBlockedCounter.Load() == 0 { - return errors.New("waiting for concurrent schema change to block") - } - return nil - }) - - // Unblock the create index job. - continueNotif <- struct{}{} - wg.Wait() - - // The ALTER PRIMARY KEY schema change must have been blocked. 
- require.NotZero(t, alterPrimaryKeyBlockedCounter.Load()) - - // There should be 5 k/v pairs per row: - // * the original primary index keyed on rowid, - // * the first version of the secondary index on v with rowid as key suffix, - // * the intermediate version of the new primary index keyed on k but - // including rowid as a stored column, - // * the final version of the new primary index keyed on k and not including rowid, - // * the final version of the secondary index for v with k as the key suffix. - testutils.SucceedsSoon(t, func() error { - return sqltestutils.CheckTableKeyCount(ctx, kvDB, codec, 5, maxValue) - }) -} - -func TestSchemaChangeWaitsForOtherSchemaChanges(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - t.Run("wait for legacy schema changes", func(t *testing.T) { - // This test starts an legacy schema change job (job 1), and then starts - // another legacy schema change job (job 2) and a declarative schema change - // job (job 3) while job 1 is backfilling. Job 1 is resumed after job 2 - // has started running. - ctx := context.Background() - - var job1Backfill sync.Once - var job2Resume sync.Once - var job3Wait sync.Once - // Closed when we enter the RunBeforeBackfill knob of job 1. - job1BackfillNotification := make(chan struct{}) - // Closed when we're ready to continue with job 1. - job1ContinueNotification := make(chan struct{}) - // Closed when job 2 starts. - job2ResumeNotification := make(chan struct{}) - // Closed when job 3 starts waiting for concurrent schema changes to finish. - job3WaitNotification := make(chan struct{}) - var job1ID jobspb.JobID - - var getTableDescriptor func() catalog.TableDescriptor - var params base.TestServerArgs - params.Knobs = base.TestingKnobs{ - SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{ - RunBeforeResume: func(jobID jobspb.JobID) error { - // Only block in job 2. - if job1ID == 0 || jobID == job1ID { - job1ID = jobID - return nil - } - job2Resume.Do(func() { - close(job2ResumeNotification) - }) - return nil - }, - RunBeforeBackfill: func() error { - job1Backfill.Do(func() { - close(job1BackfillNotification) - <-job1ContinueNotification - }) - return nil - }, - }, - SQLDeclarativeSchemaChanger: &scexec.TestingKnobs{ - BeforeStage: func(p scplan.Plan, idx int) error { - // Assert that when job 3 is running, there are no mutations other - // than the ones associated with this schema change. - if p.Params.ExecutionPhase < scop.PostCommitPhase { - return nil - } - table := getTableDescriptor() - // There are 2 schema changes that should precede job 3. - // The declarative schema changer uses the same mutation ID for all - // its mutations. - for _, m := range table.AllMutations() { - assert.Equal(t, int(m.MutationID()), 3) - } - return nil - }, - BeforeWaitingForConcurrentSchemaChanges: func(_ []string) { - job3Wait.Do(func() { - close(job3WaitNotification) - }) - }, - }, - JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), - } - s, sqlDB, kvDB := serverutils.StartServer(t, params) - defer s.Stopper().Stop(ctx) - getTableDescriptor = func() catalog.TableDescriptor { - return desctestutils.TestingGetPublicTableDescriptor(kvDB, s.ApplicationLayer().Codec(), "db", "t") - } - - tdb := sqlutils.MakeSQLRunner(sqlDB) - tdb.Exec(t, `CREATE DATABASE db`) - tdb.Exec(t, `CREATE TABLE db.t (a INT PRIMARY KEY)`) - - g := ctxgroup.WithContext(ctx) - - // Start job 1: An index schema change, which does not use the new schema - // changer. 
- g.GoCtx(func(ctx context.Context) error { - _, err := sqlDB.ExecContext(ctx, `SET use_declarative_schema_changer='off'`) - assert.NoError(t, err) - _, err = sqlDB.ExecContext(ctx, `CREATE INDEX idx ON db.t(a)`) - assert.NoError(t, err) - return nil - }) - - <-job1BackfillNotification - - // Start job 3: A column schema change which uses the new schema changer. - // The transaction will not actually commit until job 1 has finished. - g.GoCtx(func(ctx context.Context) error { - conn, err := sqlDB.Conn(ctx) - if err != nil { - return err - } - _, err = conn.ExecContext(ctx, `SET use_declarative_schema_changer = 'unsafe'`) - assert.NoError(t, err) - _, err = conn.ExecContext(ctx, `ALTER TABLE db.t ADD COLUMN b INT DEFAULT 1`) - assert.NoError(t, err) - return nil - }) - - <-job3WaitNotification - - // Start job 2: Another index schema change which does not use the new - // schema changer. - g.GoCtx(func(ctx context.Context) error { - _, err := sqlDB.ExecContext(ctx, `SET use_declarative_schema_changer='off'`) - assert.NoError(t, err) - _, err = sqlDB.ExecContext(ctx, `CREATE INDEX idx2 ON db.t(a)`) - assert.NoError(t, err) - return nil - }) - - // Wait for job 2 to start. - <-job2ResumeNotification - - // Finally, let job 1 finish, which will unblock the - // others. - close(job1ContinueNotification) - require.NoError(t, g.Wait()) - - // Check that job 3 was created last. - tdb.CheckQueryResults(t, - fmt.Sprintf(`SELECT job_type, status, description FROM crdb_internal.jobs WHERE job_type = '%s' OR job_type = '%s' ORDER BY created`, - jobspb.TypeSchemaChange.String(), jobspb.TypeNewSchemaChange.String(), - ), - [][]string{ - {jobspb.TypeSchemaChange.String(), string(jobs.StatusSucceeded), `CREATE INDEX idx ON db.public.t (a)`}, - {jobspb.TypeSchemaChange.String(), string(jobs.StatusSucceeded), `CREATE INDEX idx2 ON db.public.t (a)`}, - {jobspb.TypeNewSchemaChange.String(), string(jobs.StatusSucceeded), `ALTER TABLE db.public.t ADD COLUMN b INT8 DEFAULT 1`}, - }, - ) - }) - - t.Run("wait for declarative schema changes for tables", func(t *testing.T) { - // This test starts a declarative schema change job (job 1), and then starts - // another declarative schema change job (job 2) while job 1 is backfilling. - ctx := context.Background() - - var job1Backfill sync.Once - var job2Wait sync.Once - // Closed when we enter the RunBeforeBackfill knob of job 1. - job1BackfillNotification := make(chan struct{}) - // Closed when we're ready to continue with job 1. - job1ContinueNotification := make(chan struct{}) - // Closed when job 2 starts waiting for concurrent schema changes to finish. - job2WaitNotification := make(chan struct{}) - - stmt1 := `ALTER TABLE db.t ADD COLUMN b INT8 DEFAULT 1` - stmt2 := `ALTER TABLE db.t ADD COLUMN c INT8 DEFAULT 2` - - var getTableDescriptor func() catalog.TableDescriptor - var params base.TestServerArgs - params.Knobs = base.TestingKnobs{ - SQLDeclarativeSchemaChanger: &scexec.TestingKnobs{ - BeforeStage: func(p scplan.Plan, idx int) error { - // Verify that we never queue mutations for job 2 before finishing job - // 1. 
- if p.Params.ExecutionPhase < scop.PostCommitPhase { - return nil - } - table := getTableDescriptor() - mutations := table.AllMutations() - if len(mutations) == 0 { - t.Errorf("unexpected empty mutations") - return errors.Errorf("test failure") - } - var idsSeen []descpb.MutationID - for _, m := range mutations { - if len(idsSeen) == 0 || m.MutationID() > idsSeen[len(idsSeen)-1] { - idsSeen = append(idsSeen, m.MutationID()) - } - } - highestID := idsSeen[len(idsSeen)-1] - assert.Truef(t, highestID <= 1, "unexpected mutation IDs %v", idsSeen) - // Block job 1 during the backfill. - s := p.Stages[idx] - stmt := p.TargetState.Statements[0].Statement - if stmt != stmt1 || s.Type() != scop.BackfillType { - return nil - } - for _, op := range s.EdgeOps { - if backfillOp, ok := op.(*scop.BackfillIndex); ok && backfillOp.IndexID == descpb.IndexID(2) { - job1Backfill.Do(func() { - close(job1BackfillNotification) - <-job1ContinueNotification - }) - } - } - - return nil - }, - BeforeWaitingForConcurrentSchemaChanges: func(stmts []string) { - if stmts[0] != stmt2 { - return - } - job2Wait.Do(func() { - close(job2WaitNotification) - }) - }, - }, - } - - s, sqlDB, kvDB := serverutils.StartServer(t, params) - defer s.Stopper().Stop(ctx) - getTableDescriptor = func() catalog.TableDescriptor { - return desctestutils.TestingGetPublicTableDescriptor(kvDB, s.ApplicationLayer().Codec(), "db", "t") - } - - tdb := sqlutils.MakeSQLRunner(sqlDB) - tdb.Exec(t, `CREATE DATABASE db`) - tdb.Exec(t, `CREATE TABLE db.t (a INT PRIMARY KEY)`) - - g := ctxgroup.WithContext(ctx) - - g.GoCtx(func(ctx context.Context) error { - conn, err := sqlDB.Conn(ctx) - if err != nil { - return err - } - _, err = conn.ExecContext(ctx, `SET use_declarative_schema_changer = 'unsafe'`) - assert.NoError(t, err) - _, err = conn.ExecContext(ctx, stmt1) - assert.NoError(t, err) - return nil - }) - - <-job1BackfillNotification - - g.GoCtx(func(ctx context.Context) error { - conn, err := sqlDB.Conn(ctx) - if err != nil { - return err - } - _, err = conn.ExecContext(ctx, `SET use_declarative_schema_changer = 'unsafe'`) - assert.NoError(t, err) - _, err = conn.ExecContext(ctx, stmt2) - assert.NoError(t, err) - return nil - }) - - <-job2WaitNotification - close(job1ContinueNotification) - require.NoError(t, g.Wait()) - - tdb.CheckQueryResults(t, - fmt.Sprintf(`SELECT job_type, status FROM crdb_internal.jobs WHERE job_type = '%s' OR job_type = '%s' ORDER BY created`, - jobspb.TypeSchemaChange.String(), jobspb.TypeNewSchemaChange.String(), - ), - [][]string{ - {jobspb.TypeNewSchemaChange.String(), string(jobs.StatusSucceeded)}, - {jobspb.TypeNewSchemaChange.String(), string(jobs.StatusSucceeded)}, - }, - ) - }) - - t.Run("wait for declarative schema changes for schema", func(t *testing.T) { - // This test starts a declarative schema change job (job 1), and then starts - // another declarative schema change job (job 2) involving dropping schemas. - // Both of these jobs will need to concurrently touch the database descriptor. - ctx := context.Background() - - var jobWaitForPostCommit sync.Once - var jobWaitBeforeWait sync.Once - // Closed when we're ready to continue with job 1. 
- job2StartExecution := make(chan struct{}) - job2ContinueNotification := make(chan struct{}) - completionCount := int32(0) - - stmt1 := `DROP SCHEMA db.s1` - stmt2 := `DROP SCHEMA db.s2` - var params base.TestServerArgs - params.Knobs = base.TestingKnobs{ - SQLDeclarativeSchemaChanger: &scexec.TestingKnobs{ - BeforeStage: func(p scplan.Plan, stageIdx int) error { - if p.Params.ExecutionPhase != scop.PostCommitPhase { - return nil - } - jobWaitForPostCommit.Do(func() { - job2StartExecution <- struct{}{} - job2ContinueNotification <- struct{}{} - }) - return nil - }, - BeforeWaitingForConcurrentSchemaChanges: func(stmts []string) { - if stmts[0] == stmt2 { - atomic.AddInt32(&completionCount, 1) - jobWaitBeforeWait.Do(func() { - <-job2ContinueNotification - }) - } - }, - }, - } - - s, sqlDB, _ := serverutils.StartServer(t, params) - defer s.Stopper().Stop(ctx) - - tdb := sqlutils.MakeSQLRunner(sqlDB) - tdb.Exec(t, "SET CLUSTER SETTING sql.schema.force_declarative_statements='+CREATE SCHEMA'") - tdb.Exec(t, `CREATE DATABASE db`) - tdb.Exec(t, `CREATE SCHEMA db.s1`) - tdb.Exec(t, `CREATE SCHEMA db.s2`) - - g := ctxgroup.WithContext(ctx) - - g.GoCtx(func(ctx context.Context) error { - conn, err := sqlDB.Conn(ctx) - if err != nil { - return err - } - _, err = conn.ExecContext(ctx, `SET use_declarative_schema_changer = 'unsafe'`) - assert.NoError(t, err) - _, err = conn.ExecContext(ctx, stmt1) - assert.NoError(t, err) - return nil - }) - - <-job2StartExecution - - g.GoCtx(func(ctx context.Context) error { - conn, err := sqlDB.Conn(ctx) - if err != nil { - return err - } - _, err = conn.ExecContext(ctx, `SET use_declarative_schema_changer = 'unsafe'`) - assert.NoError(t, err) - _, err = conn.ExecContext(ctx, stmt2) - assert.NoError(t, err) - return nil - }) - - require.NoError(t, g.Wait()) - - // Expect two NEW SCHEMA CHANGE jobs for stmt1 and stmt2. - // The CREATE DATABASE and two CREATE SCHEMAs don't create jobs. - tdb.CheckQueryResults(t, - fmt.Sprintf(`SELECT job_type, status FROM crdb_internal.jobs WHERE job_type = '%s' OR job_type = '%s' ORDER BY created`, - jobspb.TypeSchemaChange.String(), jobspb.TypeNewSchemaChange.String(), - ), - [][]string{ - {jobspb.TypeNewSchemaChange.String(), string(jobs.StatusSucceeded)}, - {jobspb.TypeNewSchemaChange.String(), string(jobs.StatusSucceeded)}, - }, - ) - // We should observe the schema change was tried at least twice. - require.GreaterOrEqual(t, atomic.LoadInt32(&completionCount), int32(1)) - }) - - t.Run("wait for declarative schema changes for type", func(t *testing.T) { - // This test starts a declarative schema change job (job 1), and then starts - // another declarative schema change job (job 2) involving dropping tables. - // Both of these jobs will need to concurrently touch type descriptors. - ctx := context.Background() - - var jobWaitForPostCommit sync.Once - var jobWaitBeforeWait sync.Once - // Closed when we're ready to continue with job 1. 
- job2StartExecution := make(chan struct{}) - job2ContinueNotification := make(chan struct{}) - completionCount := int32(0) - - stmt1 := `DROP TABLE db.t1` - stmt2 := `DROP TABLE db.t2` - var params base.TestServerArgs - params.Knobs = base.TestingKnobs{ - SQLDeclarativeSchemaChanger: &scexec.TestingKnobs{ - BeforeStage: func(p scplan.Plan, stageIdx int) error { - if p.Params.ExecutionPhase != scop.PostCommitPhase { - return nil - } - jobWaitForPostCommit.Do(func() { - job2StartExecution <- struct{}{} - job2ContinueNotification <- struct{}{} - }) - return nil - }, - BeforeWaitingForConcurrentSchemaChanges: func(stmts []string) { - if stmts[0] == stmt2 { - atomic.AddInt32(&completionCount, 1) - jobWaitBeforeWait.Do(func() { - <-job2ContinueNotification - }) - } - }, - }, - } - - s, sqlDB, _ := serverutils.StartServer(t, params) - defer s.Stopper().Stop(ctx) - - tdb := sqlutils.MakeSQLRunner(sqlDB) - tdb.Exec(t, `CREATE DATABASE db`) - tdb.Exec(t, `CREATE TYPE db.status AS ENUM ('open', 'closed', 'inactive');`) - tdb.Exec(t, `CREATE TABLE db.t1(t db.status)`) - tdb.Exec(t, `CREATE TABLE db.t2(t db.status)`) - - g := ctxgroup.WithContext(ctx) - - g.GoCtx(func(ctx context.Context) error { - conn, err := sqlDB.Conn(ctx) - if err != nil { - return err - } - _, err = conn.ExecContext(ctx, `SET use_declarative_schema_changer = 'unsafe'`) - assert.NoError(t, err) - _, err = conn.ExecContext(ctx, stmt1) - assert.NoError(t, err) - return nil - }) - - <-job2StartExecution - - g.GoCtx(func(ctx context.Context) error { - conn, err := sqlDB.Conn(ctx) - if err != nil { - return err - } - _, err = conn.ExecContext(ctx, `SET use_declarative_schema_changer = 'unsafe'`) - assert.NoError(t, err) - _, err = conn.ExecContext(ctx, stmt2) - assert.NoError(t, err) - return nil - }) - - require.NoError(t, g.Wait()) - - tdb.CheckQueryResults(t, - fmt.Sprintf(`SELECT job_type, status FROM crdb_internal.jobs WHERE job_type = '%s' OR job_type = '%s' ORDER BY created`, - jobspb.TypeSchemaChange.String(), jobspb.TypeNewSchemaChange.String(), - ), - [][]string{ - {jobspb.TypeNewSchemaChange.String(), string(jobs.StatusSucceeded)}, - {jobspb.TypeNewSchemaChange.String(), string(jobs.StatusSucceeded)}, - }, - ) - // We should observe the schema change was tried at least twice. - require.GreaterOrEqual(t, atomic.LoadInt32(&completionCount), int32(1)) - }) -} - -// TestConcurrentSchemaChangesWait ensures that when a schema change -// is run concurrently with a declarative schema change, that it waits for -// the declarative schema change to complete before proceeding. -// -// Each concurrent schema change is run both as a single statement, where the -// test expects an automatic retry, and as part of an explicit transaction -// which has returned rows, in order to ensure that an error with the proper -// error code is returned. -func TestConcurrentSchemaChangesWait(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - const defaultInitialStmt = `ALTER TABLE db.t ADD COLUMN b INT DEFAULT 1` - type concurrentWaitTest struct { - // initial statement run under the declarative schema changer, paused on - // the first post commit phase. - initial string - // concurrent statement run under the legacy schema changer - concurrent string - } - ctx := context.Background() - runConcurrentSchemaChangeCase := func(t *testing.T, stmts concurrentWaitTest, implicit bool) { - defer log.Scope(t).Close(t) - var doOnce sync.Once - // Closed when we enter the BeforeStage knob with a post commit or later - // phase. 
- beforePostCommitNotification := make(chan struct{}) - // Closed when we're ready to continue with the schema change. - continueNotification := make(chan struct{}) - // Sent on when we're waiting for the initial schema change. - waitingForConcurrent := make(chan struct{}) - - var getTableDescriptor func() catalog.TableDescriptor - var params base.TestServerArgs - params.Knobs = base.TestingKnobs{ - SQLDeclarativeSchemaChanger: &scexec.TestingKnobs{ - BeforeWaitingForConcurrentSchemaChanges: func(_ []string) { - waitingForConcurrent <- struct{}{} - }, - BeforeStage: func(p scplan.Plan, idx int) error { - // Verify that we never get a mutation ID not associated with the schema - // change that is running. - if p.Params.ExecutionPhase < scop.PostCommitPhase { - return nil - } - table := getTableDescriptor() - for _, m := range table.AllMutations() { - assert.LessOrEqual(t, int(m.MutationID()), 2) - } - s := p.Stages[idx] - if s.Phase < scop.PostCommitPhase { - return nil - } - doOnce.Do(func() { - close(beforePostCommitNotification) - <-continueNotification - }) - return nil - }, - }, - JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), - } - - s, sqlDB, kvDB := serverutils.StartServer(t, params) - defer s.Stopper().Stop(ctx) - getTableDescriptor = func() catalog.TableDescriptor { - return desctestutils.TestingGetPublicTableDescriptor(kvDB, s.ApplicationLayer().Codec(), "db", "t") - } - - initialSchemaChange := func() error { - conn, err := sqlDB.Conn(ctx) - if err != nil { - return err - } - _, err = conn.ExecContext(ctx, `SET use_declarative_schema_changer = 'unsafe'`) - assert.NoError(t, err) - for _, s := range strings.Split(stmts.initial, ";") { - _, err = conn.ExecContext(ctx, s) - assert.NoError(t, err) - } - return nil - } - concurrentSchemaChangeImplicit := func() error { - conn, err := sqlDB.Conn(ctx) - if err != nil { - return err - } - defer func() { _ = conn.Close() }() - for _, s := range append([]string{ - `SET use_declarative_schema_changer = 'off'`, - }, strings.Split(stmts.concurrent, ";")...) { - if _, err = conn.ExecContext(ctx, s); err != nil { - return err - } - } - return nil - } - concurrentSchemaChangeExplicit := func() error { - var sawRestart bool - defer func() { assert.True(t, sawRestart) }() - return crdb.Execute(func() (err error) { - conn, err := sqlDB.Conn(ctx) - if err != nil { - return err - } - defer func() { _ = conn.Close() }() - tx, err := conn.BeginTx(ctx, nil) - if err != nil { - return err - } - defer func() { - if err != nil { - var pqErr *pq.Error - sawRestart = sawRestart || - errors.As(err, &pqErr) && - string(pqErr.Code) == pgcode.SerializationFailure.String() - _ = tx.Rollback() - } - }() - // Execute something first to ensure that a restart is sent. 
- if _, err := tx.Exec("SELECT * FROM db.other_t"); err != nil { - return err - } - for _, s := range strings.Split(stmts.concurrent, ";") { - if _, err := tx.ExecContext(ctx, s); err != nil { - return err - } - } - return tx.Commit() - }) - } - - tdb := sqlutils.MakeSQLRunner(sqlDB) - tdb.Exec(t, `CREATE DATABASE db`) - tdb.Exec(t, `CREATE TABLE db.other_t (a INT PRIMARY KEY)`) - tdb.Exec(t, `CREATE TABLE db.t (a INT PRIMARY KEY)`) - tdb.Exec(t, `CREATE USER testuser`) - tdb.Exec(t, `CREATE SCHEMA db.sc`) - tdb.Exec(t, `ALTER SCHEMA db.sc OWNER to testuser`) - tdb.Exec(t, `CREATE TABLE db.sc.t (a INT PRIMARY KEY)`) - tdb.Exec(t, `ALTER TABLE db.sc.t OWNER to testuser`) - var initialSchemaChangeGroup errgroup.Group - var concurrentSchemaChangeGroup errgroup.Group - initialSchemaChangeGroup.Go(initialSchemaChange) - <-beforePostCommitNotification - if implicit { - concurrentSchemaChangeGroup.Go(concurrentSchemaChangeImplicit) - } else { - concurrentSchemaChangeGroup.Go(concurrentSchemaChangeExplicit) - } - <-waitingForConcurrent - close(continueNotification) - require.NoError(t, initialSchemaChangeGroup.Wait()) - require.NoError(t, concurrentSchemaChangeGroup.Wait()) - } - - stmts := []concurrentWaitTest{ - {defaultInitialStmt, `ALTER TABLE db.t ADD COLUMN c INT DEFAULT 2`}, - {defaultInitialStmt, `CREATE INDEX ON db.t(a)`}, - {defaultInitialStmt, `ALTER TABLE db.t RENAME COLUMN a TO c`}, - {defaultInitialStmt, `CREATE TABLE db.t2 (i INT PRIMARY KEY, a INT REFERENCES db.t)`}, - {defaultInitialStmt, `CREATE VIEW db.v AS SELECT a FROM db.t`}, - {defaultInitialStmt, `ALTER TABLE db.t RENAME TO db.new`}, - {defaultInitialStmt, `TRUNCATE TABLE db.t`}, - {defaultInitialStmt, `DROP TABLE db.t`}, - {"USE db; DROP OWNED BY testuser;", `DROP DATABASE db`}, - } - for i := range stmts { - stmt := stmts[i] // copy for closure - t.Run(stmt.concurrent, func(t *testing.T) { - testutils.RunTrueAndFalse(t, "implicit", func(t *testing.T, implicit bool) { - runConcurrentSchemaChangeCase(t, stmt, implicit) - }) - }) - } -} - func TestSchemaChangerJobRunningStatus(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -1166,3 +402,118 @@ WHERE }) } } + +// TestSchemaChangeWaitsForConcurrentSchemaChanges tests that if a schema +// change on a table is issued when there is already an ongoing schema change +// on that table, it will wait until that ongoing schema change finishes before +// proceeding. +func TestSchemaChangeWaitsForConcurrentSchemaChanges(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + tf := func(t *testing.T, modeFor1stStmt, modeFor2ndStmt sessiondatapb.NewSchemaChangerMode) { + ctx, cancel := context.WithCancel(context.Background()) + createIndexChan := make(chan struct{}) + addColChan := make(chan struct{}) + var closeMainChanOnce, closeAlterPKChanOnce sync.Once + + var params base.TestServerArgs + params.Knobs = base.TestingKnobs{ + SQLDeclarativeSchemaChanger: &scexec.TestingKnobs{ + // If the blocked schema changer is from legacy schema changer, we let + // it hijack this knob (which is originally design for declarative + // schema changer) if `stmt` is nil. 
WhileWaitingForConcurrentSchemaChanges: func(stmts []string) {
+					if (len(stmts) == 1 && strings.Contains(stmts[0], "ADD COLUMN")) ||
+						stmts == nil {
+						closeAlterPKChanOnce.Do(func() {
+							close(addColChan)
+						})
+					}
+				},
+			},
+			DistSQL: &execinfra.TestingKnobs{
+				RunBeforeBackfillChunk: func(_ roachpb.Span) error {
+					closeMainChanOnce.Do(func() {
+						close(createIndexChan)
+					})
+					<-addColChan // wait for AddCol to unblock me
+					return nil
+				},
+			},
+			// Decrease the adopt loop interval so that retries happen quickly.
+			JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
+			// Prevent the GC job from running so we ensure that all the keys which
+			// were written remain.
+			GCJob: &sql.GCJobTestingKnobs{RunBeforeResume: func(jobID jobspb.JobID) error {
+				<-ctx.Done()
+				return ctx.Err()
+			}},
+		}
+		s, sqlDB, kvDB := serverutils.StartServer(t, params)
+		defer s.Stopper().Stop(ctx)
+		defer cancel()
+		tdb := sqlutils.MakeSQLRunner(sqlDB)
+
+		tdb.Exec(t, "CREATE TABLE t (i INT PRIMARY KEY, j INT NOT NULL);")
+		tdb.Exec(t, "INSERT INTO t SELECT k, k+1 FROM generate_series(1,1000) AS tmp(k);")
+
+		// Execute the 1st DDL asynchronously and block until it's executing.
+		tdb.Exec(t, `SET use_declarative_schema_changer = $1`, modeFor1stStmt.String())
+		go func() {
+			tdb.Exec(t, `CREATE INDEX idx ON t (j);`)
+		}()
+		<-createIndexChan
+
+		// Execute the 2nd DDL synchronously. While waiting, it will unblock the
+		// 1st DDL, so it will eventually be able to proceed.
+		tdb.Exec(t, `SET use_declarative_schema_changer = $1`, modeFor2ndStmt.String())
+		tdb.Exec(t, `ALTER TABLE t ADD COLUMN k INT DEFAULT 30;`)
+
+		// There should be 2 k/v pairs per row:
+		// 1. the old primary index (i : j)
+		// 2. the new secondary index keyed on j with key suffix on i (j; i : ), from CREATE INDEX
+		// Additionally, if ADD COLUMN uses the declarative schema changer, there
+		// will be 1 more k/v pair for each row:
+		// 3. the new primary index (i : j, k), from ADD COLUMN
+		expectedKeyCount := 2000
+		if modeFor2ndStmt == sessiondatapb.UseNewSchemaChangerUnsafeAlways {
+			expectedKeyCount = 3000
+		}
+		requireTableKeyCount(ctx, t, s.ApplicationLayer().Codec(), kvDB,
+			"defaultdb", "t", expectedKeyCount)
+	}
+
+	t.Run("declarative-then-declarative", func(t *testing.T) {
+		tf(t, sessiondatapb.UseNewSchemaChangerUnsafeAlways, sessiondatapb.UseNewSchemaChangerUnsafeAlways)
+	})
+
+	t.Run("declarative-then-legacy", func(t *testing.T) {
+		tf(t, sessiondatapb.UseNewSchemaChangerUnsafeAlways, sessiondatapb.UseNewSchemaChangerOff)
+	})
+
+	t.Run("legacy-then-declarative", func(t *testing.T) {
+		tf(t, sessiondatapb.UseNewSchemaChangerOff, sessiondatapb.UseNewSchemaChangerUnsafeAlways)
+	})
+
+	// The legacy + legacy case is tested in
+	// TestLegacySchemaChangerWaitsForOtherSchemaChanges because the waiting
+	// occurs under a different code path.
+}
+
+// requireTableKeyCount ensures that `db`.`tbl` has `keyCount` kv-pairs in it.
+func requireTableKeyCount(
+	ctx context.Context,
+	t *testing.T,
+	codec keys.SQLCodec,
+	kvDB *kv.DB,
+	db string,
+	tbl string,
+	keyCount int,
+) {
+	tableDesc := desctestutils.TestingGetPublicTableDescriptor(kvDB, codec, db, tbl)
+	tablePrefix := codec.TablePrefix(uint32(tableDesc.GetID()))
+	tableEnd := tablePrefix.PrefixEnd()
+	kvs, err := kvDB.Scan(ctx, tablePrefix, tableEnd, 0)
+	require.NoError(t, err)
+	require.Equal(t, keyCount, len(kvs))
+}

From c02d1f38289546f078b43cd322797fe543f39cdf Mon Sep 17 00:00:00 2001
From: Xiang Gu
Date: Wed, 9 Aug 2023 11:44:55 -0400
Subject: [PATCH 08/10] schemachanger: Add an integration-style test for
 concurrent schema changer behavior

This commit adds an integration-style test for concurrent schema
changer behavior: we run multiple DDLs for an extended period of time
on a few descriptors and assert that they all eventually finish and the
descriptors end up in the expected state.

Release note: None
---
 pkg/sql/schemachanger/BUILD.bazel           |   3 +
 pkg/sql/schemachanger/schemachanger_test.go | 271 ++++++++++++++++++++
 2 files changed, 274 insertions(+)

diff --git a/pkg/sql/schemachanger/BUILD.bazel b/pkg/sql/schemachanger/BUILD.bazel
index 6ca5cc59dc94..51fee6bc6acb 100644
--- a/pkg/sql/schemachanger/BUILD.bazel
+++ b/pkg/sql/schemachanger/BUILD.bazel
@@ -49,6 +49,7 @@ go_test(
         "//pkg/sql/catalog",
         "//pkg/sql/catalog/desctestutils",
         "//pkg/sql/execinfra",
+        "//pkg/sql/pgwire/pgcode",
         "//pkg/sql/rowenc",
         "//pkg/sql/schemachanger/scexec",
         "//pkg/sql/schemachanger/scop",
@@ -57,6 +58,7 @@ go_test(
         "//pkg/sql/sessiondatapb",
         "//pkg/testutils",
         "//pkg/testutils/serverutils",
+        "//pkg/testutils/skip",
         "//pkg/testutils/sqlutils",
         "//pkg/util/ctxgroup",
         "//pkg/util/leaktest",
@@ -65,6 +67,7 @@ go_test(
         "//pkg/util/randutil",
         "@com_github_cockroachdb_errors//:errors",
         "@com_github_cockroachdb_errors//errorspb",
+        "@com_github_lib_pq//:pq",
         "@com_github_stretchr_testify//assert",
         "@com_github_stretchr_testify//require",
     ],
diff --git a/pkg/sql/schemachanger/schemachanger_test.go b/pkg/sql/schemachanger/schemachanger_test.go
index 5c4f73ef5003..1076357befc5 100644
--- a/pkg/sql/schemachanger/schemachanger_test.go
+++ b/pkg/sql/schemachanger/schemachanger_test.go
@@ -14,11 +14,13 @@ import (
 	"context"
 	"encoding/hex"
 	"fmt"
+	"math/rand"
 	"regexp"
 	"strings"
 	"sync"
 	"sync/atomic"
 	"testing"
+	"time"

 	"github.com/cockroachdb/cockroach/pkg/base"
 	"github.com/cockroachdb/cockroach/pkg/jobs"
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
@@ -30,6 +32,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
 	"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
+	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
 	"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
 	"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scexec"
 	"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scop"
@@ -37,6 +40,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
 	"github.com/cockroachdb/cockroach/pkg/testutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
+	"github.com/cockroachdb/cockroach/pkg/testutils/skip"
 	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
 	"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
@@ -44,6 +48,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
 	"github.com/cockroachdb/errors"
 	"github.com/cockroachdb/errors/errorspb"
+	"github.com/lib/pq"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 )
@@ -517,3 +522,269 @@
 	require.NoError(t, err)
 	require.Equal(t, keyCount, len(kvs))
 }
+
+// TestConcurrentSchemaChanges is an integration-style test where we issue many
+// schema changes concurrently (renames, add/drop columns, and create/drop
+// indexes) for a period of time and assert that they all finish eventually and
+// we end up with expected names, columns, and indexes.
+func TestConcurrentSchemaChanges(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	skip.UnderShort(t, "this test is long running (>3 mins).")
+	skip.UnderStress(t, "test is already integration style and long running")
+	skip.UnderStressRace(t, "test is already integration style and long running")
+	skip.UnderRace(t, "the test knowingly has a data race and has logic to account for that")
+
+	const testDuration = 3 * time.Minute
+	const renameDBInterval = 5 * time.Second
+	const renameSCInterval = 4 * time.Second
+	const renameTblInterval = 3 * time.Second
+	const addColInterval = 1 * time.Second
+	const dropColInterval = 1 * time.Second
+	const createIdxInterval = 1 * time.Second
+	const dropIdxInterval = 1 * time.Second
+
+	ctx, cancel := context.WithCancel(context.Background())
+	var params base.TestServerArgs
+	params.Knobs = base.TestingKnobs{
+		// Decrease the adopt loop interval so that retries happen quickly.
+		JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
+	}
+	s, sqlDB, kvDB := serverutils.StartServer(t, params)
+	defer s.Stopper().Stop(ctx)
+
+	tdb := sqlutils.MakeSQLRunner(sqlDB)
+	dbName, scName, tblName := "testdb", "testsc", "t"
+	allColToIndexes := make(map[string]map[string]struct{}) // colName -> indexes that use that column
+	allColToIndexes["col"] = map[string]struct{}{"t_pkey": {}}
+	allNonPublicIdxToKeyCols := make(map[string]map[string]struct{}) // indexName -> its key column(s)
+	tdb.Exec(t, fmt.Sprintf("CREATE DATABASE %v;", dbName))
+	tdb.Exec(t, fmt.Sprintf("CREATE SCHEMA %v.%v;", dbName, scName))
+	tdb.Exec(t, fmt.Sprintf("CREATE TABLE %v.%v.%v (col INT PRIMARY KEY);", dbName, scName, tblName))
+	tdb.Exec(t, fmt.Sprintf("INSERT INTO %v.%v.%v SELECT generate_series(1,100);", dbName, scName, tblName))
+
+	// repeatWorkWithInterval repeats `work` (with a jittered interval around
+	// `workInterval`) until `ctx` is cancelled or `work` returns an error.
+	workerErrChan := make(chan error)
+	var wg sync.WaitGroup
+	repeatWorkWithInterval := func(workerName string, workInterval time.Duration, work func() error) {
+		wg.Add(1)
+		defer wg.Done()
+		for {
+			// Jitter the interval by ±20%. Do the multiplication in float64
+			// before converting to time.Duration; converting the multiplier
+			// alone would truncate it to 0 or 1.
+			jitteredInterval := time.Duration(float64(workInterval) * (0.8 + 0.4*rand.Float64()))
+			select {
+			case <-ctx.Done():
+				t.Logf("%v is signaled to finish work", workerName)
+				return
+			case <-time.After(jitteredInterval):
+				if err := work(); err != nil {
+					t.Logf("%v encounters error %v; signal to main routine and finish working", workerName, err.Error())
+					workerErrChan <- err
+					return
+				}
+			}
+		}
+	}
+
+	// validate performs a few quick validations after all schema changes are finished:
+	// 1. Database, schema, and table indeed end up with the tracked name.
+	// 2. Table indeed has the tracked columns.
+	// 3. Table indeed has the tracked indexes.
+ codec := s.ApplicationLayer().Codec() + validate := func() { + dbDesc := desctestutils.TestingGetDatabaseDescriptor(kvDB, codec, dbName) + desctestutils.TestingGetSchemaDescriptor(kvDB, codec, dbDesc.GetID(), scName) + tblDesc := desctestutils.TestingGetTableDescriptor(kvDB, codec, dbName, scName, tblName) + require.Equal(t, len(allColToIndexes), len(tblDesc.PublicColumns())) // allColToIndexes does not include `col` + for _, col := range tblDesc.PublicColumns() { + _, ok := allColToIndexes[col.GetName()] + require.True(t, ok, "column %v does not exist in allColToIndexes=%v", col.GetName(), allColToIndexes) + } + require.Equal(t, len(allNonPublicIdxToKeyCols), len(tblDesc.PublicNonPrimaryIndexes())) + for _, idx := range tblDesc.PublicNonPrimaryIndexes() { + _, ok := allNonPublicIdxToKeyCols[idx.GetName()] + require.True(t, ok, "index %v does not exist in allNonPublicIdxToKeyCols=%v", idx.GetName(), allNonPublicIdxToKeyCols) + } + } + + // A goroutine that repeatedly renames database `testdb` randomly. + go repeatWorkWithInterval("rename-db-worker", renameDBInterval, func() error { + newDBName := fmt.Sprintf("testdb_%v", rand.Intn(1000)) + if newDBName == dbName { + return nil + } + if _, err := sqlDB.Exec(fmt.Sprintf("ALTER DATABASE %v RENAME TO %v", dbName, newDBName)); err != nil { + return err + } + dbName = newDBName + t.Logf("RENAME DATABASE TO %v", newDBName) + return nil + }) + + // A goroutine that renames schema `testdb.testsc` randomly. + go repeatWorkWithInterval("rename-schema-worker", renameSCInterval, func() error { + newSCName := fmt.Sprintf("testsc_%v", rand.Intn(1000)) + if scName == newSCName { + return nil + } + _, err := sqlDB.Exec(fmt.Sprintf("ALTER SCHEMA %v.%v RENAME TO %v", dbName, scName, newSCName)) + if err == nil { + scName = newSCName + t.Logf("RENAME SCHEMA TO %v", newSCName) + } else if isPQErrWithCode(err, pgcode.UndefinedDatabase) { + err = nil // mute those errors as they're expected + t.Logf("Parent database is renamed; skipping this schema renaming.") + } + return err + }) + + // A goroutine that renames table `testdb.testsc.t` randomly. + go repeatWorkWithInterval("rename-tbl-worker", renameTblInterval, func() error { + newTblName := fmt.Sprintf("t_%v", rand.Intn(1000)) + _, err := sqlDB.Exec(fmt.Sprintf(`ALTER TABLE %v.%v.%v RENAME TO %v`, dbName, scName, tblName, newTblName)) + if err == nil { + tblName = newTblName + t.Logf("RENAME TABLE TO %v", newTblName) + } else if isPQErrWithCode(err, pgcode.UndefinedDatabase, pgcode.UndefinedSchema, pgcode.InvalidSchemaName) { + err = nil + t.Logf("Parent database or schema is renamed; skipping this table renaming.") + } + return err + }) + + // A goroutine that adds columns to `testdb.testsc.t` randomly. 
+ go repeatWorkWithInterval("add-column-worker", addColInterval, func() error { + newColName := fmt.Sprintf("col_%v", rand.Intn(1000)) + if _, ok := allColToIndexes[newColName]; ok { + return nil + } + tblName := tblName + _, err := sqlDB.Exec(fmt.Sprintf("ALTER TABLE %v.%v.%v ADD COLUMN %v INT DEFAULT %v", dbName, scName, tblName, newColName, rand.Intn(100))) + if err == nil { + allColToIndexes[newColName] = make(map[string]struct{}) + t.Logf("ADD COLUMN %v TO TABLE %v", newColName, tblName) + } else if isPQErrWithCode(err, pgcode.UndefinedDatabase, pgcode.UndefinedSchema, pgcode.InvalidSchemaName, pgcode.UndefinedTable) { + err = nil + t.Logf("Parent database or schema or table is renamed; skipping this column addition.") + } + return err + }) + + // A goroutine that drops columns from `testdb.testsc.t` randomly. + go repeatWorkWithInterval("drop-column-worker", dropColInterval, func() error { + // Randomly pick a non-PK column to drop. + if len(allColToIndexes) == 1 { + return nil + } + var colName string + for col := range allColToIndexes { + if col != "col" { + colName = col + break + } + } + + tblName := tblName + _, err := sqlDB.Exec(fmt.Sprintf("ALTER TABLE %v.%v.%v DROP COLUMN %v;", dbName, scName, tblName, colName)) + if err == nil { + for indexName := range allColToIndexes[colName] { + delete(allNonPublicIdxToKeyCols, indexName) + } + delete(allColToIndexes, colName) + t.Logf("DROP COLUMN %v FROM TABLE %v", colName, tblName) + } else if isPQErrWithCode(err, pgcode.UndefinedDatabase, pgcode.UndefinedSchema, pgcode.InvalidSchemaName, pgcode.UndefinedTable) { + err = nil + t.Logf("Parent database or schema or table is renamed; skipping this column removal.") + } + return err + }) + + // A goroutine that creates secondary index on a randomly selected column. + go repeatWorkWithInterval("create-index-worker", createIdxInterval, func() error { + newIndexName := fmt.Sprintf("idx_%v", rand.Intn(1000)) + if _, ok := allNonPublicIdxToKeyCols[newIndexName]; ok { + return nil + } + + // Randomly pick a non-PK column to create an index on. + if len(allColToIndexes) == 1 { + return nil + } + var colName string + for col := range allColToIndexes { + if col != "col" { + colName = col + break + } + } + + tblName := tblName + _, err := sqlDB.Exec(fmt.Sprintf("CREATE INDEX %v ON %v.%v.%v (%v);", newIndexName, dbName, scName, tblName, colName)) + if err == nil { + allNonPublicIdxToKeyCols[newIndexName] = map[string]struct{}{colName: {}} + allColToIndexes[colName][newIndexName] = struct{}{} + t.Logf("CREATE INDEX %v ON TABLE %v(%v)", newIndexName, tblName, colName) + } else if isPQErrWithCode(err, pgcode.UndefinedDatabase, pgcode.UndefinedSchema, pgcode.InvalidSchemaName, pgcode.UndefinedTable, pgcode.UndefinedColumn) { + err = nil + t.Logf("Parent database or schema or table is renamed or column is dropped; skipping this index creation.") + } + return err + }) + + // A goroutine that drops a secondary index randomly. + go repeatWorkWithInterval("drop-index-worker", dropIdxInterval, func() error { + // Randomly pick a non-public index to drop. 
if len(allNonPublicIdxToKeyCols) == 0 {
+			return nil
+		}
+		var indexName string
+		var indexKeyCols map[string]struct{}
+		for idx, idxCols := range allNonPublicIdxToKeyCols {
+			indexName = idx
+			indexKeyCols = idxCols
+			break
+		}
+
+		tblName := tblName
+		_, err := sqlDB.Exec(fmt.Sprintf("DROP INDEX %v.%v.%v@%v;", dbName, scName, tblName, indexName))
+		if err == nil {
+			for indexKeyCol := range indexKeyCols {
+				delete(allColToIndexes[indexKeyCol], indexName)
+			}
+			delete(allNonPublicIdxToKeyCols, indexName)
+			t.Logf("DROP INDEX %v FROM TABLE %v", indexName, tblName)
+		} else if isPQErrWithCode(err, pgcode.UndefinedDatabase, pgcode.UndefinedSchema, pgcode.InvalidSchemaName, pgcode.UndefinedTable, pgcode.UndefinedObject) {
+			err = nil
+			t.Logf("Parent database or schema or table is renamed; skipping this index removal.")
+		}
+		return err
+	})
+
+	select {
+	case workerErr := <-workerErrChan:
+		t.Logf("main: a worker error %q is signaled; informing all workers to stop.", workerErr.Error())
+		cancel()
+		wg.Wait()
+		t.Logf("main: all workers have stopped their work; test failure!")
+		t.Fatal(workerErr.Error())
+	case <-time.After(testDuration):
+		t.Logf("main: time's up! Inform all workers to stop.")
+		cancel()
+		wg.Wait()
+		t.Logf("main: all workers have stopped. Validating descriptor states...")
+		validate()
+		t.Logf("main: validation succeeded! Test success!")
+	}
+}
+
+func isPQErrWithCode(err error, codes ...pgcode.Code) bool {
+	if pqErr := (*pq.Error)(nil); errors.As(err, &pqErr) {
+		for _, code := range codes {
+			if pgcode.MakeCode(string(pqErr.Code)) == code {
+				return true
+			}
+		}
+	}
+	return false
+}

From 31b24e59c167c28daca424dbfc04419f9cf54cb4 Mon Sep 17 00:00:00 2001
From: Austen McClernon
Date: Thu, 10 Aug 2023 14:41:07 +0000
Subject: [PATCH 09/10] roachtest: pin liveness lease to live node in lease
 prefs test

The lease preferences roachtest could occasionally fail if the liveness
leaseholder was on a stopped node.

We should address this issue; for now, pin the liveness lease to a live
node to prevent flakes.

Informs: #108512

Resolves: #108425

Release note: None
---
 pkg/cmd/roachtest/tests/lease_preferences.go | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/pkg/cmd/roachtest/tests/lease_preferences.go b/pkg/cmd/roachtest/tests/lease_preferences.go
index 460d1bd801df..9138dc96b7b9 100644
--- a/pkg/cmd/roachtest/tests/lease_preferences.go
+++ b/pkg/cmd/roachtest/tests/lease_preferences.go
@@ -252,6 +252,14 @@ func runLeasePreferences(
 	// enforcement.
 	require.NoError(t, WaitForReplication(ctx, t, conn, spec.replFactor))

+	// Set a lease preference for the liveness range, to be on n5. This test
+	// would occasionally fail due to liveness heartbeat failures when the
+	// liveness lease is on a stopped node. This is not ideal behavior, #108512.
+	configureZone(t, ctx, conn, "RANGE liveness", zoneConfig{
+		replicas:  spec.replFactor,
+		leaseNode: 5,
+	})
+
 	t.L().Printf("setting lease preferences: %s", spec.preferences)
 	setLeasePreferences(ctx, spec.preferences)
 	t.L().Printf("waiting for initial lease preference conformance")

From d9573d21c19f87fbdaf54f2e4da106543d2da7c2 Mon Sep 17 00:00:00 2001
From: Bilal Akhtar
Date: Wed, 19 Jul 2023 13:57:32 -0400
Subject: [PATCH 10/10] storage: add method to ingest external files, rename
 IngestExternalFiles

This change renames the existing IngestExternalFiles method on
storage.Engine to IngestLocalFiles, and adds a new IngestExternalFiles
that ingests pebble.ExternalFile, for use with online restore.
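A condensed view of the two entry points after this change (both
snippets quoted from the replica_proposal.go diff below):

    // Local SSTs that were linked or written into the engine's filesystem:
    ingestErr := env.eng.IngestLocalFiles(ctx, []string{ingestPath})

    // External files, referenced by locator and object name rather than by
    // a local path:
    externalFile := pebble.ExternalFile{
        Locator:         remote.Locator(sst.RemoteFileLoc),
        ObjName:         sst.RemoteFilePath,
        Size:            sst.BackingFileSize,
        SmallestUserKey: start.Encode(),
        LargestUserKey:  end.Encode(),
    }
    _, ingestErr := env.eng.IngestExternalFiles(ctx, []pebble.ExternalFile{externalFile})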
Depends on https://github.com/cockroachdb/pebble/pull/2753. Epic: none Release note: None --- pkg/kv/kvserver/BUILD.bazel | 1 + .../batcheval/cmd_add_sstable_test.go | 4 +- pkg/kv/kvserver/replica_proposal.go | 45 ++++++++++++------- pkg/kv/kvserver/replica_raftstorage.go | 2 +- pkg/storage/bench_test.go | 2 +- pkg/storage/engine.go | 16 ++++--- pkg/storage/metamorphic/operations.go | 2 +- pkg/storage/mvcc_incremental_iterator_test.go | 2 +- pkg/storage/pebble.go | 22 +++++---- 9 files changed, 61 insertions(+), 35 deletions(-) diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index b3dfc8a2ef7e..3eb1a7915007 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -224,6 +224,7 @@ go_library( "@com_github_cockroachdb_logtags//:logtags", "@com_github_cockroachdb_pebble//:pebble", "@com_github_cockroachdb_pebble//objstorage", + "@com_github_cockroachdb_pebble//objstorage/remote", "@com_github_cockroachdb_pebble//vfs", "@com_github_cockroachdb_redact//:redact", "@com_github_gogo_protobuf//proto", diff --git a/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go b/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go index 88501088f439..9b36881486af 100644 --- a/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go +++ b/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go @@ -1249,7 +1249,7 @@ func TestEvalAddSSTable(t *testing.T) { } else { require.NotNil(t, result.Replicated.AddSSTable) require.NoError(t, fs.WriteFile(engine, "sst", result.Replicated.AddSSTable.Data)) - require.NoError(t, engine.IngestExternalFiles(ctx, []string{"sst"})) + require.NoError(t, engine.IngestLocalFiles(ctx, []string{"sst"})) } var expect kvs @@ -1652,7 +1652,7 @@ func TestAddSSTableMVCCStats(t *testing.T) { require.NoError(t, err) require.NoError(t, fs.WriteFile(engine, "sst", sst)) - require.NoError(t, engine.IngestExternalFiles(ctx, []string{"sst"})) + require.NoError(t, engine.IngestLocalFiles(ctx, []string{"sst"})) statsEvaled := statsBefore statsEvaled.Add(*cArgs.Stats) diff --git a/pkg/kv/kvserver/replica_proposal.go b/pkg/kv/kvserver/replica_proposal.go index 4cf3df327f2e..c4d5a42fb106 100644 --- a/pkg/kv/kvserver/replica_proposal.go +++ b/pkg/kv/kvserver/replica_proposal.go @@ -37,6 +37,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" + "github.com/cockroachdb/pebble" + "github.com/cockroachdb/pebble/objstorage/remote" "github.com/cockroachdb/redact" "github.com/kr/pretty" "golang.org/x/time/rate" @@ -628,19 +630,32 @@ func addSSTablePreApply( sst.Span, sst.RemoteFileLoc, ) - // TODO(bilal): replace this with the real ingest. 
- /* - start := storage.EngineKey{Key: sst.Span.Key} - end := storage.EngineKey{Key: sst.Span.EndKey} - - externalFile := pebble.ExternalFile{ - Locator: shared.Locator(sst.RemoteFileLoc), - ObjName: sst.RemoteFilePath, - Size: sst.BackingFileSize, - SmallestUserKey: start.Encode(), - LargestUserKey: end.Encode(), - }*/ - log.Fatalf(ctx, "Unsupported IngestRemoteFile") + start := storage.EngineKey{Key: sst.Span.Key} + end := storage.EngineKey{Key: sst.Span.EndKey} + externalFile := pebble.ExternalFile{ + Locator: remote.Locator(sst.RemoteFileLoc), + ObjName: sst.RemoteFilePath, + Size: sst.BackingFileSize, + SmallestUserKey: start.Encode(), + LargestUserKey: end.Encode(), + } + tBegin := timeutil.Now() + defer func() { + if dur := timeutil.Since(tBegin); dur > addSSTPreApplyWarn.threshold && addSSTPreApplyWarn.ShouldLog() { + log.Infof(ctx, + "ingesting SST of size %s at index %d took %.2fs", + humanizeutil.IBytes(int64(len(sst.Data))), index, dur.Seconds(), + ) + } + }() + + _, ingestErr := env.eng.IngestExternalFiles(ctx, []pebble.ExternalFile{externalFile}) + if ingestErr != nil { + log.Fatalf(ctx, "while ingesting %s: %v", sst.RemoteFilePath, ingestErr) + } + // Adding without modification succeeded, no copy necessary. + log.Eventf(ctx, "ingested SSTable at index %d, term %d: external %s", index, term, sst.RemoteFilePath) + return false } checksum := util.CRC32(sst.Data) @@ -685,7 +700,7 @@ func addSSTablePreApply( } // Regular path - we made a hard link, so we can ingest the hard link now. - ingestErr := env.eng.IngestExternalFiles(ctx, []string{ingestPath}) + ingestErr := env.eng.IngestLocalFiles(ctx, []string{ingestPath}) if ingestErr != nil { log.Fatalf(ctx, "while ingesting %s: %v", ingestPath, ingestErr) } @@ -726,7 +741,7 @@ func ingestViaCopy( if err := kvserverbase.WriteFileSyncing(ctx, ingestPath, sst.Data, eng, 0600, st, limiter); err != nil { return errors.Wrapf(err, "while ingesting %s", ingestPath) } - if err := eng.IngestExternalFiles(ctx, []string{ingestPath}); err != nil { + if err := eng.IngestLocalFiles(ctx, []string{ingestPath}); err != nil { return errors.Wrapf(err, "while ingesting %s", ingestPath) } log.Eventf(ctx, "ingested SSTable at index %d, term %d: %s", index, term, ingestPath) diff --git a/pkg/kv/kvserver/replica_raftstorage.go b/pkg/kv/kvserver/replica_raftstorage.go index 4865725f64ea..bd269d450ac4 100644 --- a/pkg/kv/kvserver/replica_raftstorage.go +++ b/pkg/kv/kvserver/replica_raftstorage.go @@ -578,7 +578,7 @@ func (r *Replica) applySnapshot( // TODO: separate ingestions for log and statemachine engine. 
See: // // https://github.com/cockroachdb/cockroach/issues/93251 - r.store.TODOEngine().IngestExternalFilesWithStats(ctx, inSnap.SSTStorageScratch.SSTs()); err != nil { + r.store.TODOEngine().IngestLocalFilesWithStats(ctx, inSnap.SSTStorageScratch.SSTs()); err != nil { return errors.Wrapf(err, "while ingesting %s", inSnap.SSTStorageScratch.SSTs()) } if r.store.cfg.KVAdmissionController != nil { diff --git a/pkg/storage/bench_test.go b/pkg/storage/bench_test.go index 345fec419e00..3fdcd1dcf39b 100644 --- a/pkg/storage/bench_test.go +++ b/pkg/storage/bench_test.go @@ -1944,7 +1944,7 @@ func BenchmarkMVCCScannerWithIntentsAndVersions(b *testing.B) { } require.NoError(b, writer.Close()) batch.Close() - require.NoError(b, eng.IngestExternalFiles(ctx, []string{sstFileName})) + require.NoError(b, eng.IngestLocalFiles(ctx, []string{sstFileName})) } for i := 0; i < b.N; i++ { rw := eng.NewReadOnly(StandardDurability) diff --git a/pkg/storage/engine.go b/pkg/storage/engine.go index 97056747c635..8c6b59cdb19c 100644 --- a/pkg/storage/engine.go +++ b/pkg/storage/engine.go @@ -1024,18 +1024,22 @@ type Engine interface { NewSnapshot() Reader // Type returns engine type. Type() enginepb.EngineType - // IngestExternalFiles atomically links a slice of files into the RocksDB + // IngestLocalFiles atomically links a slice of files into the RocksDB // log-structured merge-tree. - IngestExternalFiles(ctx context.Context, paths []string) error - // IngestExternalFilesWithStats is a variant of IngestExternalFiles that + IngestLocalFiles(ctx context.Context, paths []string) error + // IngestLocalFilesWithStats is a variant of IngestLocalFiles that // additionally returns ingestion stats. - IngestExternalFilesWithStats( + IngestLocalFilesWithStats( ctx context.Context, paths []string) (pebble.IngestOperationStats, error) - // IngestAndExciseExternalFiles is a variant of IngestExternalFilesWithStats + // IngestAndExciseFiles is a variant of IngestLocalFilesWithStats // that excises an ExciseSpan, and ingests either local or shared sstables or // both. - IngestAndExciseExternalFiles( + IngestAndExciseFiles( ctx context.Context, paths []string, shared []pebble.SharedSSTMeta, exciseSpan roachpb.Span) (pebble.IngestOperationStats, error) + // IngestExternalFiles is a variant of IngestLocalFiles that takes external + // files. These files can be referred to by multiple stores, but are not + // modified or deleted by the Engine doing the ingestion. + IngestExternalFiles(ctx context.Context, external []pebble.ExternalFile) (pebble.IngestOperationStats, error) // PreIngestDelay offers an engine the chance to backpressure ingestions. // When called, it may choose to block if the engine determines that it is in // or approaching a state where further ingestions may risk its health. 
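
For illustration, here is a rough sketch of how a caller might drive the new
external-file entry point, mirroring the replica_proposal.go change above.
This is a minimal sketch and not part of the patch: the locator name, object
name, and size are hypothetical placeholders; only the Engine method, the
EngineKey encoding, and the pebble.ExternalFile fields come from this change.

    package example

    import (
    	"context"

    	"github.com/cockroachdb/cockroach/pkg/roachpb"
    	"github.com/cockroachdb/cockroach/pkg/storage"
    	"github.com/cockroachdb/pebble"
    	"github.com/cockroachdb/pebble/objstorage/remote"
    )

    // ingestRemoteSST links an sstable living in shared object storage into
    // the local engine without copying its bytes.
    func ingestRemoteSST(
    	ctx context.Context, eng storage.Engine, span roachpb.Span,
    ) (pebble.IngestOperationStats, error) {
    	// External files carry engine-encoded key bounds, so wrap the
    	// roachpb.Span in EngineKeys before encoding.
    	start := storage.EngineKey{Key: span.Key}
    	end := storage.EngineKey{Key: span.EndKey}
    	external := pebble.ExternalFile{
    		Locator:         remote.Locator("shared-objects"), // hypothetical locator
    		ObjName:         "000123.sst",                     // hypothetical object name
    		Size:            1 << 20,                          // hypothetical backing size
    		SmallestUserKey: start.Encode(),
    		LargestUserKey:  end.Encode(),
    	}
    	return eng.IngestExternalFiles(ctx, []pebble.ExternalFile{external})
    }

The stats return value mirrors IngestLocalFilesWithStats, so callers such as
applySnapshot can continue feeding ingestion cost into admission control.
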
diff --git a/pkg/storage/metamorphic/operations.go b/pkg/storage/metamorphic/operations.go index 9b6e83f1291b..9e10b24d514a 100644 --- a/pkg/storage/metamorphic/operations.go +++ b/pkg/storage/metamorphic/operations.go @@ -779,7 +779,7 @@ func (i ingestOp) run(ctx context.Context) string { } sstWriter.Close() - if err := i.m.engine.IngestExternalFiles(ctx, []string{sstPath}); err != nil { + if err := i.m.engine.IngestLocalFiles(ctx, []string{sstPath}); err != nil { return fmt.Sprintf("error = %s", err.Error()) } diff --git a/pkg/storage/mvcc_incremental_iterator_test.go b/pkg/storage/mvcc_incremental_iterator_test.go index 7e272f838518..64381b0685d3 100644 --- a/pkg/storage/mvcc_incremental_iterator_test.go +++ b/pkg/storage/mvcc_incremental_iterator_test.go @@ -1371,7 +1371,7 @@ func TestMVCCIncrementalIteratorIntentStraddlesSStables(t *testing.T) { if err := fs.WriteFile(db2, `ingest`, memFile.Data()); err != nil { t.Fatal(err) } - if err := db2.IngestExternalFiles(ctx, []string{`ingest`}); err != nil { + if err := db2.IngestLocalFiles(ctx, []string{`ingest`}); err != nil { t.Fatal(err) } } diff --git a/pkg/storage/pebble.go b/pkg/storage/pebble.go index 2ab380055ba3..3ff79aaf7c55 100644 --- a/pkg/storage/pebble.go +++ b/pkg/storage/pebble.go @@ -582,8 +582,7 @@ func DefaultPebbleOptions() *pebble.Options { MemTableStopWritesThreshold: 4, Merger: MVCCMerger, BlockPropertyCollectors: PebbleBlockPropertyCollectors, - // Minimum supported format. - FormatMajorVersion: MinimumSupportedFormatVersion, + FormatMajorVersion: MinimumSupportedFormatVersion, } // Automatically flush 10s after the first range tombstone is added to a // memtable. This ensures that we can reclaim space even when there's no @@ -2041,20 +2040,20 @@ func (p *Pebble) Type() enginepb.EngineType { return enginepb.EngineTypePebble } -// IngestExternalFiles implements the Engine interface. -func (p *Pebble) IngestExternalFiles(ctx context.Context, paths []string) error { +// IngestLocalFiles implements the Engine interface. +func (p *Pebble) IngestLocalFiles(ctx context.Context, paths []string) error { return p.db.Ingest(paths) } -// IngestExternalFilesWithStats implements the Engine interface. -func (p *Pebble) IngestExternalFilesWithStats( +// IngestLocalFilesWithStats implements the Engine interface. +func (p *Pebble) IngestLocalFilesWithStats( ctx context.Context, paths []string, ) (pebble.IngestOperationStats, error) { return p.db.IngestWithStats(paths) } -// IngestAndExciseExternalFiles implements the Engine interface. -func (p *Pebble) IngestAndExciseExternalFiles( +// IngestAndExciseFiles implements the Engine interface. +func (p *Pebble) IngestAndExciseFiles( ctx context.Context, paths []string, shared []pebble.SharedSSTMeta, exciseSpan roachpb.Span, ) (pebble.IngestOperationStats, error) { rawSpan := pebble.KeyRange{ @@ -2064,6 +2063,13 @@ func (p *Pebble) IngestAndExciseExternalFiles( return p.db.IngestAndExcise(paths, shared, rawSpan) } +// IngestExternalFiles implements the Engine interface. +func (p *Pebble) IngestExternalFiles( + ctx context.Context, external []pebble.ExternalFile, +) (pebble.IngestOperationStats, error) { + return p.db.IngestExternalFiles(external) +} + // PreIngestDelay implements the Engine interface. func (p *Pebble) PreIngestDelay(ctx context.Context) { preIngestDelay(ctx, p, p.settings)
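
To round out the rename, a minimal sketch of the local-file path in the shape
the updated tests use: stage the sstable bytes in the engine's file system,
then link them into the LSM. The file name here is arbitrary, the sstable
bytes are assumed to come from an SST writer, and the fs helper is assumed to
be the pkg/storage/fs package the tests above call via fs.WriteFile.

    package example

    import (
    	"context"

    	"github.com/cockroachdb/cockroach/pkg/storage"
    	"github.com/cockroachdb/cockroach/pkg/storage/fs"
    )

    // ingestLocalSST stages already-built sstable bytes in the engine's file
    // system and links them into the LSM via the renamed method.
    func ingestLocalSST(ctx context.Context, eng storage.Engine, sstData []byte) error {
    	// Write the sstable somewhere the engine can link it directly.
    	if err := fs.WriteFile(eng, "ingest.sst", sstData); err != nil {
    		return err
    	}
    	return eng.IngestLocalFiles(ctx, []string{"ingest.sst"})
    }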