Skip to content

Commit

Permalink
storage: clean up preemptive snapshot when receiving replica ID as le…
Browse files Browse the repository at this point in the history
…arner

This commit adds an annotation to raft request messages to
indicate that the sender believes the current replica is a
learner. If the current replica on the recipient was created as a
preemptive snapshot (it's initialized but not in the range) then
we should remove that replica immediately.

Release Justification: Release blocker, required for backwards compatibility.

Release note: None
  • Loading branch information
ajwerner committed Sep 25, 2019
1 parent 7df2a12 commit 17511f9
Show file tree
Hide file tree
Showing 9 changed files with 352 additions and 154 deletions.
101 changes: 100 additions & 1 deletion pkg/storage/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"math/rand"
"reflect"
"runtime"
"strings"
"sync"
"sync/atomic"
"testing"
Expand All @@ -30,13 +31,15 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/stateloader"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/storage/storagepb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand Down Expand Up @@ -5064,7 +5067,6 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) {
mtc.waitForValues(keyB, []int64{0, curB, curB})
}

// Restart store 0 so that it forgets about the newer replicaID.
mtc.stopStore(0)
mtc.restartStore(0)

Expand All @@ -5088,3 +5090,100 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) {
mtc.waitForValues(keyB, []int64{curB, curB, curB})
})
}

// TestUpgradeFromPreemptiveSnapshot exercises scenarios where a store has
// received a preemptive snapshot for a range and then later is added to the
// range as a learner.
//
// In these cases it is critical that the store destroy the replica created by
// the preemptive snapshot otherwise the replica may try to catch up across
// commands which are not safe (namely merges).
// Before learner replicas we would not add a store to a range until we had
// successfully sent a preemptive snapshot that contained the range descriptor
// used in the change replicas request.
func TestUpgradeFromPreemptiveSnapshot(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
var proposalFilter atomic.Value
noopProposalFilter := storagebase.ReplicaProposalFilter(func(storagebase.ProposalFilterArgs) *roachpb.Error {
return nil
})
blockAllProposalFilter := storagebase.ReplicaProposalFilter(func(args storagebase.ProposalFilterArgs) *roachpb.Error {
return roachpb.NewError(errors.Errorf("boom"))
})
setupTestCluster := func(t *testing.T) *testcluster.TestCluster {
proposalFilter.Store(noopProposalFilter)
tcArgs := base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &storage.StoreTestingKnobs{
BootstrapVersion: &cluster.ClusterVersion{
Version: cluster.VersionByKey(cluster.Version19_1),
},
DisableLoadBasedSplitting: true,
DisableMergeQueue: true,
TestingProposalFilter: func(args storagebase.ProposalFilterArgs) *roachpb.Error {
return proposalFilter.Load().(storagebase.ReplicaProposalFilter)(args)
},
},
Server: &server.TestingKnobs{
DisableAutomaticVersionUpgrade: 1,
},
},
},
}
return testcluster.StartTestCluster(t, 4, tcArgs)
}
t.Run("add after preemptive snapshot before merge", func(t *testing.T) {
tc := setupTestCluster(t)
defer tc.Stopper().Stop(ctx)
scratchStartKey := tc.ScratchRange(t)
keyA := append(scratchStartKey[:len(scratchStartKey):len(scratchStartKey)], 'a')

// Set up a scratch range on n1, n2, and n3.
tc.AddReplicasOrFatal(t, scratchStartKey, tc.Target(1), tc.Target(2))
// Split that range.
err := tc.Server(0).DB().AdminSplit(ctx, scratchStartKey, keyA, hlc.Timestamp{})
require.NoError(t, err)
require.NoError(t, tc.WaitForSplitAndInitialization(keyA))
require.NoError(t, tc.WaitForVoters(scratchStartKey, tc.Target(1), tc.Target(2)))
require.NoError(t, tc.WaitForVoters(keyA, tc.Target(1), tc.Target(2)))
// Prevent n4 from successfully being added by blocking the proposal.
proposalFilter.Store(blockAllProposalFilter)
// Issue the add to send a preemptive snapshot.
_, err = tc.AddReplicas(scratchStartKey, tc.Target(3))
// Ensure that the add failed with our proposal filter error.
require.True(t, testutils.IsError(err, "boom"), err)

// Reset the filter.
proposalFilter.Store(noopProposalFilter)
// Merge the range back together.
err = tc.Server(0).DB().AdminMerge(ctx, scratchStartKey)
require.NoError(t, err)
// Upgrade the cluster to use learners.
_, err = tc.ServerConn(0).
Exec("SET CLUSTER SETTING version = crdb_internal.node_executable_version();")
require.NoError(t, err)
// We need to ensure that all nodes have received the cluster setting change.

// Successfully add node 4, this used to fail as node 4 would attempt to
// catch up from the preemptive snapshot across the merge and would not
// accept the preemptive snapshot if we have not heard about the cluster
// setting yet.
var desc roachpb.RangeDescriptor
// Use a SucceedsSoon to deal with propagating the cluster setting.
testutils.SucceedsSoon(t, func() (err error) {
desc, err = tc.AddReplicas(scratchStartKey, tc.Target(3))
if err != nil &&
!strings.Contains(err.Error(), "remote couldn't accept PREEMPTIVE snapshot") {
t.Fatal(err)
}
return err
})
// Transfer the lease to node 4 and remove node 0 to ensure that it really
// is a part of the range.
require.NoError(t, tc.TransferRangeLease(desc, tc.Target(3)))
tc.RemoveReplicasOrFatal(t, scratchStartKey, tc.Target(0))
})
}
Loading

0 comments on commit 17511f9

Please sign in to comment.