server: allow healing ranges whose replicas are all lost
Introduces and implements a `ResetQuorumRequest` RPC. `ResetQuorumRequest`
takes in the range ID of a range that is unavailable due to lost quorum
and makes it available again, at the cost of losing all of the data in
that range. Any existing replica, even one residing on the target node,
will irrevocably be removed. ResetQuorumRequest first uses meta2 to
identify the range descriptor. Then, it removes all replicas from the
range descriptor and adds a store from the target node as the one
designated survivor replica. This change is then written to meta2 and
sent as a snapshot to a store local to the target node, so that
CockroachDB's internal upreplication and rebalancing mechanisms can
create further replicas from this fresh snapshot.

This RPC is meant to be called by the user directly. It will not work for
ranges that have not lost quorum or for a meta range.

Release note: None
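
A minimal, hypothetical sketch (not part of this commit) of invoking the new RPC from Go, assuming an established gRPC connection to a live node that should host the designated survivor replica; the `conn` and `rangeID` values are placeholders supplied by the caller:

package main

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"google.golang.org/grpc"
)

// resetQuorum illustrates the client-side call: NewInternalClient is the
// gRPC-generated client for the Internal service, which now includes the
// ResetQuorum method.
func resetQuorum(ctx context.Context, conn *grpc.ClientConn, rangeID roachpb.RangeID) error {
	client := roachpb.NewInternalClient(conn)
	_, err := client.ResetQuorum(ctx, &roachpb.ResetQuorumRequest{
		RangeID: int32(rangeID),
	})
	return err
}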
TheSamHuang committed Dec 2, 2020
1 parent 4e28ca3 commit c3c4c66
Showing 12 changed files with 1,398 additions and 642 deletions.
6 changes: 6 additions & 0 deletions pkg/ccl/kvccl/kvtenantccl/connector_test.go
@@ -58,6 +58,12 @@ func (m *mockServer) GossipSubscription(
return m.gossipSubFn(req, stream)
}

func (*mockServer) ResetQuorum(
context.Context, *roachpb.ResetQuorumRequest,
) (*roachpb.ResetQuorumResponse, error) {
panic("unimplemented")
}

func (*mockServer) Batch(context.Context, *roachpb.BatchRequest) (*roachpb.BatchResponse, error) {
panic("unimplemented")
}
6 changes: 6 additions & 0 deletions pkg/kv/kvclient/kvcoord/send_test.go
@@ -67,6 +67,12 @@ func (n Node) Join(context.Context, *roachpb.JoinNodeRequest) (*roachpb.JoinNode
panic("unimplemented")
}

func (n Node) ResetQuorum(
context.Context, *roachpb.ResetQuorumRequest,
) (*roachpb.ResetQuorumResponse, error) {
panic("unimplemented")
}

// TestSendToOneClient verifies that Send correctly sends a request
// to one server using the heartbeat RPC.
func TestSendToOneClient(t *testing.T) {
6 changes: 6 additions & 0 deletions pkg/kv/kvclient/kvcoord/transport_test.go
@@ -140,6 +140,12 @@ type mockInternalClient struct {

var _ roachpb.InternalClient = &mockInternalClient{}

func (*mockInternalClient) ResetQuorum(
context.Context, *roachpb.ResetQuorumRequest, ...grpc.CallOption,
) (*roachpb.ResetQuorumResponse, error) {
panic("unimplemented")
}

// Batch is part of the roachpb.InternalClient interface.
func (m *mockInternalClient) Batch(
ctx context.Context, in *roachpb.BatchRequest, opts ...grpc.CallOption,
1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
@@ -257,6 +257,7 @@ go_test(
"replica_test.go",
"replicate_queue_test.go",
"replicate_test.go",
"reset_quorum_test.go",
"scanner_test.go",
"scheduler_test.go",
"single_key_test.go",
156 changes: 156 additions & 0 deletions pkg/kv/kvserver/reset_quorum_test.go
@@ -0,0 +1,156 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvserver_test

import (
"context"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

// TestResetQuorum tests the ResetQuorum method to restore quorum for a
// range whose replicas are all lost. It starts a cluster with two nodes,
// n1 and n2, with a range isolated to n2. Then, it stops n2 and checks
// that the range is unavailable. Finally, it uses ResetQuorum to make the
// range accessible again and checks that the range is available again.
//
// TODO(thesamhuang): Add additional testing to cover two cases:
// 1. if there's an existing replica and we send the recovery snapshot to
// that replica, we still get the quorum reset and end up with an empty
// range.
// 2. if there's an existing replica and we send the recovery snapshot to
// another node that doesn't have that replica, we also still get the
// reset, and replicaGC removes the original survivor when triggered.
func TestResetQuorum(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()

livenessDuration := 3000 * time.Millisecond

clusterArgs := base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
NodeLiveness: kvserver.NodeLivenessTestingKnobs{
LivenessDuration: livenessDuration, // set duration so n2 liveness expires shortly after stopping
RenewalDuration: 1500 * time.Millisecond,
},
},
},
}
tc := testcluster.StartTestCluster(t, 2, clusterArgs)
defer tc.Stopper().Stop(ctx)

n1, n2 := 0, 1

// Set up a scratch range isolated to n2.
k := tc.ScratchRange(t)
desc, err := tc.AddVoters(k, tc.Target(n2))
require.NoError(t, err)
require.NoError(t, tc.TransferRangeLease(desc, tc.Target(n2)))
desc, err = tc.RemoveVoters(k, tc.Target(n1))
require.NoError(t, err)
require.Len(t, desc.Replicas().All(), 1)

srv := tc.Server(n1)

require.NoError(t, srv.DB().Put(ctx, k, "bar"))

metaKey := keys.RangeMetaKey(desc.EndKey).AsRawKey()
// Read the meta2 entry that ResetQuorum will be updating while n2
// is still available. This prevents an intent from the previous
// replication change from lingering; such an intent would be anchored
// on the scratch range, which becomes unavailable when n2 goes down.
_, err = srv.DB().Get(ctx, metaKey)
require.NoError(t, err)
tc.StopServer(n2)
// Wait for n2 liveness to expire.
time.Sleep(livenessDuration)

// Sanity check that requests to the ScratchRange fail.
func() {
cCtx, cancel := context.WithTimeout(ctx, 50*time.Millisecond)
defer cancel()
err := srv.DB().Put(cCtx, k, "baz")
// NB: we don't assert on the exact error since our RPC layer
// tries to return a better error than DeadlineExceeded (at
// the time of writing, we get a connection failure to n2),
// and we don't even wait for the context to time out.
// The RPC layer likely checks whether we've retried with an
// up-to-date descriptor and fails fast if so.
require.Error(t, err)
}()

// Get the store on the designated survivor n1.
var store *kvserver.Store
require.NoError(t, srv.GetStores().(*kvserver.Stores).VisitStores(func(inner *kvserver.Store) error {
store = inner
return nil
}))
if store == nil {
t.Fatal("no store found on n1")
}

// Call ResetQuorum to reset quorum on the unhealthy range.
t.Logf("resetting quorum on node id: %v, store id: %v", store.NodeID(), store.StoreID())
_, err = srv.Node().(*server.Node).ResetQuorum(
ctx,
&roachpb.ResetQuorumRequest{
RangeID: int32(desc.RangeID),
},
)
require.NoError(t, err)

t.Logf("resetting quorum complete")

require.NoError(t, srv.DB().Put(ctx, k, "baz"))

// Get range descriptor from meta2 and verify it was updated correctly.
var updatedDesc roachpb.RangeDescriptor
require.NoError(t, store.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
kvs, err := sql.ScanMetaKVs(ctx, txn, roachpb.Span{
Key: roachpb.KeyMin,
EndKey: roachpb.KeyMax,
})
if err != nil {
return err
}

for i := range kvs {
if err := kvs[i].Value.GetProto(&updatedDesc); err != nil {
return err
}
if updatedDesc.RangeID == desc.RangeID {
return nil
}
}
return errors.Errorf("range id %v not found after resetting quorum", desc.RangeID)
}))
if len(updatedDesc.Replicas().All()) != 1 {
t.Fatalf("found %v replicas found after resetting quorum, expected 1", len(updatedDesc.Replicas().All()))
}
if updatedDesc.Replicas().All()[0].NodeID != srv.NodeID() {
t.Fatalf("replica found after resetting quorum is on node id %v, expected node id %v", updatedDesc.Replicas().All()[0].NodeID, srv.NodeID())
}

}
18 changes: 18 additions & 0 deletions pkg/kv/kvserver/store_raft.go
@@ -331,12 +331,30 @@ func (s *Store) processRaftSnapshotRequest(
}
}()
}

if snapHeader.RaftMessageRequest.Message.From == snapHeader.RaftMessageRequest.Message.To {
// This is a special case exercised during recovery from loss of quorum.
// In this case, a forged snapshot will be sent to the replica and will
// hit this code path (if we make up a non-existent follower, Raft will
// drop the message, hence we are forced to make the receiver the sender).
//
// Unfortunately, at the time of writing, Raft assumes that a snapshot
// is always received from the leader (of the given term), which plays
// poorly with these forged snapshots. However, a zero sender works just
// fine as the value zero represents "no known leader".
//
// We prefer not to introduce a zero origin for the message itself, as
// our code relies on it being present throughout. Instead, we reset the
// origin that raft looks at just before handing the message off.
snapHeader.RaftMessageRequest.Message.From = 0
}
// NB: we cannot get errRemoved here because we're promised by
// withReplicaForRequest that this replica is not currently being removed
// and we've been holding the raftMu the entire time.
if err := r.stepRaftGroup(&snapHeader.RaftMessageRequest); err != nil {
return roachpb.NewError(err)
}

_, expl, err := r.handleRaftReadyRaftMuLocked(ctx, inSnap)
maybeFatalOnRaftReadyErr(ctx, expl, err)
removePlaceholder = false
136 changes: 136 additions & 0 deletions pkg/kv/kvserver/store_snapshot.go
@@ -16,12 +16,17 @@ import (
"io"
"time"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftentry"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
enginepb "github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
@@ -30,6 +35,7 @@ import (
"github.com/cockroachdb/errors"
"go.etcd.io/etcd/raft/v3/raftpb"
"golang.org/x/time/rate"
"google.golang.org/grpc"
)

const (
@@ -898,6 +904,136 @@ func (e *errMustRetrySnapshotDueToTruncation) Error() string {
)
}

// SendEmptySnapshot creates an OutgoingSnapshot for the input range
// descriptor and seeds it with an empty range. Then, it sends this
// snapshot to the replica specified in the input.
func SendEmptySnapshot(
ctx context.Context,
st *cluster.Settings,
cc *grpc.ClientConn,
now hlc.Timestamp,
desc roachpb.RangeDescriptor,
to roachpb.ReplicaDescriptor,
) error {
// Create an engine to use as a buffer for the empty snapshot.
eng := storage.NewDefaultInMem()
defer eng.Close()

var ms enginepb.MVCCStats
// Seed an empty range into the new engine.
if err := storage.MVCCPutProto(
ctx, eng, &ms, keys.RangeDescriptorKey(desc.StartKey), now, nil /* txn */, &desc,
); err != nil {
return err
}
ms, err := stateloader.WriteInitialReplicaState(
ctx,
eng,
ms,
desc,
roachpb.Lease{},
hlc.Timestamp{}, // gcThreshold
stateloader.TruncatedStateUnreplicated,
)
if err != nil {
return err
}

// Use a stateloader to load the state back out of the in-memory engine created above.
sl := stateloader.Make(desc.RangeID)
state, err := sl.Load(ctx, eng, &desc)
if err != nil {
return err
}
hs, err := sl.LoadHardState(ctx, eng)
if err != nil {
return err
}

snapUUID, err := uuid.NewV4()
if err != nil {
return err
}

// Create an OutgoingSnapshot to send.
outgoingSnap, err := snapshot(
ctx,
snapUUID,
sl,
// TODO(tbg): We may want a separate SnapshotRequest type
// for recovery that always goes through, bypassing all throttling
// so it cannot be declined. We don't want our operation to be held
// up behind a long-running snapshot; we want this to go through
// quickly.
SnapshotRequest_VIA_SNAPSHOT_QUEUE,
eng,
desc.RangeID,
raftentry.NewCache(1), // cache is not used
func(func(SideloadStorage) error) error { return nil }, // this is used for sstables, not needed here as there are no logs
desc.StartKey,
)
if err != nil {
return err
}
defer outgoingSnap.Close()

// The from and to replica descriptors are the same because we have
// to send the snapshot from a member of the range descriptor.
// Sending it from the current replica ensures that; a request
// originating from a non-member would be malformed.
from := to
req := RaftMessageRequest{
RangeID: desc.RangeID,
FromReplica: from,
ToReplica: to,
Message: raftpb.Message{
Type: raftpb.MsgSnap,
To: uint64(to.ReplicaID),
From: uint64(from.ReplicaID),
Term: hs.Term,
Snapshot: outgoingSnap.RaftSnap,
},
}

header := SnapshotRequest_Header{
State: state,
RaftMessageRequest: req,
RangeSize: ms.Total(),
CanDecline: false,
Priority: SnapshotRequest_RECOVERY,
Strategy: SnapshotRequest_KV_BATCH,
Type: SnapshotRequest_VIA_SNAPSHOT_QUEUE,
UnreplicatedTruncatedState: true,
}

stream, err := NewMultiRaftClient(cc).RaftSnapshot(ctx)
if err != nil {
return err
}

defer func() {
if err := stream.CloseSend(); err != nil {
log.Warningf(ctx, "failed to close snapshot stream: %+v", err)
}
}()

return sendSnapshot(
ctx,
st,
stream,
noopStorePool{},
header,
&outgoingSnap,
eng.NewBatch,
func() {},
)
}

// noopStorePool is a hollowed-out StorePool that does not throttle. It's used in recovery scenarios.
type noopStorePool struct{}

func (n noopStorePool) throttle(throttleReason, string, roachpb.StoreID) {}

// sendSnapshot sends an outgoing snapshot via a pre-opened GRPC stream.
func sendSnapshot(
ctx context.Context,
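
For orientation, a rough, hypothetical sketch of how a recovery path could wire up the new `SendEmptySnapshot` helper once a survivor-only descriptor has been prepared. The actual `Node.ResetQuorum` implementation (in `pkg/server`) is among the changed files not shown above; `conn`, `clock`, `updatedDesc`, and `survivor` are assumed inputs here:

package main

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
	"google.golang.org/grpc"
)

// sendRecoverySnapshot seeds an empty range carrying updatedDesc and ships it
// as a snapshot to the designated survivor replica over conn; from there the
// usual upreplication and rebalancing machinery takes over. updatedDesc is
// assumed to list only the survivor in its replica set.
func sendRecoverySnapshot(
	ctx context.Context,
	st *cluster.Settings,
	conn *grpc.ClientConn,
	clock *hlc.Clock,
	updatedDesc roachpb.RangeDescriptor,
	survivor roachpb.ReplicaDescriptor,
) error {
	return kvserver.SendEmptySnapshot(ctx, st, conn, clock.Now(), updatedDesc, survivor)
}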