From 8cbd6aed9ae2361e895a94341b07f72447ac31d8 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Fri, 14 Jul 2023 11:40:42 +0100 Subject: [PATCH] kvserver: plumb MsgAppResp back to snapshot sender We want the initiator of a (potentially delegated) snapshot to be able to see the MsgAppResp that is generated on the recipient of the snapshot as a result of application. This commit does the plumbing, but the `*MsgAppResp` is always `nil`, i.e. no actual logic was added yet. --- pkg/kv/kvserver/raft_transport.go | 26 ++++++++++++--------- pkg/kv/kvserver/replica_command.go | 28 ++++++++++++++-------- pkg/kv/kvserver/store_raft.go | 13 ++++++++--- pkg/kv/kvserver/store_snapshot.go | 37 +++++++++++++++++------------- pkg/kv/kvserver/store_test.go | 9 ++++---- 5 files changed, 70 insertions(+), 43 deletions(-) diff --git a/pkg/kv/kvserver/raft_transport.go b/pkg/kv/kvserver/raft_transport.go index 7923d84323aa..0db457c010d8 100644 --- a/pkg/kv/kvserver/raft_transport.go +++ b/pkg/kv/kvserver/raft_transport.go @@ -1099,6 +1099,10 @@ func (t *RaftTransport) dropFlowTokensForDisconnectedNodes() { // SendSnapshot streams the given outgoing snapshot. The caller is responsible // for closing the OutgoingSnapshot. +// +// The optional (but usually present) returned message is an MsgAppResp that +// results from the follower applying the snapshot, acking the log at the index +// of the snapshot. func (t *RaftTransport) SendSnapshot( ctx context.Context, storePool *storepool.StorePool, @@ -1107,17 +1111,17 @@ func (t *RaftTransport) SendSnapshot( newWriteBatch func() storage.WriteBatch, sent func(), recordBytesSent snapshotRecordMetrics, -) error { +) (*kvserverpb.SnapshotResponse, error) { nodeID := header.RaftMessageRequest.ToReplica.NodeID conn, err := t.dialer.Dial(ctx, nodeID, rpc.DefaultClass) if err != nil { - return err + return nil, err } client := NewMultiRaftClient(conn) stream, err := client.RaftSnapshot(ctx) if err != nil { - return err + return nil, err } defer func() { @@ -1132,18 +1136,18 @@ func (t *RaftTransport) SendSnapshot( // and determines if it encountered any errors when sending the snapshot. func (t *RaftTransport) DelegateSnapshot( ctx context.Context, req *kvserverpb.DelegateSendSnapshotRequest, -) error { +) (*kvserverpb.DelegateSnapshotResponse, error) { nodeID := req.DelegatedSender.NodeID conn, err := t.dialer.Dial(ctx, nodeID, rpc.DefaultClass) if err != nil { - return errors.Mark(err, errMarkSnapshotError) + return nil, errors.Mark(err, errMarkSnapshotError) } client := NewMultiRaftClient(conn) // Creates a rpc stream between the leaseholder and sender. stream, err := client.DelegateRaftSnapshot(ctx) if err != nil { - return errors.Mark(err, errMarkSnapshotError) + return nil, errors.Mark(err, errMarkSnapshotError) } defer func() { if err := stream.CloseSend(); err != nil { @@ -1154,12 +1158,12 @@ func (t *RaftTransport) DelegateSnapshot( // Send the request. wrappedRequest := &kvserverpb.DelegateSnapshotRequest{Value: &kvserverpb.DelegateSnapshotRequest_Send{Send: req}} if err := stream.Send(wrappedRequest); err != nil { - return errors.Mark(err, errMarkSnapshotError) + return nil, errors.Mark(err, errMarkSnapshotError) } // Wait for response to see if the receiver successfully applied the snapshot. resp, err := stream.Recv() if err != nil { - return errors.Mark( + return nil, errors.Mark( errors.Wrapf(err, "%v: remote failed to send snapshot", req), errMarkSnapshotError, ) } @@ -1175,14 +1179,14 @@ func (t *RaftTransport) DelegateSnapshot( switch resp.Status { case kvserverpb.DelegateSnapshotResponse_ERROR: - return errors.Mark( + return nil, errors.Mark( errors.Wrapf(resp.Error(), "error sending couldn't accept %v", req), errMarkSnapshotError) case kvserverpb.DelegateSnapshotResponse_APPLIED: // This is the response we're expecting. Snapshot successfully applied. log.VEventf(ctx, 3, "%s: delegated snapshot was successfully applied", resp) - return nil + return resp, nil default: - return err + return nil, err } } diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index c7fd8ccb2fde..b163fd210aa4 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -2872,7 +2872,8 @@ func (r *Replica) sendSnapshotUsingDelegate( retErr = timeutil.RunWithTimeout( ctx, "send-snapshot", sendSnapshotTimeout, func(ctx context.Context) error { // Sending snapshot - return r.store.cfg.Transport.DelegateSnapshot(ctx, delegateRequest) + _, err := r.store.cfg.Transport.DelegateSnapshot(ctx, delegateRequest) + return err }, ) if !selfDelegate { @@ -3053,7 +3054,7 @@ func (r *Replica) followerSendSnapshot( ctx context.Context, recipient roachpb.ReplicaDescriptor, req *kvserverpb.DelegateSendSnapshotRequest, -) error { +) (*raftpb.Message, error) { ctx = r.AnnotateCtx(ctx) sendThreshold := traceSnapshotThreshold.Get(&r.ClusterSettings().SV) if sendThreshold > 0 { @@ -3082,14 +3083,14 @@ func (r *Replica) followerSendSnapshot( // expensive to send. err := r.validateSnapshotDelegationRequest(ctx, req) if err != nil { - return err + return nil, err } // Throttle snapshot sending. Obtain the send semaphore and determine the rate limit. rangeSize := r.GetMVCCStats().Total() cleanup, err := r.store.reserveSendSnapshot(ctx, req, rangeSize) if err != nil { - return errors.Wrap(err, "Unable to reserve space for sending this snapshot") + return nil, errors.Wrap(err, "Unable to reserve space for sending this snapshot") } defer cleanup() @@ -3097,13 +3098,13 @@ func (r *Replica) followerSendSnapshot( // sent after we are doing waiting. err = r.validateSnapshotDelegationRequest(ctx, req) if err != nil { - return err + return nil, err } snapType := req.Type snap, err := r.GetSnapshot(ctx, snapType, req.SnapId) if err != nil { - return errors.Wrapf(err, "%s: failed to generate %s snapshot", r, snapType) + return nil, errors.Wrapf(err, "%s: failed to generate %s snapshot", r, snapType) } defer snap.Close() log.Event(ctx, "generated snapshot") @@ -3174,9 +3175,10 @@ func (r *Replica) followerSendSnapshot( } } - return timeutil.RunWithTimeout( + var msgAppResp *raftpb.Message + if err := timeutil.RunWithTimeout( ctx, "send-snapshot", sendSnapshotTimeout, func(ctx context.Context) error { - return r.store.cfg.Transport.SendSnapshot( + resp, err := r.store.cfg.Transport.SendSnapshot( ctx, r.store.cfg.StorePool, header, @@ -3185,8 +3187,16 @@ func (r *Replica) followerSendSnapshot( sent, recordBytesSent, ) + if err != nil { + return err + } + msgAppResp = resp.MsgAppResp + return nil }, - ) + ); err != nil { + return nil, err + } + return msgAppResp, nil } // replicasCollocated is used in AdminMerge to ensure that the ranges are diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go index 82d2065730c3..2deb6426ca0e 100644 --- a/pkg/kv/kvserver/store_raft.go +++ b/pkg/kv/kvserver/store_raft.go @@ -183,7 +183,8 @@ func (s *Store) HandleDelegatedSnapshot( } // Pass the request to the sender replica. - if err := sender.followerSendSnapshot(ctx, req.RecipientReplica, req); err != nil { + msgAppResp, err := sender.followerSendSnapshot(ctx, req.RecipientReplica, req) + if err != nil { // If an error occurred during snapshot sending, send an error response. return &kvserverpb.DelegateSnapshotResponse{ Status: kvserverpb.DelegateSnapshotResponse_ERROR, @@ -195,6 +196,7 @@ func (s *Store) HandleDelegatedSnapshot( return &kvserverpb.DelegateSnapshotResponse{ Status: kvserverpb.DelegateSnapshotResponse_APPLIED, CollectedSpans: sp.GetConfiguredRecording(), + MsgAppResp: msgAppResp, } } @@ -426,8 +428,9 @@ func (s *Store) processRaftRequestWithReplica( // will have been removed. func (s *Store) processRaftSnapshotRequest( ctx context.Context, snapHeader *kvserverpb.SnapshotRequest_Header, inSnap IncomingSnapshot, -) *kvpb.Error { - return s.withReplicaForRequest(ctx, &snapHeader.RaftMessageRequest, func( +) (*raftpb.Message, *kvpb.Error) { + var msgAppResp *raftpb.Message + pErr := s.withReplicaForRequest(ctx, &snapHeader.RaftMessageRequest, func( ctx context.Context, r *Replica, ) (pErr *kvpb.Error) { ctx = r.AnnotateCtx(ctx) @@ -498,6 +501,10 @@ func (s *Store) processRaftSnapshotRequest( } return nil }) + if pErr != nil { + return nil, pErr + } + return msgAppResp, nil } // HandleRaftResponse implements the IncomingRaftMessageHandler interface. Per diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go index 930d686f1b78..5ed684fe3ff4 100644 --- a/pkg/kv/kvserver/store_snapshot.go +++ b/pkg/kv/kvserver/store_snapshot.go @@ -1140,7 +1140,8 @@ func (s *Store) receiveSnapshot( // already received the entire snapshot here, so there's no point in // abandoning application half-way through if the caller goes away. applyCtx := s.AnnotateCtx(context.Background()) - if pErr := s.processRaftSnapshotRequest(applyCtx, header, inSnap); pErr != nil { + msgAppResp, pErr := s.processRaftSnapshotRequest(applyCtx, header, inSnap) + if pErr != nil { err := pErr.GoError() // We mark this error as a snapshot error which will be interpreted by the // sender as this being a retriable error, see isSnapshotError(). @@ -1151,6 +1152,7 @@ func (s *Store) receiveSnapshot( return stream.Send(&kvserverpb.SnapshotResponse{ Status: kvserverpb.SnapshotResponse_APPLIED, CollectedSpans: tracing.SpanFromContext(ctx).GetConfiguredRecording(), + MsgAppResp: msgAppResp, }) } @@ -1482,7 +1484,7 @@ func SendEmptySnapshot( } }() - return sendSnapshot( + if _, err := sendSnapshot( ctx, st, tracer, @@ -1493,7 +1495,10 @@ func SendEmptySnapshot( eng.NewWriteBatch, func() {}, nil, /* recordBytesSent */ - ) + ); err != nil { + return err + } + return nil } // noopStorePool is a hollowed out StorePool that does not throttle. It's used in recovery scenarios. @@ -1513,7 +1518,7 @@ func sendSnapshot( newWriteBatch func() storage.WriteBatch, sent func(), recordBytesSent snapshotRecordMetrics, -) error { +) (*kvserverpb.SnapshotResponse, error) { if recordBytesSent == nil { // NB: Some tests and an offline tool (ResetQuorum) call into `sendSnapshotUsingDelegate` // with a nil metrics tracking function. We pass in a fake metrics tracking function here that isn't @@ -1526,7 +1531,7 @@ func sendSnapshot( start := timeutil.Now() to := header.RaftMessageRequest.ToReplica if err := stream.Send(&kvserverpb.SnapshotRequest{Header: &header}); err != nil { - return err + return nil, err } log.Event(ctx, "sent SNAPSHOT_REQUEST message to server") // Wait until we get a response from the server. The recipient may queue us @@ -1536,13 +1541,13 @@ func sendSnapshot( resp, err := stream.Recv() if err != nil { storePool.Throttle(storepool.ThrottleFailed, err.Error(), to.StoreID) - return err + return nil, err } switch resp.Status { case kvserverpb.SnapshotResponse_ERROR: sp.ImportRemoteRecording(resp.CollectedSpans) storePool.Throttle(storepool.ThrottleFailed, resp.DeprecatedMessage, to.StoreID) - return errors.Wrapf(maybeHandleDeprecatedSnapErr(resp.Error()), "%s: remote couldn't accept %s", to, snap) + return nil, errors.Wrapf(maybeHandleDeprecatedSnapErr(resp.Error()), "%s: remote couldn't accept %s", to, snap) case kvserverpb.SnapshotResponse_ACCEPTED: // This is the response we're expecting. Continue with snapshot sending. log.Event(ctx, "received SnapshotResponse_ACCEPTED message from server") @@ -1550,7 +1555,7 @@ func sendSnapshot( err := errors.Errorf("%s: server sent an invalid status while negotiating %s: %s", to, snap, resp.Status) storePool.Throttle(storepool.ThrottleFailed, err.Error(), to.StoreID) - return err + return nil, err } durQueued := timeutil.Since(start) @@ -1586,7 +1591,7 @@ func sendSnapshot( // Record timings for snapshot send if kv.trace.snapshot.enable_threshold is enabled numBytesSent, err := ss.Send(ctx, stream, header, snap, recordBytesSent) if err != nil { - return err + return nil, err } durSent := timeutil.Since(start) @@ -1595,7 +1600,7 @@ func sendSnapshot( // applied. sent() if err := stream.Send(&kvserverpb.SnapshotRequest{Final: true}); err != nil { - return err + return nil, err } log.KvDistribution.Infof( ctx, @@ -1612,7 +1617,7 @@ func sendSnapshot( resp, err = stream.Recv() if err != nil { - return errors.Wrapf(err, "%s: remote failed to apply snapshot", to) + return nil, errors.Wrapf(err, "%s: remote failed to apply snapshot", to) } sp.ImportRemoteRecording(resp.CollectedSpans) // NB: wait for EOF which ensures that all processing on the server side has @@ -1620,19 +1625,19 @@ func sendSnapshot( // received). if unexpectedResp, err := stream.Recv(); err != io.EOF { if err != nil { - return errors.Wrapf(err, "%s: expected EOF, got resp=%v with error", to, unexpectedResp) + return nil, errors.Wrapf(err, "%s: expected EOF, got resp=%v with error", to, unexpectedResp) } - return errors.Newf("%s: expected EOF, got resp=%v", to, unexpectedResp) + return nil, errors.Newf("%s: expected EOF, got resp=%v", to, unexpectedResp) } switch resp.Status { case kvserverpb.SnapshotResponse_ERROR: - return errors.Wrapf( + return nil, errors.Wrapf( maybeHandleDeprecatedSnapErr(resp.Error()), "%s: remote failed to apply snapshot", to, ) case kvserverpb.SnapshotResponse_APPLIED: - return nil + return resp, nil default: - return errors.Errorf("%s: server sent an invalid status during finalization: %s", + return nil, errors.Errorf("%s: server sent an invalid status during finalization: %s", to, resp.Status, ) } diff --git a/pkg/kv/kvserver/store_test.go b/pkg/kv/kvserver/store_test.go index 18621ab9a4e3..0810d6c385ee 100644 --- a/pkg/kv/kvserver/store_test.go +++ b/pkg/kv/kvserver/store_test.go @@ -3048,13 +3048,14 @@ func TestStoreRemovePlaceholderOnRaftIgnored(t *testing.T) { require.NoError(t, err) } - require.NoError(t, s.processRaftSnapshotRequest(ctx, req, + _, pErr := s.processRaftSnapshotRequest(ctx, req, IncomingSnapshot{ SnapUUID: uuid.MakeV4(), Desc: desc, placeholder: placeholder, }, - ).GoError()) + ) + require.NoError(t, pErr.GoError()) testutils.SucceedsSoon(t, func() error { s.mu.Lock() @@ -3127,7 +3128,7 @@ func TestSendSnapshotThrottling(t *testing.T) { sp := &fakeStorePool{} expectedErr := errors.New("") c := fakeSnapshotStream{nil, expectedErr} - err := sendSnapshot( + _, err := sendSnapshot( ctx, st, tr, c, sp, header, nil /* snap */, newBatch, nil /* sent */, nil, /* recordBytesSent */ ) if sp.failedThrottles != 1 { @@ -3146,7 +3147,7 @@ func TestSendSnapshotThrottling(t *testing.T) { EncodedError: errors.EncodeError(ctx, errors.New("boom")), } c := fakeSnapshotStream{resp, nil} - err := sendSnapshot( + _, err := sendSnapshot( ctx, st, tr, c, sp, header, nil /* snap */, newBatch, nil /* sent */, nil, /* recordBytesSent */ ) if sp.failedThrottles != 1 {