Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver: purge gc-able, unmigrated replicas during migrations #62838

Merged
merged 1 commit into from
Apr 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 77 additions & 59 deletions pkg/kv/kvserver/client_migration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package kvserver_test

import (
"context"
"fmt"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -41,69 +42,86 @@ import (
func TestStorePurgeOutdatedReplicas(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
const numStores = 3

ctx := context.Background()
migrationVersion := roachpb.Version{Major: 42}
tc := testcluster.StartTestCluster(t, numStores,
base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
DisableEagerReplicaRemoval: true,
DisableReplicaGCQueue: true,
// The two sub-tests correspond to whether or not all replicas in the system
// come with a replica version installed. Replica versions were only
// introduced in the 21.1 cycle; replicas instantiated pre-21.1 have this
// unset. We'll want to test that PurgeOutdatedReplicas, when invoked, also
// clears out replicas without a version attached. These replicas
// are necessarily "outdated", else they'd be migrated into using replica
// versions (see clusterversion.ReplicaVersions).
for _, withInitialVersion := range []bool{true, false} {
t.Run(fmt.Sprintf("with-initial-version=%t", withInitialVersion), func(t *testing.T) {
const numStores = 3
ctx := context.Background()
migrationVersion := roachpb.Version{Major: 42}

storeKnobs := &kvserver.StoreTestingKnobs{
DisableEagerReplicaRemoval: true,
DisableReplicaGCQueue: true,
}
if !withInitialVersion {
storeKnobs.InitialReplicaVersionOverride = &roachpb.Version{}
}

tc := testcluster.StartTestCluster(t, numStores,
base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: storeKnobs,
},
},
},
},
},
)
defer tc.Stopper().Stop(context.Background())

// Create our scratch range and replicate it to n2 and n3.
n1, n2, n3 := 0, 1, 2
k := tc.ScratchRange(t)
tc.AddVotersOrFatal(t, k, tc.Target(n2), tc.Target(n3))
require.NoError(t, tc.WaitForVoters(k, tc.Target(n2), tc.Target(n3)))

for _, node := range []int{n2, n3} {
ts := tc.Servers[node]
store, pErr := ts.Stores().GetStore(ts.GetFirstStoreID())
if pErr != nil {
t.Fatal(pErr)
}

require.NotNil(t, store.LookupReplica(roachpb.RKey(k)))
}

// Mark the replica on n2 as eligible for GC.
desc := tc.RemoveVotersOrFatal(t, k, tc.Target(n2))

// We register an interceptor because we're attempting a (dummy) below-raft
// migration further down.
unregister := batcheval.TestingRegisterMigrationInterceptor(migrationVersion, func() {})
defer unregister()

// Migrate the remaining replicas on n1 and n3.
if err := tc.Server(n1).DB().Migrate(ctx, desc.StartKey, desc.EndKey, migrationVersion); err != nil {
t.Fatal(err)
}

ts := tc.Servers[n2]
store, pErr := ts.Stores().GetStore(ts.GetFirstStoreID())
if pErr != nil {
t.Fatal(pErr)
}

// Check to see that the replica still exists on n2.
require.NotNil(t, store.LookupReplica(roachpb.RKey(k)))

if err := store.PurgeOutdatedReplicas(ctx, migrationVersion); err != nil {
t.Fatal(err)
)
defer tc.Stopper().Stop(context.Background())

// Create our scratch range and replicate it to n2 and n3.
n1, n2, n3 := 0, 1, 2
k := tc.ScratchRange(t)
tc.AddVotersOrFatal(t, k, tc.Target(n2), tc.Target(n3))
require.NoError(t, tc.WaitForVoters(k, tc.Target(n2), tc.Target(n3)))

for _, node := range []int{n2, n3} {
ts := tc.Servers[node]
store, pErr := ts.Stores().GetStore(ts.GetFirstStoreID())
if pErr != nil {
t.Fatal(pErr)
}

require.NotNil(t, store.LookupReplica(roachpb.RKey(k)))
}

// Mark the replica on n2 as eligible for GC.
desc := tc.RemoveVotersOrFatal(t, k, tc.Target(n2))

// We register an interceptor because we're attempting a (dummy) below-raft
// migration further down.
unregister := batcheval.TestingRegisterMigrationInterceptor(migrationVersion, func() {})
defer unregister()

// Migrate the remaining replicas on n1 and n3.
if err := tc.Server(n1).DB().Migrate(ctx, desc.StartKey, desc.EndKey, migrationVersion); err != nil {
t.Fatal(err)
}

ts := tc.Servers[n2]
store, pErr := ts.Stores().GetStore(ts.GetFirstStoreID())
if pErr != nil {
t.Fatal(pErr)
}

// Check to see that the replica still exists on n2.
require.NotNil(t, store.LookupReplica(roachpb.RKey(k)))

if err := store.PurgeOutdatedReplicas(ctx, migrationVersion); err != nil {
t.Fatal(err)
}

// Check to see that the replica was purged from n2.
require.Nil(t, store.LookupReplica(roachpb.RKey(k)))
})
}

