Skip to content

Commit

Permalink
storage: allow healing ranges whose replicas are all lost
Browse files Browse the repository at this point in the history
Release note: None
  • Loading branch information
TheSamHuang committed Nov 12, 2020
1 parent af2fc41 commit 19ef42f
Show file tree
Hide file tree
Showing 6 changed files with 1,249 additions and 647 deletions.
28 changes: 28 additions & 0 deletions pkg/kv/kvserver/raft_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,34 @@ func (t *RaftTransport) startProcessNewQueue(
return true
}

// initAndSendSnapshot hands the outgoing snapshot to sendSnapshot over the
// already-opened stream and closes the send side of the stream once
// sendSnapshot has returned, whether or not it succeeded. A failure to close
// the stream is only logged as a warning; the returned error is always the
// one produced by sendSnapshot.
func initAndSendSnapshot(
	ctx context.Context,
	stream MultiRaft_RaftSnapshotClient,
	st *cluster.Settings,
	storePool SnapshotStorePool,
	header SnapshotRequest_Header,
	snap *OutgoingSnapshot,
	newBatch func() storage.Batch,
	sent func(),
) error {
	closeSendSide := func() {
		closeErr := stream.CloseSend()
		if closeErr == nil {
			return
		}
		log.Warningf(ctx, "failed to close snapshot stream: %+v", closeErr)
	}
	defer closeSendSide()

	// Note the argument order: sendSnapshot takes the settings before the stream.
	return sendSnapshot(ctx, st, stream, storePool, header, snap, newBatch, sent)
}

// SendSnapshot streams the given outgoing snapshot. The caller is responsible
// for closing the OutgoingSnapshot.
func (t *RaftTransport) SendSnapshot(
Expand Down
123 changes: 123 additions & 0 deletions pkg/kv/kvserver/store_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,17 @@ import (
"io"
"time"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftentry"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
enginepb "github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
Expand All @@ -31,6 +36,7 @@ import (
crdberrors "github.com/cockroachdb/errors"
"go.etcd.io/etcd/raft/raftpb"
"golang.org/x/time/rate"
"google.golang.org/grpc"
)

const (
Expand Down Expand Up @@ -904,6 +910,123 @@ func (e *errMustRetrySnapshotDueToTruncation) Error() string {
)
}

// SendEmptySnapshot builds a snapshot of an empty range described by desc and
// streams it to the replica `to` over the gRPC connection cc. It was formerly
// named HackSendSnapshot.
//
// The snapshot is produced by seeding a throwaway in-memory engine with the
// range descriptor (written at timestamp now) and the initial replica state,
// then reading that state back and snapshotting the engine. The resulting
// snapshot is sent as a raft MsgSnap whose sender and recipient are both `to`.
//
// Any error from seeding the engine, loading the state, generating the
// snapshot, opening the stream, or sending the snapshot is returned as is.
func SendEmptySnapshot(
	ctx context.Context,
	st *cluster.Settings,
	cc *grpc.ClientConn,
	now hlc.Timestamp,
	desc roachpb.RangeDescriptor,
	to roachpb.ReplicaDescriptor,
) error {
	// Create engine to use as a buffer for the empty snapshot.
	eng := storage.NewInMem(ctx, roachpb.Attributes{}, 1<<20 /* cacheSize */)
	defer eng.Close()

	var ms enginepb.MVCCStats
	// Seed empty range into engine.
	if err := storage.MVCCPutProto(
		ctx, eng, &ms, keys.RangeDescriptorKey(desc.StartKey), now, nil /* txn */, &desc,
	); err != nil {
		return err
	}
	ms, err := stateloader.WriteInitialReplicaState(
		ctx,
		eng,
		ms,
		desc,
		roachpb.Lease{},
		hlc.Timestamp{}, // gcThreshold
		stateloader.TruncatedStateUnreplicated,
	)
	if err != nil {
		return err
	}

	// Use stateloader to load state out of memory from the previously created engine.
	sl := stateloader.Make(desc.RangeID)
	state, err := sl.Load(ctx, eng, &desc)
	if err != nil {
		return err
	}
	hs, err := sl.LoadHardState(ctx, eng)
	if err != nil {
		return err
	}

	snapUUID, err := uuid.NewV4()
	if err != nil {
		return err
	}

	outgoingSnap, err := snapshot(
		ctx,
		snapUUID,
		sl,
		// TODO(thesamhuang): We may want a separate SnapshotRequest type for
		// recovery that bypasses all throttling so that it cannot be declined.
		// We don't want this operation to be held up behind a long-running
		// snapshot; it should go through quickly.
		SnapshotRequest_RAFT,
		eng,
		desc.RangeID,
		raftentry.NewCache(1), // Cache is not used.
		// This is used for sstables, not needed here as there are no logs.
		func(func(SideloadStorage) error) error { return nil },
		desc.StartKey,
	)
	if err != nil {
		return err
	}
	// Register Close only after the error check: when snapshot fails, the
	// returned value is not guaranteed to be initialized and must not be closed.
	defer outgoingSnap.Close()

	// From and to ReplicaDescriptors are the same because we have
	// to send the snapshot from a member of the RangeDescriptor.
	// Sending it from the current replica ensures that. Otherwise,
	// it would be a malformed request if it came from a non-member.
	from := to
	req := RaftMessageRequest{
		RangeID:     desc.RangeID,
		FromReplica: from,
		ToReplica:   to,
		Message: raftpb.Message{
			Type:     raftpb.MsgSnap,
			To:       uint64(to.ReplicaID),
			From:     uint64(from.ReplicaID),
			Term:     hs.Term,
			Snapshot: outgoingSnap.RaftSnap,
		},
	}

	header := SnapshotRequest_Header{
		State:                      state,
		RaftMessageRequest:         req,
		RangeSize:                  ms.Total(),
		CanDecline:                 false,
		Priority:                   SnapshotRequest_RECOVERY,
		Strategy:                   SnapshotRequest_KV_BATCH,
		Type:                       SnapshotRequest_RAFT,
		UnreplicatedTruncatedState: true,
	}

	stream, err := NewMultiRaftClient(cc).RaftSnapshot(ctx)
	if err != nil {
		return err
	}

	// initAndSendSnapshot closes the send side of the stream when it returns.
	return initAndSendSnapshot(
		ctx,
		stream,
		st,
		noopStorePool{},
		header,
		&outgoingSnap,
		eng.NewBatch,
		func() {},
	)
}

// noopStorePool is a stateless store pool stand-in whose throttle method does
// nothing. SendEmptySnapshot passes it to initAndSendSnapshot in the
// SnapshotStorePool position.
type noopStorePool struct{}

// throttle ignores all of its arguments and has no effect.
func (noopStorePool) throttle(throttleReason, string, roachpb.StoreID) {}

// sendSnapshot sends an outgoing snapshot via a pre-opened GRPC stream.
func sendSnapshot(
ctx context.Context,
Expand Down
Loading

0 comments on commit 19ef42f

Please sign in to comment.