From 83942c8dc27733c7342ce009803a80a54c498e05 Mon Sep 17 00:00:00 2001 From: Amy Gao Date: Mon, 28 Feb 2022 22:01:53 -0500 Subject: [PATCH] kvserver: self delegated snapshots This commit adds a new rpc stream for sending raft message requests between replicas which allows for delegating snapshots. Currently this patch implements the leaseholder delegating to itself, but in future patches the leaseholder will be able to delegate snapshot sending to follower replicas. A new message request type of `DelegatedSnapshotRequest` includes a `SnapshotRequest` and the replica descriptor of the new sender replica. This allows the leaseholder to fill in some snapshot metadata before delegating to the new sender store to generate the snapshot and transmit it to the recipient. Fixes: #42491 Release note: None Release justification: --- pkg/kv/kvserver/client_raft_helpers_test.go | 12 ++ pkg/kv/kvserver/client_raft_test.go | 14 ++- pkg/kv/kvserver/kvserverpb/raft.proto | 5 + pkg/kv/kvserver/raft_snapshot_queue.go | 2 +- pkg/kv/kvserver/raft_transport.go | 115 +++++++++++++++++- pkg/kv/kvserver/raft_transport_test.go | 8 ++ pkg/kv/kvserver/replica_command.go | 123 ++++++++++++++------ pkg/kv/kvserver/replica_learner_test.go | 69 +++++++++++ pkg/kv/kvserver/storage_services.proto | 1 + pkg/kv/kvserver/store.go | 4 +- pkg/kv/kvserver/store_raft.go | 102 +++++++++++++++- pkg/kv/kvserver/store_snapshot.go | 92 ++++++++++++++- pkg/kv/kvserver/testing_knobs.go | 2 + 13 files changed, 505 insertions(+), 44 deletions(-) diff --git a/pkg/kv/kvserver/client_raft_helpers_test.go b/pkg/kv/kvserver/client_raft_helpers_test.go index 33000a87dcea..e52ef89fa6f1 100644 --- a/pkg/kv/kvserver/client_raft_helpers_test.go +++ b/pkg/kv/kvserver/client_raft_helpers_test.go @@ -180,6 +180,18 @@ func (h *testClusterStoreRaftMessageHandler) HandleSnapshot( return store.HandleSnapshot(ctx, header, respStream) } +func (h *testClusterStoreRaftMessageHandler) SendDelegatedSnapshot( + ctx context.Context, + req *kvserverpb.DelegatedSnapshotRequest, + respStream kvserver.DelegateSnapshotResponseStream, +) error { + store, err := h.getStore() + if err != nil { + return err + } + return store.SendDelegatedSnapshot(ctx, req, respStream) +} + // testClusterPartitionedRange is a convenient abstraction to create a range on a node // in a multiTestContext which can be partitioned and unpartitioned. type testClusterPartitionedRange struct { diff --git a/pkg/kv/kvserver/client_raft_test.go b/pkg/kv/kvserver/client_raft_test.go index 757e576f98b6..013f7f9380bf 100644 --- a/pkg/kv/kvserver/client_raft_test.go +++ b/pkg/kv/kvserver/client_raft_test.go @@ -3452,6 +3452,14 @@ func (errorChannelTestHandler) HandleSnapshot( panic("unimplemented") } +func (errorChannelTestHandler) SendDelegatedSnapshot( + _ context.Context, + req *kvserverpb.DelegatedSnapshotRequest, + respStream kvserver.DelegateSnapshotResponseStream, +) error { + panic("unimplemented") +} + // This test simulates a scenario where one replica has been removed from the // range's Raft group but it is unaware of the fact. We check that this replica // coming back from the dead cannot cause elections. @@ -3460,10 +3468,12 @@ func TestReplicateRemovedNodeDisruptiveElection(t *testing.T) { defer log.Scope(t).Close(t) ctx := context.Background() - tc := testcluster.StartTestCluster(t, 4, + tc := testcluster.StartTestCluster( + t, 4, base.TestClusterArgs{ ReplicationMode: base.ReplicationManual, - }) + }, + ) defer tc.Stopper().Stop(ctx) // Move the first range from the first node to the other three. diff --git a/pkg/kv/kvserver/kvserverpb/raft.proto b/pkg/kv/kvserver/kvserverpb/raft.proto index 2fa84622e03f..12c990e72eed 100644 --- a/pkg/kv/kvserver/kvserverpb/raft.proto +++ b/pkg/kv/kvserver/kvserverpb/raft.proto @@ -201,6 +201,11 @@ message SnapshotRequest { reserved 3; } +message DelegatedSnapshotRequest { + SnapshotRequest snapRequest = 1; + roachpb.ReplicaDescriptor delegated_sender = 2 [(gogoproto.nullable) = false]; +} + message SnapshotResponse { enum Status { UNKNOWN = 0; diff --git a/pkg/kv/kvserver/raft_snapshot_queue.go b/pkg/kv/kvserver/raft_snapshot_queue.go index 82dc076a0098..1bc34473b2bd 100644 --- a/pkg/kv/kvserver/raft_snapshot_queue.go +++ b/pkg/kv/kvserver/raft_snapshot_queue.go @@ -147,7 +147,7 @@ func (rq *raftSnapshotQueue) processRaftSnapshot( } } - err := repl.sendSnapshot(ctx, repDesc, snapType, kvserverpb.SnapshotRequest_RECOVERY) + err := repl.sendDelegate(ctx, repDesc, snapType, kvserverpb.SnapshotRequest_RECOVERY) // NB: if the snapshot fails because of an overlapping replica on the // recipient which is also waiting for a snapshot, the "smart" thing is to diff --git a/pkg/kv/kvserver/raft_transport.go b/pkg/kv/kvserver/raft_transport.go index af4a306cfaea..3584cf8c9991 100644 --- a/pkg/kv/kvserver/raft_transport.go +++ b/pkg/kv/kvserver/raft_transport.go @@ -104,6 +104,13 @@ type SnapshotResponseStream interface { Recv() (*kvserverpb.SnapshotRequest, error) } +// DelegateSnapshotResponseStream is the subset of the +// MultiRaft_RaftSnapshotServer interface that is needed for sending delegated responses. +type DelegateSnapshotResponseStream interface { + Send(request *kvserverpb.SnapshotResponse) error + Recv() (*kvserverpb.DelegatedSnapshotRequest, error) +} + // RaftMessageHandler is the interface that must be implemented by // arguments to RaftTransport.Listen. type RaftMessageHandler interface { @@ -112,7 +119,8 @@ type RaftMessageHandler interface { // If an error is encountered during asynchronous processing, it will be // streamed back to the sender of the message as a RaftMessageResponse. HandleRaftRequest(ctx context.Context, req *kvserverpb.RaftMessageRequest, - respStream RaftMessageResponseStream) *roachpb.Error + respStream RaftMessageResponseStream, + ) *roachpb.Error // HandleRaftResponse is called for each raft response. Note that // not all messages receive a response. An error is returned if and only if @@ -122,6 +130,13 @@ type RaftMessageHandler interface { // HandleSnapshot is called for each new incoming snapshot stream, after // parsing the initial SnapshotRequest_Header on the stream. HandleSnapshot(ctx context.Context, header *kvserverpb.SnapshotRequest_Header, respStream SnapshotResponseStream) error + + // SendDelegatedSnapshot is called for each incoming delegated snapshot + // request. + SendDelegatedSnapshot( + ctx context.Context, req *kvserverpb.DelegatedSnapshotRequest, + respStream DelegateSnapshotResponseStream, + ) error } type raftTransportStats struct { @@ -406,6 +421,78 @@ func (t *RaftTransport) RaftMessageBatch(stream MultiRaft_RaftMessageBatchServer } } +// DelegateRaftSnapshot handles incoming delegated snapshot requests and parses +// the request to pass off to the new sender store. +func (t *RaftTransport) DelegateRaftSnapshot(stream MultiRaft_DelegateRaftSnapshotServer) error { + errCh := make(chan error, 1) + taskCtx, cancel := t.stopper.WithCancelOnQuiesce(stream.Context()) + defer cancel() + if err := t.stopper.RunAsyncTaskEx( + taskCtx, + stop.TaskOpts{ + TaskName: "storage.RaftTransport: processing snapshot", + SpanOpt: stop.ChildSpan, + }, func(ctx context.Context) { + errCh <- func() error { + req, err := stream.Recv() + if err != nil { + return err + } + // check to ensure the header is valid. + if req.SnapRequest.Header == nil { + return stream.Send( + &kvserverpb.SnapshotResponse{ + Status: kvserverpb.SnapshotResponse_ERROR, + Message: "client error: no header in first snapshot request message", + }, + ) + } + // get the handler of the new sender store. + handler, ok := t.getHandler(req.DelegatedSender.StoreID) + if !ok { + log.Warningf( + ctx, "unable to accept Raft message from leaseholder: %+v: no handler registered for"+ + " new sender store"+ + " %+v", + req.SnapRequest.Header.RaftMessageRequest.FromReplica.StoreID, + req.DelegatedSender.StoreID, + ) + return roachpb.NewStoreNotFoundError(req.DelegatedSender.StoreID) + } + // acknowledge to the leaseholder that the request has been accepted. + if err := stream.Send(&kvserverpb.SnapshotResponse{Status: kvserverpb.SnapshotResponse_ACCEPTED}); err != nil { + return err + } + // pass off the request to the new sender store. + err = handler.SendDelegatedSnapshot(ctx, req, stream) + if err != nil { + log.Infof(ctx, "error: %v", err) + return stream.Send( + &kvserverpb.SnapshotResponse{ + Status: kvserverpb.SnapshotResponse_ERROR, + Message: err.Error(), + }, + ) + } + return stream.Send( + &kvserverpb.SnapshotResponse{ + Status: kvserverpb.SnapshotResponse_APPLIED, + Message: "accepted!", + }, + ) + }() + }, + ); err != nil { + return err + } + select { + case <-t.stopper.ShouldQuiesce(): + return nil + case err := <-errCh: + return err + } +} + // RaftSnapshot handles incoming streaming snapshot requests. func (t *RaftTransport) RaftSnapshot(stream MultiRaft_RaftSnapshotServer) error { errCh := make(chan error, 1) @@ -705,7 +792,33 @@ func (t *RaftTransport) SendSnapshot( log.Warningf(ctx, "failed to close snapshot stream: %+v", err) } }() + return sendSnapshot( ctx, t.st, stream, storePool, header, snap, newBatch, sent, ) } + +// SendDelegatedSnapshot creates a rpc stream between the leaseholder and the +// new sender for delegated snapshot requests. +func (t *RaftTransport) SendDelegatedSnapshot( + ctx context.Context, storePool *StorePool, req *kvserverpb.DelegatedSnapshotRequest, +) error { + nodeID := req.DelegatedSender.NodeID + conn, err := t.dialer.Dial(ctx, nodeID, rpc.DefaultClass) + if err != nil { + return err + } + client := NewMultiRaftClient(conn) + + // creates rpc stream between leaseholder and the new sender. + stream, err := client.DelegateRaftSnapshot(ctx) + if err != nil { + return err + } + defer func() { + if err := stream.CloseSend(); err != nil { + log.Warningf(ctx, "failed to close snapshot stream: %+v", err) + } + }() + return sendDelegatedSnapshot(ctx, stream, storePool, req) +} diff --git a/pkg/kv/kvserver/raft_transport_test.go b/pkg/kv/kvserver/raft_transport_test.go index 6b6ca7c440a7..cc1edfdf1d76 100644 --- a/pkg/kv/kvserver/raft_transport_test.go +++ b/pkg/kv/kvserver/raft_transport_test.go @@ -96,6 +96,14 @@ func (s channelServer) HandleSnapshot( panic("unexpected HandleSnapshot") } +func (s channelServer) SendDelegatedSnapshot( + _ context.Context, + header *kvserverpb.DelegatedSnapshotRequest, + stream kvserver.DelegateSnapshotResponseStream, +) error { + panic("unimplemented") +} + // raftTransportTestContext contains objects needed to test RaftTransport. // Typical usage will add multiple nodes with AddNode, attach channels // to at least one store with ListenStore, and send messages with Send. diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index 0d8e4280783f..389d362e2553 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -1788,7 +1788,11 @@ func (r *Replica) initializeRaftLearners( // orphaned learner. Second, this tickled some bugs in etcd/raft around // switching between StateSnapshot and StateProbe. Even if we worked through // these, it would be susceptible to future similar issues. - if err := r.sendSnapshot(ctx, rDesc, kvserverpb.SnapshotRequest_INITIAL, priority); err != nil { + + if err := r.sendDelegate( + ctx, rDesc, kvserverpb.SnapshotRequest_INITIAL, + priority, + ); err != nil { return nil, err } } @@ -2415,6 +2419,77 @@ func recordRangeEventsInLog( return nil } +// getSenderReplica returns a replica descriptor for a voter replica to act as +// the new sender. +func (r *Replica) getSenderReplica(ctx context.Context) (roachpb.ReplicaDescriptor, error) { + return r.GetReplicaDescriptor() +} + +func (r *Replica) sendDelegate( + ctx context.Context, + recipient roachpb.ReplicaDescriptor, + snapType kvserverpb.SnapshotRequest_Type, + priority kvserverpb.SnapshotRequest_Priority, +) (retErr error) { + defer func() { + // Report the snapshot status to Raft, which expects us to do this once we + // finish sending the snapshot. + r.reportSnapshotStatus(ctx, recipient.ReplicaID, retErr) + }() + // choose the new sender replica + sender, err := r.getSenderReplica(ctx) + if err != nil { + return err + } + desc, err := r.GetReplicaDescriptor() + if err != nil { + return err + } + status := r.RaftStatus() + if status == nil { + // This code path is sometimes hit during scatter for replicas that + // haven't woken up yet. + return &benignError{errors.Wrap(errMarkSnapshotError, "raft status not initialized")} + } + + // create new snapshot request header with only metadata + header := &kvserverpb.SnapshotRequest_Header{ + DeprecatedUnreplicatedTruncatedState: true, + RaftMessageRequest: kvserverpb.RaftMessageRequest{ + RangeID: r.RangeID, + FromReplica: desc, + ToReplica: recipient, + Message: raftpb.Message{ + Type: raftpb.MsgSnap, + To: uint64(recipient.ReplicaID), + From: uint64(r.replicaID), + }, + }, + RangeSize: r.GetMVCCStats().Total(), + Priority: priority, + Strategy: kvserverpb.SnapshotRequest_KV_BATCH, + Type: snapType, + } + snapshotRequest := &kvserverpb.SnapshotRequest{Header: header} + delegatedRequest := &kvserverpb.DelegatedSnapshotRequest{ + SnapRequest: snapshotRequest, DelegatedSender: sender, + } + err = contextutil.RunWithTimeout( + ctx, "send-snapshot", sendSnapshotTimeout, func(ctx context.Context) error { + return r.store.cfg.Transport.SendDelegatedSnapshot( + ctx, + r.store.allocator.storePool, + delegatedRequest, + ) + }, + ) + + if err != nil { + return errors.Mark(err, errMarkSnapshotError) + } + return nil +} + // sendSnapshot sends a snapshot of the replica state to the specified replica. // Currently only invoked from replicateQueue and raftSnapshotQueue. Be careful // about adding additional calls as generating a snapshot is moderately @@ -2529,15 +2604,17 @@ func recordRangeEventsInLog( func (r *Replica) sendSnapshot( ctx context.Context, recipient roachpb.ReplicaDescriptor, - snapType kvserverpb.SnapshotRequest_Type, - priority kvserverpb.SnapshotRequest_Priority, + req *kvserverpb.DelegatedSnapshotRequest, ) (retErr error) { + // TODO(amy): Do I need to report Raft Status here? I left it to the leaseholder since it should + // be propagated up. defer func() { // Report the snapshot status to Raft, which expects us to do this once we // finish sending the snapshot. r.reportSnapshotStatus(ctx, recipient.ReplicaID, retErr) }() + snapType := req.SnapRequest.Header.Type snap, err := r.GetSnapshot(ctx, snapType, recipient.StoreID) if err != nil { err = errors.Wrapf(err, "%s: failed to generate %s snapshot", r, snapType) @@ -2552,16 +2629,12 @@ func (r *Replica) sendSnapshot( // the leaseholder and we haven't yet applied the configuration change that's // adding the recipient to the range. if _, ok := snap.State.Desc.GetReplicaDescriptor(recipient.StoreID); !ok { - return errors.Wrapf(errMarkSnapshotError, + return errors.Wrapf( + errMarkSnapshotError, "attempting to send snapshot that does not contain the recipient as a replica; "+ - "snapshot type: %s, recipient: s%d, desc: %s", snapType, recipient, snap.State.Desc) - } - - sender, err := r.GetReplicaDescriptor() - if err != nil { - return errors.Wrapf(err, "%s: change replicas failed", r) + "snapshot type: %s, recipient: s%d, desc: %s", snapType, recipient, snap.State.Desc, + ) } - status := r.RaftStatus() if status == nil { // This code path is sometimes hit during scatter for replicas that @@ -2583,44 +2656,28 @@ func (r *Replica) sendSnapshot( // See comment on DeprecatedUsingAppliedStateKey for why we need to set this // explicitly for snapshots going out to followers. snap.State.DeprecatedUsingAppliedStateKey = true + req.SnapRequest.Header.State = snap.State + req.SnapRequest.Header.RaftMessageRequest.Message.Snapshot = snap.RaftSnap - req := kvserverpb.SnapshotRequest_Header{ - State: snap.State, - DeprecatedUnreplicatedTruncatedState: true, - RaftMessageRequest: kvserverpb.RaftMessageRequest{ - RangeID: r.RangeID, - FromReplica: sender, - ToReplica: recipient, - Message: raftpb.Message{ - Type: raftpb.MsgSnap, - To: uint64(recipient.ReplicaID), - From: uint64(sender.ReplicaID), - Term: status.Term, - Snapshot: snap.RaftSnap, - }, - }, - RangeSize: r.GetMVCCStats().Total(), - Priority: priority, - Strategy: kvserverpb.SnapshotRequest_KV_BATCH, - Type: snapType, - } newBatchFn := func() storage.Batch { return r.store.Engine().NewUnindexedBatch(true /* writeOnly */) } sent := func() { r.store.metrics.RangeSnapshotsGenerated.Inc(1) } + err = contextutil.RunWithTimeout( ctx, "send-snapshot", sendSnapshotTimeout, func(ctx context.Context) error { return r.store.cfg.Transport.SendSnapshot( ctx, r.store.allocator.storePool, - req, + *req.SnapRequest.Header, snap, newBatchFn, sent, ) - }) + }, + ) if err != nil { if errors.Is(err, errMalformedSnapshot) { tag := fmt.Sprintf("r%d_%s", r.RangeID, snap.SnapUUID.Short()) diff --git a/pkg/kv/kvserver/replica_learner_test.go b/pkg/kv/kvserver/replica_learner_test.go index 3edeb38dae50..697643a93085 100644 --- a/pkg/kv/kvserver/replica_learner_test.go +++ b/pkg/kv/kvserver/replica_learner_test.go @@ -138,6 +138,75 @@ func getFirstStoreMetric(t *testing.T, s serverutils.TestServerInterface, name s return c } +func TestAddReplicaViaDelegateSnapshot(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + shouldDelegateSnapshots := int64(1) + tc := testcluster.StartTestCluster( + t, 5, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + DelegateSnapshots: func() bool { + t.Helper() + if a := atomic.LoadInt64(&shouldDelegateSnapshots); a == 0 { + // TODO(amy): make this return the replica id of new sender + return false + } + return true + }, + }, + }, + }, + ReplicationMode: base.ReplicationManual, + }, + ) + ctx := context.Background() + defer tc.Stopper().Stop(ctx) + scratch := tc.ScratchRange(t) + + // Add a follower replica that will actually send the delegated snapshot. + tc.AddVotersOrFatal(t, scratch, tc.Target(1)) + + desc := tc.LookupRangeOrFatal(t, scratch) + require.Len(t, desc.Replicas().VoterDescriptors(), 2) + t.Logf("DEBUG: new sender added %v:", desc.Replicas().VoterDescriptors()) + // Try to add a voter to node 2, and the new follower should send the snapshot + + _, err := tc.AddVoters(scratch, tc.Target(2)) + if err != nil { + return + } + + _, err = tc.AddVoters(scratch, tc.Target(3)) + if err != nil { + return + } + desc = tc.LookupRangeOrFatal(t, scratch) + require.Len(t, desc.Replicas().VoterDescriptors(), 4) + //nodes := 4 + //errCh := make(chan error, nodes) + //var wg sync.WaitGroup + //for i := 2; i < nodes; i++ { + // wg.Add(1) + // go func(i int, wg *sync.WaitGroup) { + // defer wg.Done() + // _, err := tc.AddVoters(scratch, tc.Target(i)) + // if err != nil { + // return + // } + // errCh <- err + // }(i, &wg) + //} + //wg.Wait() + //desc = tc.LookupRangeOrFatal(t, scratch) + //require.Len(t, desc.Replicas().VoterDescriptors(), nodes) + //t.Logf("DEBUG: new voter successfully added %v:", desc.Replicas().Descriptors()) + // expect that the delegated follower sends the snapshot to the new voterStores + +} + func TestAddReplicaViaLearner(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) diff --git a/pkg/kv/kvserver/storage_services.proto b/pkg/kv/kvserver/storage_services.proto index 61b762ebce25..577af4363ce6 100644 --- a/pkg/kv/kvserver/storage_services.proto +++ b/pkg/kv/kvserver/storage_services.proto @@ -19,6 +19,7 @@ import "gogoproto/gogo.proto"; service MultiRaft { rpc RaftMessageBatch (stream cockroach.kv.kvserver.kvserverpb.RaftMessageRequestBatch) returns (stream cockroach.kv.kvserver.kvserverpb.RaftMessageResponse) {} rpc RaftSnapshot (stream cockroach.kv.kvserver.kvserverpb.SnapshotRequest) returns (stream cockroach.kv.kvserver.kvserverpb.SnapshotResponse) {} + rpc DelegateRaftSnapshot(stream cockroach.kv.kvserver.kvserverpb.DelegatedSnapshotRequest) returns (stream cockroach.kv.kvserver.kvserverpb.SnapshotResponse) {} } service PerReplica { diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 85a3739cedf7..914051c5ad87 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -764,6 +764,8 @@ type Store struct { // Semaphore to limit concurrent non-empty snapshot application. snapshotApplySem chan struct{} + // Semaphore to limit concurrent non-empty snapshot sending. + snapshotSendSem chan struct{} // Track newly-acquired expiration-based leases that we want to proactively // renew. An object is sent on the signal whenever a new entry is added to @@ -1198,7 +1200,7 @@ func NewStore( s.txnWaitMetrics = txnwait.NewMetrics(cfg.HistogramWindowInterval) s.metrics.registry.AddMetricStruct(s.txnWaitMetrics) s.snapshotApplySem = make(chan struct{}, cfg.concurrentSnapshotApplyLimit) - + s.snapshotSendSem = make(chan struct{}, cfg.concurrentSnapshotApplyLimit) if ch := s.cfg.TestingKnobs.LeaseRenewalSignalChan; ch != nil { s.renewableLeasesSignal = ch } else { diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go index 6ff0c8043cd8..b14b2548aa0c 100644 --- a/pkg/kv/kvserver/store_raft.go +++ b/pkg/kv/kvserver/store_raft.go @@ -63,6 +63,99 @@ func (q *raftRequestQueue) recycle(processed []raftRequestInfo) { } } +// reserveSendSnapshot throttles outgoing snapshots. The returned closure is used +// to cleanup the reservation and release its resources. +func (s *Store) reserveSendSnapshot( + ctx context.Context, req *kvserverpb.DelegatedSnapshotRequest, +) (_cleanup func(), _err error) { + header := req.SnapRequest.Header + tBegin := timeutil.Now() + // Empty snapshots are exempt from rate limits because they're so cheap to + // apply. This vastly speeds up rebalancing any empty ranges created by a + // RESTORE or manual SPLIT AT, since it prevents these empty snapshots from + // getting stuck behind large snapshots managed by the replicate queue. + + if header.RangeSize != 0 { + queueCtx := ctx + if deadline, ok := queueCtx.Deadline(); ok { + // Enforce a more strict timeout for acquiring the snapshot reservation to + // ensure that if the reservation is acquired, the snapshot has sufficient + // time to complete. See the comment on snapshotReservationQueueTimeoutFraction + // and TestReserveSnapshotQueueTimeout. + timeoutFrac := snapshotReservationQueueTimeoutFraction.Get(&s.ClusterSettings().SV) + timeout := time.Duration(timeoutFrac * float64(timeutil.Until(deadline))) + var cancel func() + queueCtx, cancel = context.WithTimeout(queueCtx, timeout) // nolint:context + defer cancel() + } + select { + case s.snapshotSendSem <- struct{}{}: + case <-queueCtx.Done(): + if err := ctx.Err(); err != nil { + return nil, errors.Wrap(err, "acquiring snapshot reservation") + } + return nil, errors.Wrapf( + queueCtx.Err(), + "giving up during snapshot reservation due to %q", + snapshotReservationQueueTimeoutFraction.Key(), + ) + case <-s.stopper.ShouldQuiesce(): + return nil, errors.Errorf("stopped") + } + } + + // The choice here is essentially arbitrary, but with a default range size of 128mb-512mb and the + // Raft snapshot rate limiting of 32mb/s, we expect to spend less than 16s per snapshot. + // which is what we want to log. + const snapshotReservationWaitWarnThreshold = 32 * time.Second + if elapsed := timeutil.Since(tBegin); elapsed > snapshotReservationWaitWarnThreshold { + replDesc := req.DelegatedSender + log.Infof( + ctx, + "waited for %.1fs to acquire snapshot sending reservation for r%d/s%d", + elapsed.Seconds(), + req.SnapRequest.Header.RaftMessageRequest.RangeID, + replDesc.StoreID, + ) + } + + return func() { + if header.RangeSize != 0 { + <-s.snapshotSendSem + } + }, nil +} + +// SendDelegatedSnapshot reads the incoming delegated snapshot message throttles +// sending snapshots before passing the request to the sender replica. +func (s *Store) SendDelegatedSnapshot( + ctx context.Context, + req *kvserverpb.DelegatedSnapshotRequest, + stream DelegateSnapshotResponseStream, +) error { + ctx = s.AnnotateCtx(ctx) + const name = "storage.Store: handle snapshot delegation" + return s.stopper.RunTaskWithErr( + ctx, name, func(ctx context.Context) error { + s.metrics.RaftRcvdMessages[raftpb.MsgSnap].Inc(1) + // get the new sender replica. + sender, err := s.GetReplica(req.SnapRequest.Header.RaftMessageRequest.RangeID) + if err != nil { + return err + } + // throttle snapshot sending. + cleanup, err := s.reserveSendSnapshot(ctx, req) + if err != nil { + return err + } + defer cleanup() + return sender.sendSnapshot( + ctx, req.SnapRequest.Header.RaftMessageRequest.ToReplica, req, + ) + }, + ) +} + // HandleSnapshot reads an incoming streaming snapshot and applies it if // possible. func (s *Store) HandleSnapshot( @@ -70,11 +163,12 @@ func (s *Store) HandleSnapshot( ) error { ctx = s.AnnotateCtx(ctx) const name = "storage.Store: handle snapshot" - return s.stopper.RunTaskWithErr(ctx, name, func(ctx context.Context) error { - s.metrics.RaftRcvdMessages[raftpb.MsgSnap].Inc(1) + return s.stopper.RunTaskWithErr( + ctx, name, func(ctx context.Context) error { + s.metrics.RaftRcvdMessages[raftpb.MsgSnap].Inc(1) - return s.receiveSnapshot(ctx, header, stream) - }) + return s.receiveSnapshot(ctx, header, stream) + }) } func (s *Store) uncoalesceBeats( diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go index 570e4a1a1d11..41d1d2911233 100644 --- a/pkg/kv/kvserver/store_snapshot.go +++ b/pkg/kv/kvserver/store_snapshot.go @@ -61,6 +61,20 @@ type outgoingSnapshotStream interface { Recv() (*kvserverpb.SnapshotResponse, error) } +// incomingSnapshotStream is the minimal interface on a GRPC stream required +// to receive a snapshot over the network. +type incomingDelegatedStream interface { + Send(*kvserverpb.SnapshotResponse) error + Recv() (*kvserverpb.DelegatedSnapshotRequest, error) +} + +// outgoingSnapshotStream is the minimal interface on a GRPC stream required +// to send a snapshot over the network. +type outgoingDelegatedStream interface { + Send(*kvserverpb.DelegatedSnapshotRequest) error + Recv() (*kvserverpb.SnapshotResponse, error) +} + // snapshotStrategy is an approach to sending and receiving Range snapshots. // Each implementation corresponds to a SnapshotRequest_Strategy, and it is // expected that the implementation that matches the Strategy specified in the @@ -1101,11 +1115,13 @@ func sendSnapshot( // (only a limited number of snapshots are allowed concurrently) or flat-out // reject the snapshot. After the initial message exchange, we'll go and send // the actual snapshot (if not rejected). + resp, err := stream.Recv() if err != nil { storePool.throttle(throttleFailed, err.Error(), to.StoreID) return err } + switch resp.Status { case kvserverpb.SnapshotResponse_ERROR: storePool.throttle(throttleFailed, resp.Message, to.StoreID) @@ -1198,7 +1214,79 @@ func sendSnapshot( case kvserverpb.SnapshotResponse_APPLIED: return nil default: - return errors.Errorf("%s: server sent an invalid status during finalization: %s", - to, resp.Status) + return errors.Errorf( + "%s: server sent an invalid status during finalization: %s", + to, resp.Status, + ) + } +} + +// sendDelegatedSnapshot sends an outgoing delegated snapshot request via a +// pre-opened GRPC stream. It sends the delegated snapshot request to the new +// sender and waits for confirmation if the snapshot has been applied. +func sendDelegatedSnapshot( + ctx context.Context, + stream outgoingDelegatedStream, + storePool SnapshotStorePool, + req *kvserverpb.DelegatedSnapshotRequest, +) error { + + to := req.SnapRequest.Header.RaftMessageRequest.ToReplica + delegatedSender := req.DelegatedSender + if err := stream.Send(req); err != nil { + return err + } + // Wait for response to see if the receiver accepted or rejected + resp, err := stream.Recv() + if err != nil { + storePool.throttle(throttleFailed, err.Error(), delegatedSender.StoreID) + return err + } + switch resp.Status { + case kvserverpb.SnapshotResponse_ERROR: + storePool.throttle(throttleFailed, resp.Message, delegatedSender.StoreID) + return errors.Errorf( + "%s: remote couldn't accept %s with error: %s", + delegatedSender, req, resp.Message, + ) + case kvserverpb.SnapshotResponse_ACCEPTED: + // This is the response we're expecting. New sender accepted the request. + default: + err := errors.Errorf( + "%s: server sent an invalid status while negotiating %s: %s", + delegatedSender, req, resp.Status, + ) + storePool.throttle(throttleFailed, err.Error(), delegatedSender.StoreID) + return err + } + + // Wait for response to see if the receiver successfully applied the snapshot + resp, err = stream.Recv() + if err != nil { + return errors.Wrapf(err, "%s: remote failed to send snapshot", delegatedSender) + } + // Wait for EOF to ensure server side processing is complete. + if unexpectedResp, err := stream.Recv(); err != io.EOF { + if err != nil { + return errors.Wrapf(err, "%s: expected EOF, got resp=%v with error", to, unexpectedResp) + } + return errors.Newf("%s: expected EOF, got resp=%v", to, unexpectedResp) + } + switch resp.Status { + case kvserverpb.SnapshotResponse_ERROR: + storePool.throttle(throttleFailed, resp.Message, delegatedSender.StoreID) + return errors.Errorf("%s: remote couldn't apply the snapshot with error: %s", + to, resp.Message) + case kvserverpb.SnapshotResponse_APPLIED: + // This is the response we're expecting. Snapshot successfully applied. + return nil + default: + err := errors.Errorf( + "%s: server sent an invalid status while negotiating %s: %s", + delegatedSender, req, resp.Status, + ) + storePool.throttle(throttleFailed, err.Error(), delegatedSender.StoreID) + return err } + } diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go index b51df4bfbfac..b2bef41170f9 100644 --- a/pkg/kv/kvserver/testing_knobs.go +++ b/pkg/kv/kvserver/testing_knobs.go @@ -140,6 +140,8 @@ type StoreTestingKnobs struct { // DisableTimeSeriesMaintenanceQueue disables the time series maintenance // queue. DisableTimeSeriesMaintenanceQueue bool + // TESTING + DelegateSnapshots func() bool // DisableRaftSnapshotQueue disables the raft snapshot queue. Use this // sparingly, as it tends to produce flaky tests during up-replication where // the explicit learner snapshot gets lost. Since uninitialized replicas