server: allow healing ranges whose replicas are all lost #56333

Merged
6 changes: 6 additions & 0 deletions pkg/ccl/kvccl/kvtenantccl/connector_test.go
@@ -58,6 +58,12 @@ func (m *mockServer) GossipSubscription(
return m.gossipSubFn(req, stream)
}

func (*mockServer) ResetQuorum(
context.Context, *roachpb.ResetQuorumRequest,
) (*roachpb.ResetQuorumResponse, error) {
panic("unimplemented")
}

func (*mockServer) Batch(context.Context, *roachpb.BatchRequest) (*roachpb.BatchResponse, error) {
panic("unimplemented")
}
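This stub (and the matching ones in send_test.go and transport_test.go below) is needed because the roachpb.Internal service gains a ResetQuorum RPC in this PR, so every mock implementing the server or client interface must add the method. As a rough, hedged sketch of how a caller might reach the new RPC through the generated gRPC client; the constructor roachpb.NewInternalClient is the usual generated entry point, but the connection handling and function name here are assumptions for illustration, not code from this PR:

package example // hypothetical package for illustration only

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"google.golang.org/grpc"
)

// resetQuorumViaRPC is a sketch: it assumes cc is an already-dialed
// connection to a node that can serve the roachpb.Internal service.
func resetQuorumViaRPC(ctx context.Context, cc *grpc.ClientConn, rangeID roachpb.RangeID) error {
	client := roachpb.NewInternalClient(cc) // generated client constructor
	// RangeID is an int32 field on ResetQuorumRequest, matching the call
	// made in reset_quorum_test.go below.
	_, err := client.ResetQuorum(ctx, &roachpb.ResetQuorumRequest{
		RangeID: int32(rangeID),
	})
	return err
}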
6 changes: 6 additions & 0 deletions pkg/kv/kvclient/kvcoord/send_test.go
@@ -67,6 +67,12 @@ func (n Node) Join(context.Context, *roachpb.JoinNodeRequest) (*roachpb.JoinNode
panic("unimplemented")
}

func (n Node) ResetQuorum(
context.Context, *roachpb.ResetQuorumRequest,
) (*roachpb.ResetQuorumResponse, error) {
panic("unimplemented")
}

// TestSendToOneClient verifies that Send correctly sends a request
// to one server using the heartbeat RPC.
func TestSendToOneClient(t *testing.T) {
6 changes: 6 additions & 0 deletions pkg/kv/kvclient/kvcoord/transport_test.go
@@ -140,6 +140,12 @@ type mockInternalClient struct {

var _ roachpb.InternalClient = &mockInternalClient{}

func (*mockInternalClient) ResetQuorum(
context.Context, *roachpb.ResetQuorumRequest, ...grpc.CallOption,
) (*roachpb.ResetQuorumResponse, error) {
panic("unimplemented")
}

// Batch is part of the roachpb.InternalClient interface.
func (m *mockInternalClient) Batch(
ctx context.Context, in *roachpb.BatchRequest, opts ...grpc.CallOption,
1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
@@ -257,6 +257,7 @@ go_test(
"replica_test.go",
"replicate_queue_test.go",
"replicate_test.go",
"reset_quorum_test.go",
"scanner_test.go",
"scheduler_test.go",
"single_key_test.go",
156 changes: 156 additions & 0 deletions pkg/kv/kvserver/reset_quorum_test.go
@@ -0,0 +1,156 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvserver_test

import (
"context"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

// TestResetQuorum tests the ResetQuorum method to restore quorum for a
// range whose replicas are all lost. It starts a cluster with two nodes,
// n1 and n2, with a range isolated to n2. Then, it stops n2 and checks
// that the range is unavailable. Finally, it uses ResetQuorum to reset
// quorum on the range and verifies that it becomes available again.
//
// TODO(thesamhuang): Add additional testing to cover two cases:
// 1. if there's an existing replica and we send the recovery snapshot to
// that replica, we still get the quorum reset and end up with an empty
// range.
// 2. if there's an existing replica and we send the recovery snapshot to
// another node that doesn't have that replica, we also still get the
// reset, and replicaGC removes the original survivor when triggered.
func TestResetQuorum(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()

livenessDuration := 3000 * time.Millisecond

clusterArgs := base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
NodeLiveness: kvserver.NodeLivenessTestingKnobs{
LivenessDuration: livenessDuration, // set duration so n2 liveness expires shortly after stopping
RenewalDuration: 1500 * time.Millisecond,
},
},
},
}
tc := testcluster.StartTestCluster(t, 2, clusterArgs)
defer tc.Stopper().Stop(ctx)

n1, n2 := 0, 1

// Set up a scratch range isolated to n2.
k := tc.ScratchRange(t)
desc, err := tc.AddVoters(k, tc.Target(n2))
require.NoError(t, err)
require.NoError(t, tc.TransferRangeLease(desc, tc.Target(n2)))
desc, err = tc.RemoveVoters(k, tc.Target(n1))
require.NoError(t, err)
require.Len(t, desc.Replicas().All(), 1)

srv := tc.Server(n1)

require.NoError(t, srv.DB().Put(ctx, k, "bar"))

metaKey := keys.RangeMetaKey(desc.EndKey).AsRawKey()
// Read the meta2 entry that ResetQuorum will be updating while n2
// is still available. This prevents an intent from the previous
// replication change from lingering; such an intent would be anchored
// on the scratch range, which becomes unavailable once n2 goes down.
_, err = srv.DB().Get(ctx, metaKey)
require.NoError(t, err)
tc.StopServer(n2)
// Wait for n2 liveness to expire.
time.Sleep(livenessDuration)

// Sanity check that requests to the ScratchRange fail.
func() {
cCtx, cancel := context.WithTimeout(ctx, 50*time.Millisecond)
defer cancel()
err := srv.DB().Put(cCtx, k, "baz")
// NB: we don't assert on the exact error since our RPC layer
// tries to return a better error than DeadlineExceeded (at
// the time of writing, we get a connection failure to n2),
// and we don't wait for the context to even time out.
// We're probably checking in the RPC layer whether we've
// retried with an up-to-date desc, and failing fast if so.
require.Error(t, err)
}()

// Get the store on the designated survivor n1.
var store *kvserver.Store
require.NoError(t, srv.GetStores().(*kvserver.Stores).VisitStores(func(inner *kvserver.Store) error {
store = inner
return nil
}))
if store == nil {
t.Fatal("no store found on n1")
}

// Call ResetQuorum to reset quorum on the unhealthy range.
t.Logf("resetting quorum on node id: %v, store id: %v", store.NodeID(), store.StoreID())
_, err = srv.Node().(*server.Node).ResetQuorum(
ctx,
&roachpb.ResetQuorumRequest{
RangeID: int32(desc.RangeID),
},
)
require.NoError(t, err)

t.Logf("resetting quorum complete")

require.NoError(t, srv.DB().Put(ctx, k, "baz"))

// Get range descriptor from meta2 and verify it was updated correctly.
var updatedDesc roachpb.RangeDescriptor
require.NoError(t, store.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
kvs, err := sql.ScanMetaKVs(ctx, txn, roachpb.Span{
Key: roachpb.KeyMin,
EndKey: roachpb.KeyMax,
})
if err != nil {
return err
}

for i := range kvs {
if err := kvs[i].Value.GetProto(&updatedDesc); err != nil {
return err
}
if updatedDesc.RangeID == desc.RangeID {
return nil
}
}
return errors.Errorf("range id %v not found after resetting quorum", desc.RangeID)
}))
if len(updatedDesc.Replicas().All()) != 1 {
t.Fatalf("found %v replicas found after resetting quorum, expected 1", len(updatedDesc.Replicas().All()))
}
if updatedDesc.Replicas().All()[0].NodeID != srv.NodeID() {
t.Fatalf("replica found after resetting quorum is on node id %v, expected node id %v", updatedDesc.Replicas().All()[0].NodeID, srv.NodeID())
}

}
18 changes: 18 additions & 0 deletions pkg/kv/kvserver/store_raft.go
@@ -331,12 +331,30 @@ func (s *Store) processRaftSnapshotRequest(
}
}()
}

if snapHeader.RaftMessageRequest.Message.From == snapHeader.RaftMessageRequest.Message.To {
// This is a special case exercised during recovery from loss of quorum.
// In this case, a forged snapshot will be sent to the replica and will
// hit this code path (if we make up a non-existent follower, Raft will
// drop the message, hence we are forced to make the receiver the sender).
//
// Unfortunately, at the time of writing, Raft assumes that a snapshot
// is always received from the leader (of the given term), which plays
// poorly with these forged snapshots. However, a zero sender works just
// fine as the value zero represents "no known leader".
//
// We prefer not to introduce a zero origin of the message as throughout
// our code we rely on it being present. Instead, we reset the origin
// that raft looks at just before handing the message off.
snapHeader.RaftMessageRequest.Message.From = 0
}
// NB: we cannot get errRemoved here because we're promised by
// withReplicaForRequest that this replica is not currently being removed
// and we've been holding the raftMu the entire time.
if err := r.stepRaftGroup(&snapHeader.RaftMessageRequest); err != nil {
return roachpb.NewError(err)
}

_, expl, err := r.handleRaftReadyRaftMuLocked(ctx, inSnap)
maybeFatalOnRaftReadyErr(ctx, expl, err)
removePlaceholder = false
136 changes: 136 additions & 0 deletions pkg/kv/kvserver/store_snapshot.go
@@ -16,12 +16,17 @@ import (
"io"
"time"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftentry"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
enginepb "github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
@@ -30,6 +35,7 @@ import (
"github.com/cockroachdb/errors"
"go.etcd.io/etcd/raft/v3/raftpb"
"golang.org/x/time/rate"
"google.golang.org/grpc"
)

const (
@@ -898,6 +904,136 @@ func (e *errMustRetrySnapshotDueToTruncation) Error() string {
)
}

// SendEmptySnapshot creates an OutgoingSnapshot for the input range
// descriptor and seeds it with an empty range. Then, it sends this
// snapshot to the replica specified in the input.
func SendEmptySnapshot(
ctx context.Context,
st *cluster.Settings,
cc *grpc.ClientConn,
now hlc.Timestamp,
desc roachpb.RangeDescriptor,
to roachpb.ReplicaDescriptor,
) error {
// Create an engine to use as a buffer for the empty snapshot.
eng := storage.NewDefaultInMem()
defer eng.Close()

var ms enginepb.MVCCStats
// Seed an empty range into the new engine.
if err := storage.MVCCPutProto(
ctx, eng, &ms, keys.RangeDescriptorKey(desc.StartKey), now, nil /* txn */, &desc,
); err != nil {
return err
}
ms, err := stateloader.WriteInitialReplicaState(
ctx,
eng,
ms,
desc,
roachpb.Lease{},
hlc.Timestamp{}, // gcThreshold
stateloader.TruncatedStateUnreplicated,
)
if err != nil {
return err
}

// Use a stateloader to load the replica state we just wrote into the in-memory engine.
sl := stateloader.Make(desc.RangeID)
state, err := sl.Load(ctx, eng, &desc)
if err != nil {
return err
}
hs, err := sl.LoadHardState(ctx, eng)
if err != nil {
return err
}

snapUUID, err := uuid.NewV4()
if err != nil {
return err
}

// Create an OutgoingSnapshot to send.
outgoingSnap, err := snapshot(
ctx,
snapUUID,
sl,
// TODO(tbg): We may want a separate SnapshotRequest type
// for recovery that bypasses all throttling and cannot be
// declined, so that the recovery operation is never held up
// behind a long-running snapshot and goes through quickly.
SnapshotRequest_VIA_SNAPSHOT_QUEUE,
eng,
desc.RangeID,
raftentry.NewCache(1), // cache is not used
func(func(SideloadStorage) error) error { return nil }, // this is used for sstables, not needed here as there are no logs
desc.StartKey,
)
if err != nil {
return err
}
defer outgoingSnap.Close()

// The from and to replica descriptors are the same because the
// snapshot has to be sent from a member of the range descriptor.
// Sending it from the recipient replica ensures that; a request
// originating from a non-member would be malformed.
from := to
req := RaftMessageRequest{
RangeID: desc.RangeID,
FromReplica: from,
ToReplica: to,
Message: raftpb.Message{
Type: raftpb.MsgSnap,
To: uint64(to.ReplicaID),
From: uint64(from.ReplicaID),
Term: hs.Term,
Snapshot: outgoingSnap.RaftSnap,
},
}

header := SnapshotRequest_Header{
State: state,
RaftMessageRequest: req,
RangeSize: ms.Total(),
CanDecline: false,
Priority: SnapshotRequest_RECOVERY,
Strategy: SnapshotRequest_KV_BATCH,
Type: SnapshotRequest_VIA_SNAPSHOT_QUEUE,
UnreplicatedTruncatedState: true,
}

stream, err := NewMultiRaftClient(cc).RaftSnapshot(ctx)
if err != nil {
return err
}

defer func() {
if err := stream.CloseSend(); err != nil {
log.Warningf(ctx, "failed to close snapshot stream: %+v", err)
}
}()

return sendSnapshot(
ctx,
st,
stream,
noopStorePool{},
header,
&outgoingSnap,
eng.NewBatch,
func() {},
)
}

// noopStorePool is a hollowed-out StorePool that does not throttle. It's used in recovery scenarios.
type noopStorePool struct{}

func (n noopStorePool) throttle(throttleReason, string, roachpb.StoreID) {}

// sendSnapshot sends an outgoing snapshot via a pre-opened GRPC stream.
func sendSnapshot(
ctx context.Context,
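For orientation, here is a rough sketch of how the SendEmptySnapshot function added above might be wired into a quorum reset: rewrite the range descriptor so its only replica lives on the designated survivor, then forge the empty snapshot to that replica over a connection to its node. The descriptor surgery, helper calls, and connection handling below are illustrative assumptions, not the server-side ResetQuorum handler (which is not part of this section of the diff).

package example // hypothetical package for illustration only

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
	"google.golang.org/grpc"
)

// sendRecoverySnapshot is a sketch, not the PR's handler. It assumes the
// caller has already verified that all replicas of deadDesc are lost and
// holds a gRPC connection to the surviving node.
func sendRecoverySnapshot(
	ctx context.Context,
	st *cluster.Settings,
	cc *grpc.ClientConn, // connection to the surviving node, assumed dialed elsewhere
	now hlc.Timestamp,
	deadDesc roachpb.RangeDescriptor, // descriptor of the unavailable range
	survivor roachpb.ReplicationTarget, // node/store chosen to host the new empty replica
) error {
	// Rewrite the replica set so its only member is the designated survivor.
	// SetReplicas/MakeReplicaDescriptors are assumed to have these shapes.
	desc := deadDesc
	replica := roachpb.ReplicaDescriptor{
		NodeID:    survivor.NodeID,
		StoreID:   survivor.StoreID,
		ReplicaID: deadDesc.NextReplicaID,
	}
	desc.SetReplicas(roachpb.MakeReplicaDescriptors([]roachpb.ReplicaDescriptor{replica}))
	desc.NextReplicaID++

	// The rewritten descriptor would also need to be written to meta2 (the
	// test above verifies this); that step is omitted here.

	// Seed the survivor with an empty range via the forged snapshot.
	return kvserver.SendEmptySnapshot(ctx, st, cc, now, desc, replica)
}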