diff --git a/pkg/kv/kvserver/client_migration_test.go b/pkg/kv/kvserver/client_migration_test.go
index a491eb5aa619..59d658dbfc72 100644
--- a/pkg/kv/kvserver/client_migration_test.go
+++ b/pkg/kv/kvserver/client_migration_test.go
@@ -12,6 +12,7 @@ package kvserver_test
 import (
 	"context"
+	"fmt"
 	"strings"
 	"sync"
 	"testing"
@@ -41,69 +42,86 @@ import (
 func TestStorePurgeOutdatedReplicas(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
-	const numStores = 3
-	ctx := context.Background()
-	migrationVersion := roachpb.Version{Major: 42}
-	tc := testcluster.StartTestCluster(t, numStores,
-		base.TestClusterArgs{
-			ReplicationMode: base.ReplicationManual,
-			ServerArgs: base.TestServerArgs{
-				Knobs: base.TestingKnobs{
-					Store: &kvserver.StoreTestingKnobs{
-						DisableEagerReplicaRemoval: true,
-						DisableReplicaGCQueue:      true,
+	// The two sub-tests correspond to whether or not all replicas in the system
+	// come with a replica version installed. Replica versions were only
+	// introduced in the 21.1 cycle; replicas instantiated pre-21.1 have this
+	// unset. We'll want to test that PurgeOutdatedReplicas, when invoked, also
+	// clears out replicas without a version attached. These replicas
+	// are necessarily "outdated", else they'd be migrated into using replica
+	// versions (see clusterversion.ReplicaVersions).
+	for _, withInitialVersion := range []bool{true, false} {
+		t.Run(fmt.Sprintf("with-initial-version=%t", withInitialVersion), func(t *testing.T) {
+			const numStores = 3
+			ctx := context.Background()
+			migrationVersion := roachpb.Version{Major: 42}
+
+			storeKnobs := &kvserver.StoreTestingKnobs{
+				DisableEagerReplicaRemoval: true,
+				DisableReplicaGCQueue:      true,
+			}
+			if !withInitialVersion {
+				storeKnobs.InitialReplicaVersionOverride = &roachpb.Version{}
+			}
+
+			tc := testcluster.StartTestCluster(t, numStores,
+				base.TestClusterArgs{
+					ReplicationMode: base.ReplicationManual,
+					ServerArgs: base.TestServerArgs{
+						Knobs: base.TestingKnobs{
+							Store: storeKnobs,
+						},
+					},
 				},
-			},
-		},
-	)
-	defer tc.Stopper().Stop(context.Background())
-
-	// Create our scratch range and replicate it to n2 and n3.
-	n1, n2, n3 := 0, 1, 2
-	k := tc.ScratchRange(t)
-	tc.AddVotersOrFatal(t, k, tc.Target(n2), tc.Target(n3))
-	require.NoError(t, tc.WaitForVoters(k, tc.Target(n2), tc.Target(n3)))
-
-	for _, node := range []int{n2, n3} {
-		ts := tc.Servers[node]
-		store, pErr := ts.Stores().GetStore(ts.GetFirstStoreID())
-		if pErr != nil {
-			t.Fatal(pErr)
-		}
-
-		require.NotNil(t, store.LookupReplica(roachpb.RKey(k)))
-	}
-
-	// Mark the replica on n2 as eligible for GC.
-	desc := tc.RemoveVotersOrFatal(t, k, tc.Target(n2))
-
-	// We register an interceptor seeing as how we're attempting a (dummy) below
-	// raft migration below.
-	unregister := batcheval.TestingRegisterMigrationInterceptor(migrationVersion, func() {})
-	defer unregister()
-
-	// Migrate the remaining replicas on n1 and n3.
-	if err := tc.Server(n1).DB().Migrate(ctx, desc.StartKey, desc.EndKey, migrationVersion); err != nil {
-		t.Fatal(err)
-	}
-
-	ts := tc.Servers[n2]
-	store, pErr := ts.Stores().GetStore(ts.GetFirstStoreID())
-	if pErr != nil {
-		t.Fatal(pErr)
-	}
-
-	// Check to see that the replica still exists on n2.
-	require.NotNil(t, store.LookupReplica(roachpb.RKey(k)))
-
-	if err := store.PurgeOutdatedReplicas(ctx, migrationVersion); err != nil {
-		t.Fatal(err)
+			)
+			defer tc.Stopper().Stop(context.Background())
+
+			// Create our scratch range and replicate it to n2 and n3.
+			n1, n2, n3 := 0, 1, 2
+			k := tc.ScratchRange(t)
+			tc.AddVotersOrFatal(t, k, tc.Target(n2), tc.Target(n3))
+			require.NoError(t, tc.WaitForVoters(k, tc.Target(n2), tc.Target(n3)))
+
+			for _, node := range []int{n2, n3} {
+				ts := tc.Servers[node]
+				store, pErr := ts.Stores().GetStore(ts.GetFirstStoreID())
+				if pErr != nil {
+					t.Fatal(pErr)
+				}
+
+				require.NotNil(t, store.LookupReplica(roachpb.RKey(k)))
+			}
+
+			// Mark the replica on n2 as eligible for GC.
+			desc := tc.RemoveVotersOrFatal(t, k, tc.Target(n2))
+
+			// We register an interceptor seeing as how we're attempting a (dummy)
+			// below-raft migration below.
+			unregister := batcheval.TestingRegisterMigrationInterceptor(migrationVersion, func() {})
+			defer unregister()
+
+			// Migrate the remaining replicas on n1 and n3.
+			if err := tc.Server(n1).DB().Migrate(ctx, desc.StartKey, desc.EndKey, migrationVersion); err != nil {
+				t.Fatal(err)
+			}
+
+			ts := tc.Servers[n2]
+			store, pErr := ts.Stores().GetStore(ts.GetFirstStoreID())
+			if pErr != nil {
+				t.Fatal(pErr)
+			}
+
+			// Check to see that the replica still exists on n2.
+			require.NotNil(t, store.LookupReplica(roachpb.RKey(k)))
+
+			if err := store.PurgeOutdatedReplicas(ctx, migrationVersion); err != nil {
+				t.Fatal(err)
+			}
+
+			// Check to see that the replica was purged from n2.
+			require.Nil(t, store.LookupReplica(roachpb.RKey(k)))
+		})
 	}
-
-	// Check to see that the replica was purged from n2.
-	require.Nil(t, store.LookupReplica(roachpb.RKey(k)))
 }
 
 // TestMigrateWithInflightSnapshot checks to see that the Migrate command blocks
diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go
index 054c139a1c92..179e6e3f38a8 100644
--- a/pkg/kv/kvserver/replica.go
+++ b/pkg/kv/kvserver/replica.go
@@ -848,7 +848,22 @@ func (r *Replica) GetGCThreshold() hlc.Timestamp {
 // Version returns the replica version.
 func (r *Replica) Version() roachpb.Version {
 	if r.mu.state.Version == nil {
-		// TODO(irfansharif,tbg): This is a stop-gap for #58523.
+		// We introduced replica versions in v21.1 to service long-running
+		// migrations. For replicas that were instantiated pre-21.1, it's
+		// possible that the replica version is unset (but not for too long!).
+		//
+		// In the 21.1 cycle we introduced below-raft migrations that install a
+		// replica version on all replicas currently part of a raft group. What
+		// the migrations don't (directly) do is ensure that the versions are
+		// also installed on replicas slated to be GC-ed soon. For that purpose
+		// the migrations infrastructure makes use of PurgeOutdatedReplicas.
+		//
+		// All that is to say that in 21.1, it's possible we're dealing with
+		// unset replica versions.
+		//
+		// TODO(irfansharif): Remove this in 21.2; we'll have migrated into 21.1
+		// and purged all outdated replicas by then, and thus guaranteed to
+		// always have replica versions.
 		return roachpb.Version{}
 	}
 
diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go
index 32ac01b18645..2eae1f2ee5de 100644
--- a/pkg/kv/kvserver/store.go
+++ b/pkg/kv/kvserver/store.go
@@ -2811,12 +2811,11 @@ func (s *Store) PurgeOutdatedReplicas(ctx context.Context, version roachpb.Versi
 	qp := quotapool.NewIntPool("purge-outdated-replicas", 50)
 	g := ctxgroup.WithContext(ctx)
 	s.VisitReplicas(func(repl *Replica) (wantMore bool) {
-		if (repl.Version() == roachpb.Version{}) {
-			// TODO(irfansharif,tbg): This is a stop gap for #58523.
-			return true
-		}
 		if !repl.Version().Less(version) {
-			// Nothing to do here.
+			// Nothing to do here. The less-than check also considers replicas
+			// with unset replica versions, which are only possible if they're
+			// left-over, GC-able replicas from before the first below-raft
+			// migration. We'll want to purge those.
 			return true
 		}
 
diff --git a/pkg/kv/kvserver/store_init.go b/pkg/kv/kvserver/store_init.go
index 8111a7052e93..952050aecf8d 100644
--- a/pkg/kv/kvserver/store_init.go
+++ b/pkg/kv/kvserver/store_init.go
@@ -143,6 +143,10 @@ func WriteInitialClusterData(
 		return r
 	}
 
+	initialReplicaVersion := bootstrapVersion
+	if knobs.InitialReplicaVersionOverride != nil {
+		initialReplicaVersion = *knobs.InitialReplicaVersionOverride
+	}
 	// We iterate through the ranges backwards, since they all need to contribute
 	// to the stats of the first range (i.e. because they all write meta2 records
 	// in the first range), and so we want to create the first range last so that
@@ -236,12 +240,12 @@ func WriteInitialClusterData(
 		if tt := knobs.TruncatedStateTypeOverride; tt != nil {
 			if err := stateloader.WriteInitialRangeStateWithTruncatedState(
-				ctx, batch, *desc, bootstrapVersion, *tt,
+				ctx, batch, *desc, initialReplicaVersion, *tt,
 			); err != nil {
 				return err
 			}
 		} else {
-			if err := stateloader.WriteInitialRangeState(ctx, batch, *desc, bootstrapVersion); err != nil {
+			if err := stateloader.WriteInitialRangeState(ctx, batch, *desc, initialReplicaVersion); err != nil {
 				return err
 			}
 		}
diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go
index 33fc98d4acc6..ed647ae043a1 100644
--- a/pkg/kv/kvserver/testing_knobs.go
+++ b/pkg/kv/kvserver/testing_knobs.go
@@ -260,6 +260,10 @@ type StoreTestingKnobs struct {
 	// If set, use the given truncated state type when bootstrapping ranges.
 	// This is used for testing the truncated state migration.
 	TruncatedStateTypeOverride *stateloader.TruncatedStateType
+	// If set, use the given version as the initial replica version when
+	// bootstrapping ranges. This is used for testing the migration
+	// infrastructure.
+	InitialReplicaVersionOverride *roachpb.Version
 	// GossipWhenCapacityDeltaExceedsFraction specifies the fraction from the last
 	// gossiped store capacity values which need be exceeded before the store will
 	// gossip immediately without waiting for the periodic gossip interval.
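
Note on the store.go change: dropping the explicit unset-version stop-gap is sound because the zero roachpb.Version orders before any real version, so `!repl.Version().Less(version)` only short-circuits for replicas already at or past the migration version; unset versions fall through and get purged. Below is a minimal standalone sketch of that ordering argument. The `version` type and `less` method here are illustrative stand-ins assumed to mirror roachpb.Version's lexicographic comparison over (Major, Minor, Patch, Internal); they are not CockroachDB API.

package main

import "fmt"

// version is a stand-in for roachpb.Version, which carries the same four
// fields and (we assume here) compares them lexicographically in Less.
type version struct {
	Major, Minor, Patch, Internal int32
}

// less mirrors the assumed roachpb.Version.Less ordering.
func (v version) less(o version) bool {
	if v.Major != o.Major {
		return v.Major < o.Major
	}
	if v.Minor != o.Minor {
		return v.Minor < o.Minor
	}
	if v.Patch != o.Patch {
		return v.Patch < o.Patch
	}
	return v.Internal < o.Internal
}

func main() {
	migration := version{Major: 42} // the version being migrated to

	unset := version{}            // replica instantiated pre-21.1; version never installed
	current := version{Major: 42} // replica already at the migration version

	// The zero (unset) version is less than any real migration version, so
	// the single Less check now covers the replicas the deleted stop-gap
	// used to skip: they are purged rather than left behind.
	fmt.Println(unset.less(migration))   // true: outdated, purge
	fmt.Println(current.less(migration)) // false: up to date, nothing to do
}

Under that assumption, the only replicas that survive the VisitReplicas pass are those whose installed version is at or past the migration version, whether or not a version was ever explicitly installed.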