kvserver: communicate snapshot index back along with snapshot response #106793

Merged · 6 commits · Jul 21, 2023 · Changes from all commits
2 changes: 0 additions & 2 deletions pkg/kv/kvserver/client_relocate_range_test.go
@@ -27,7 +27,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -166,7 +165,6 @@ func usesAtomicReplicationChange(ops []kvpb.ReplicationChange) bool {

func TestAdminRelocateRange(t *testing.T) {
defer leaktest.AfterTest(t)()
skip.WithIssue(t, 84242, "flaky test")
defer log.Scope(t).Close(t)

ctx := context.Background()
16 changes: 16 additions & 0 deletions pkg/kv/kvserver/kvserverpb/raft.proto
@@ -287,6 +287,14 @@ message SnapshotResponse {
//
// MIGRATION: only guaranteed to be set when the message field is no longer there.
errorspb.EncodedError encoded_error = 5 [(gogoproto.nullable) = false];

// msg_app_resp stores an optional MsgAppResp the receiving RawNode may have
// generated in response to applying the snapshot. This message will also have
// been handed to the raft transport, but it is helpful to step it into the
// sender manually to avoid the race described in:
//
// https://github.com/cockroachdb/cockroach/issues/97971
raftpb.Message msg_app_resp = 6;
}

// TODO(baptist): Extend this if necessary to separate out the request for the throttle.
@@ -356,6 +364,14 @@ message DelegateSnapshotResponse {
// collected_spans stores trace spans recorded during the execution of this
// request.
repeated util.tracing.tracingpb.RecordedSpan collected_spans = 3 [(gogoproto.nullable) = false];

// msg_app_resp stores an optional MsgAppResp the receiving RawNode may have
// generated in response to applying the snapshot. This message will also have
// been handed to the raft transport, but it is helpful to step it into the
// sender manually to avoid the race described in:
//
// https://github.com/cockroachdb/cockroach/issues/97971
raftpb.Message msg_app_resp = 4;
}

// ConfChangeContext is encoded in the raftpb.ConfChange.Context field.
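Together, the two new msg_app_resp fields let the snapshot sender step the follower's ack into its own RawNode as soon as the snapshot response arrives, rather than waiting for the same MsgAppResp to race through the raft transport. A condensed sketch of the consumption pattern (it mirrors the replica_command.go hunk further down; rn stands in for the sender's *raft.RawNode):

if resp.MsgAppResp != nil {
	msg := *resp.MsgAppResp
	// With a delegated snapshot, the follower addressed its ack to the
	// replica that streamed the data, but the ack is equally valid for
	// the originator, so redirect it before stepping.
	msg.To = rn.BasicStatus().ID
	// Best effort: if the Step fails, the same MsgAppResp still arrives
	// via the regular raft transport.
	_ = rn.Step(msg)
}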
26 changes: 15 additions & 11 deletions pkg/kv/kvserver/raft_transport.go
@@ -1099,6 +1099,10 @@ func (t *RaftTransport) dropFlowTokensForDisconnectedNodes() {

// SendSnapshot streams the given outgoing snapshot. The caller is responsible
// for closing the OutgoingSnapshot.
//
// The optional (but usually present) returned message is an MsgAppResp that
// results from the follower applying the snapshot, acking the log at the index
// of the snapshot.
func (t *RaftTransport) SendSnapshot(
ctx context.Context,
storePool *storepool.StorePool,
@@ -1107,17 +1111,17 @@ func (t *RaftTransport) SendSnapshot(
newWriteBatch func() storage.WriteBatch,
sent func(),
recordBytesSent snapshotRecordMetrics,
) error {
) (*kvserverpb.SnapshotResponse, error) {
nodeID := header.RaftMessageRequest.ToReplica.NodeID

conn, err := t.dialer.Dial(ctx, nodeID, rpc.DefaultClass)
if err != nil {
return err
return nil, err
}
client := NewMultiRaftClient(conn)
stream, err := client.RaftSnapshot(ctx)
if err != nil {
return err
return nil, err
}

defer func() {
@@ -1132,18 +1136,18 @@
// and determines if it encountered any errors when sending the snapshot.
func (t *RaftTransport) DelegateSnapshot(
ctx context.Context, req *kvserverpb.DelegateSendSnapshotRequest,
) error {
) (*kvserverpb.DelegateSnapshotResponse, error) {
nodeID := req.DelegatedSender.NodeID
conn, err := t.dialer.Dial(ctx, nodeID, rpc.DefaultClass)
if err != nil {
return errors.Mark(err, errMarkSnapshotError)
return nil, errors.Mark(err, errMarkSnapshotError)
}
client := NewMultiRaftClient(conn)

// Creates an RPC stream between the leaseholder and the sender.
stream, err := client.DelegateRaftSnapshot(ctx)
if err != nil {
return errors.Mark(err, errMarkSnapshotError)
return nil, errors.Mark(err, errMarkSnapshotError)
}
defer func() {
if err := stream.CloseSend(); err != nil {
@@ -1154,12 +1158,12 @@ func (t *RaftTransport) DelegateSnapshot(
// Send the request.
wrappedRequest := &kvserverpb.DelegateSnapshotRequest{Value: &kvserverpb.DelegateSnapshotRequest_Send{Send: req}}
if err := stream.Send(wrappedRequest); err != nil {
return errors.Mark(err, errMarkSnapshotError)
return nil, errors.Mark(err, errMarkSnapshotError)
}
// Wait for response to see if the receiver successfully applied the snapshot.
resp, err := stream.Recv()
if err != nil {
return errors.Mark(
return nil, errors.Mark(
errors.Wrapf(err, "%v: remote failed to send snapshot", req), errMarkSnapshotError,
)
}
@@ -1175,14 +1179,14 @@

switch resp.Status {
case kvserverpb.DelegateSnapshotResponse_ERROR:
return errors.Mark(
return nil, errors.Mark(
errors.Wrapf(resp.Error(), "error sending couldn't accept %v", req), errMarkSnapshotError)
case kvserverpb.DelegateSnapshotResponse_APPLIED:
// This is the response we're expecting. Snapshot successfully applied.
log.VEventf(ctx, 3, "%s: delegated snapshot was successfully applied", resp)
return nil
return resp, nil
default:
return err
return nil, err
}
}

43 changes: 34 additions & 9 deletions pkg/kv/kvserver/replica_command.go
@@ -2872,7 +2872,23 @@ func (r *Replica) sendSnapshotUsingDelegate(
retErr = timeutil.RunWithTimeout(
ctx, "send-snapshot", sendSnapshotTimeout, func(ctx context.Context) error {
// Sending snapshot
return r.store.cfg.Transport.DelegateSnapshot(ctx, delegateRequest)
resp, err := r.store.cfg.Transport.DelegateSnapshot(ctx, delegateRequest)
if err != nil {
return err
}
if resp.MsgAppResp != nil {
_ = r.withRaftGroup(func(rn *raft.RawNode) (unquiesceAndWakeLeader bool, _ error) {
msg := *resp.MsgAppResp
// With a delegated snapshot, the recipient received the snapshot
// from another replica and will thus respond to it instead. But the
// message is valid for the actual originator of the send as well.
msg.To = rn.BasicStatus().ID
// We do want to unquiesce here - we wouldn't ever want state transitions
// on a quiesced replica.
return true, rn.Step(*resp.MsgAppResp)
})
}
return nil
},
)
if !selfDelegate {
@@ -3053,7 +3069,7 @@ func (r *Replica) followerSendSnapshot(
ctx context.Context,
recipient roachpb.ReplicaDescriptor,
req *kvserverpb.DelegateSendSnapshotRequest,
) error {
) (*raftpb.Message, error) {
ctx = r.AnnotateCtx(ctx)
sendThreshold := traceSnapshotThreshold.Get(&r.ClusterSettings().SV)
if sendThreshold > 0 {
@@ -3082,28 +3098,28 @@
// expensive to send.
err := r.validateSnapshotDelegationRequest(ctx, req)
if err != nil {
return err
return nil, err
}

// Throttle snapshot sending. Obtain the send semaphore and determine the rate limit.
rangeSize := r.GetMVCCStats().Total()
cleanup, err := r.store.reserveSendSnapshot(ctx, req, rangeSize)
if err != nil {
return errors.Wrap(err, "Unable to reserve space for sending this snapshot")
return nil, errors.Wrap(err, "Unable to reserve space for sending this snapshot")
}
defer cleanup()

// Check validity again; it is possible that the pending request should not
// be sent after we are done waiting.
err = r.validateSnapshotDelegationRequest(ctx, req)
if err != nil {
return err
return nil, err
}

snapType := req.Type
snap, err := r.GetSnapshot(ctx, snapType, req.SnapId)
if err != nil {
return errors.Wrapf(err, "%s: failed to generate %s snapshot", r, snapType)
return nil, errors.Wrapf(err, "%s: failed to generate %s snapshot", r, snapType)
}
defer snap.Close()
log.Event(ctx, "generated snapshot")
@@ -3174,9 +3190,10 @@
}
}

return timeutil.RunWithTimeout(
var msgAppResp *raftpb.Message
if err := timeutil.RunWithTimeout(
ctx, "send-snapshot", sendSnapshotTimeout, func(ctx context.Context) error {
return r.store.cfg.Transport.SendSnapshot(
resp, err := r.store.cfg.Transport.SendSnapshot(
ctx,
r.store.cfg.StorePool,
header,
Expand All @@ -3185,8 +3202,16 @@ func (r *Replica) followerSendSnapshot(
sent,
recordBytesSent,
)
if err != nil {
return err
}
msgAppResp = resp.MsgAppResp
return nil
},
)
); err != nil {
return nil, err
}
return msgAppResp, nil
}

// replicasCollocated is used in AdminMerge to ensure that the ranges are
49 changes: 49 additions & 0 deletions pkg/kv/kvserver/replica_learner_test.go
@@ -2441,3 +2441,52 @@ func TestRebalancingAndCrossRegionZoneSnapshotMetrics(t *testing.T) {
})

}

// TestAddVotersWithoutRaftQueue verifies that in normal operations Raft
// snapshots are not required. This test creates a range with a single voter,
// then adds two additional voters. Most of the time this succeeds; however, it
// occasionally fails (today) because the addition of the first voter is
// "incomplete", and the second voter then cannot be added because there is no
// quorum.
//
// Specifically the following sequence of events happens when the leader adds
// the first voter:
// 1. AdminChangeReplicasRequest is processed on n1.
a) Adds n2 as a LEARNER to raft.
// b) Sends an initial snapshot to n2.
// c) n2 receives and applies the snapshot.
// d) n2 responds that it successfully applied the snapshot.
// e) n1 receives the response and updates state to Follower.
// 2. Before step c above, n1 sends an MsgApp to n2
// a) MsgApp - entries up-to and including the conf change.
// b) The MsgApp is received and REJECTED because the term is wrong.
// c) After 1e above, n1 receives the rejection.
// d) n1 updates n2 from StateReplicate to StateProbe and then StateSnapshot.
//
// From n2's perspective, it receives the MsgApp prior to the initial snapshot.
// This results in it responding with a rejection. Later it receives the
// snapshot and correctly applies it. However, when n1 sees the rejected MsgApp,
// it moves n2 to StateProbe and stops sending Raft updates to it, planning to
// fix it with a Raft snapshot. As the Raft snapshot queue is disabled, this
// never happens, and n2 is stuck as a non-Learner in StateProbe. At this point
// the Raft group is wedged, since only one of its two voters is available for
// Raft consensus.
func TestAddVotersWithoutRaftQueue(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()

// Disable the raft snapshot queue to make sure we don't require a raft snapshot.
tc := testcluster.StartTestCluster(
t, 3, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{DisableRaftSnapshotQueue: true}},
},
ReplicationMode: base.ReplicationManual,
},
)
defer tc.Stopper().Stop(ctx)

key := tc.ScratchRange(t)
tc.AddVotersOrFatal(t, key, tc.Target(1))
tc.AddVotersOrFatal(t, key, tc.Target(2))
}
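With the synchronous MsgAppResp handoff added by this PR, the leader steps the snapshot ack before the rejected MsgApp can demote n2, so the race described above should no longer wedge the group. A possible strengthening of the test, not part of this PR, would be to assert the final voter count (LookupRangeOrFatal and the testify require package are assumed to be available in this test file):

// Sketch: verify all three replicas ended up as voters without a raft snapshot.
desc := tc.LookupRangeOrFatal(t, key)
require.Len(t, desc.Replicas().VoterDescriptors(), 3)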
16 changes: 13 additions & 3 deletions pkg/kv/kvserver/replica_raft.go
@@ -930,6 +930,18 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
if err := r.applySnapshot(ctx, inSnap, snap, hs, subsumedRepls); err != nil {
return stats, errors.Wrap(err, "while applying snapshot")
}
for _, msg := range msgStorageAppend.Responses {
// The caller would like to see the MsgAppResp that usually results from
// applying the snapshot synchronously, so fish it out.
if msg.To == uint64(inSnap.FromReplica.ReplicaID) &&
msg.Type == raftpb.MsgAppResp &&
!msg.Reject &&
msg.Index == snap.Metadata.Index {

inSnap.msgAppRespCh <- msg
break
}
}
stats.tSnapEnd = timeutil.Now()
stats.snap.applied = true

@@ -1827,9 +1839,7 @@ func (r *Replica) reportSnapshotStatus(ctx context.Context, to roachpb.ReplicaID
// index it requested is now actually durable on the follower. Note also that
// the follower will generate an MsgAppResp reflecting the applied snapshot
// which typically moves the follower to StateReplicate when (if) received
// by the leader.
//
// See: https://github.com/cockroachdb/cockroach/issues/87581
// by the leader, which as of #106793 we do synchronously.
if err := r.withRaftGroup(func(raftGroup *raft.RawNode) (bool, error) {
raftGroup.ReportSnapshot(uint64(to), snapStatus)
return true, nil
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/replica_raftstorage.go
@@ -313,7 +313,8 @@ type IncomingSnapshot struct {
DataSize int64
snapType kvserverpb.SnapshotRequest_Type
placeholder *ReplicaPlaceholder
raftAppliedIndex kvpb.RaftIndex // logging only
raftAppliedIndex kvpb.RaftIndex // logging only
msgAppRespCh chan raftpb.Message // receives MsgAppResp if/when snap is applied
}

func (s IncomingSnapshot) String() string {
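This excerpt does not show where msgAppRespCh is created. For the blocking send in the replica_raft.go hunk above to be safe even when processRaftSnapshotRequest has already returned without draining the channel, the channel presumably needs a buffer of one; a sketch of the construction on the receive path, under that assumption:

inSnap := IncomingSnapshot{
	// ...
	// Buffered so the raft apply loop can deposit the MsgAppResp
	// without blocking, even if no one ends up reading it.
	msgAppRespCh: make(chan raftpb.Message, 1),
}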
25 changes: 22 additions & 3 deletions pkg/kv/kvserver/store_raft.go
@@ -183,7 +183,8 @@ func (s *Store) HandleDelegatedSnapshot(
}

// Pass the request to the sender replica.
if err := sender.followerSendSnapshot(ctx, req.RecipientReplica, req); err != nil {
msgAppResp, err := sender.followerSendSnapshot(ctx, req.RecipientReplica, req)
if err != nil {
// If an error occurred during snapshot sending, send an error response.
return &kvserverpb.DelegateSnapshotResponse{
Status: kvserverpb.DelegateSnapshotResponse_ERROR,
@@ -195,6 +196,7 @@
return &kvserverpb.DelegateSnapshotResponse{
Status: kvserverpb.DelegateSnapshotResponse_APPLIED,
CollectedSpans: sp.GetConfiguredRecording(),
MsgAppResp: msgAppResp,
}
}

@@ -426,8 +428,9 @@ func (s *Store) processRaftRequestWithReplica(
// will have been removed.
func (s *Store) processRaftSnapshotRequest(
ctx context.Context, snapHeader *kvserverpb.SnapshotRequest_Header, inSnap IncomingSnapshot,
) *kvpb.Error {
return s.withReplicaForRequest(ctx, &snapHeader.RaftMessageRequest, func(
) (*raftpb.Message, *kvpb.Error) {
var msgAppResp *raftpb.Message
pErr := s.withReplicaForRequest(ctx, &snapHeader.RaftMessageRequest, func(
ctx context.Context, r *Replica,
) (pErr *kvpb.Error) {
ctx = r.AnnotateCtx(ctx)
@@ -496,8 +499,24 @@
log.Infof(ctx, "ignored stale snapshot at index %d", snapHeader.RaftMessageRequest.Message.Snapshot.Metadata.Index)
s.metrics.RangeSnapshotRecvUnusable.Inc(1)
}
// If the snapshot was applied and acked with an MsgAppResp, return that
// message up the stack. We're using msgAppRespCh as a shortcut to avoid
// plumbing return parameters through an additional few layers of raft
// handling.
//
// NB: in practice there's always an MsgAppResp here, but it is better not
// to rely on what is essentially discretionary raft behavior.
select {
case msg := <-inSnap.msgAppRespCh:
msgAppResp = &msg
default:
}
return nil
})
if pErr != nil {
return nil, pErr
}
return msgAppResp, nil
}

// HandleRaftResponse implements the IncomingRaftMessageHandler interface. Per
Expand Down
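One hop is not shown in this excerpt: the snapshot stream handler takes the message returned by processRaftSnapshotRequest and attaches it to the APPLIED response that travels back to the sender. A sketch of that wiring (stream, header, and inSnap are assumed names from the surrounding receive code):

msgAppResp, pErr := s.processRaftSnapshotRequest(ctx, header, inSnap)
if pErr != nil {
	return pErr.GoError()
}
// Piggyback the ack on the final response so the sender can step it into
// its RawNode immediately, closing the race window from issue #97971.
return stream.Send(&kvserverpb.SnapshotResponse{
	Status:     kvserverpb.SnapshotResponse_APPLIED,
	MsgAppResp: msgAppResp,
})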