// Check to see that the replica was purged from n2.
require.Nil(t, store.LookupReplica(roachpb.RKey(k)))
}

// TestMigrateWithInflightSnapshot checks to see that the Migrate command blocks
Expand Down
17 changes: 16 additions & 1 deletion pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -848,7 +848,22 @@ func (r *Replica) GetGCThreshold() hlc.Timestamp {
// Version returns the replica version.
func (r *Replica) Version() roachpb.Version {
if r.mu.state.Version == nil {
// TODO(irfansharif,tbg): This is a stop-gap for #58523.
// We introduced replica versions in v21.1 to service long-running
// migrations. For replicas that were instantiated pre-21.1, it's
// possible that the replica version is unset (but not for too long!).
//
// In the 21.1 cycle we introduced below-raft migrations that install a
// replica version on all replicas currently part of a raft group. What
// the migrations don't (directly) do is ensure that the versions are
// also installed on replicas slated to be GC-ed soon. For that purpose
// the migrations infrastructure makes use of PurgeOutdatedReplicas.
//
// All that is to say that in 21.1, it's possible we're dealing with
// unset replica versions.
//
// TODO(irfansharif): Remove this in 21.2; we'll have migrated into 21.1
// and purged all outdated replicas by then, and will thus be guaranteed
// to always have replica versions.
return roachpb.Version{}
}

Expand Down
9 changes: 4 additions & 5 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2811,12 +2811,11 @@ func (s *Store) PurgeOutdatedReplicas(ctx context.Context, version roachpb.Versi
qp := quotapool.NewIntPool("purge-outdated-replicas", 50)
g := ctxgroup.WithContext(ctx)
s.VisitReplicas(func(repl *Replica) (wantMore bool) {
if (repl.Version() == roachpb.Version{}) {
// TODO(irfansharif,tbg): This is a stop gap for #58523.
return true
}
if !repl.Version().Less(version) {
// Nothing to do here.
// Nothing to do here. The less-than check also considers replicas
// with unset replica versions, which are only possible if they're
// left-over, GC-able replicas from before the first below-raft
// migration. We'll want to purge those.
return true
}

Expand Down
8 changes: 6 additions & 2 deletions pkg/kv/kvserver/store_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ func WriteInitialClusterData(
return r
}

initialReplicaVersion := bootstrapVersion
if knobs.InitialReplicaVersionOverride != nil {
initialReplicaVersion = *knobs.InitialReplicaVersionOverride
}
// We iterate through the ranges backwards, since they all need to contribute
// to the stats of the first range (i.e. because they all write meta2 records
// in the first range), and so we want to create the first range last so that
Expand Down Expand Up @@ -236,12 +240,12 @@ func WriteInitialClusterData(

if tt := knobs.TruncatedStateTypeOverride; tt != nil {
if err := stateloader.WriteInitialRangeStateWithTruncatedState(
ctx, batch, *desc, bootstrapVersion, *tt,
ctx, batch, *desc, initialReplicaVersion, *tt,
); err != nil {
return err
}
} else {
if err := stateloader.WriteInitialRangeState(ctx, batch, *desc, bootstrapVersion); err != nil {
if err := stateloader.WriteInitialRangeState(ctx, batch, *desc, initialReplicaVersion); err != nil {
return err
}
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,10 @@ type StoreTestingKnobs struct {
// If set, use the given truncated state type when bootstrapping ranges.
// This is used for testing the truncated state migration.
TruncatedStateTypeOverride *stateloader.TruncatedStateType
// If set, use the given version as the initial replica version when
// bootstrapping ranges. This is used for testing the migration
// infrastructure.
InitialReplicaVersionOverride *roachpb.Version
// GossipWhenCapacityDeltaExceedsFraction specifies the fraction from the last
// gossiped store capacity values which need be exceeded before the store will
// gossip immediately without waiting for the periodic gossip interval.
Expand Down