From 92572bad8d16fdec12c025fd1961525235b5ce5c Mon Sep 17 00:00:00 2001 From: Amy Gao Date: Mon, 28 Feb 2022 22:01:53 -0500 Subject: [PATCH] kvserver: self delegated snapshots This commit adds a new rpc stream for sending raft message requests between replicas which allows for delegating snapshots. Currently this patch implements the leaseholder delegating to itself, but in future patches the leaseholder will be able to delegate snapshot sending to follower replicas. A new raft message request type of `DelegateSnapshotRequest` includes a header of necessary fields from a `SnapshotRequest` and the replica descriptor of the new sender replica. This allows the leaseholder to fill in snapshot metadata before delegating to the new sender store to generate the snapshot and transmit it to the recipient. Related to #42491 Release note: None --- docs/generated/settings/settings.html | 1 + pkg/kv/kvserver/client_raft_helpers_test.go | 14 ++ pkg/kv/kvserver/client_raft_test.go | 10 ++ pkg/kv/kvserver/kvserverpb/BUILD.bazel | 2 + pkg/kv/kvserver/kvserverpb/raft.proto | 43 ++++++ pkg/kv/kvserver/raft_transport.go | 115 +++++++++++++- pkg/kv/kvserver/raft_transport_test.go | 9 ++ pkg/kv/kvserver/replica_command.go | 150 +++++++++++++++--- pkg/kv/kvserver/replica_learner_test.go | 97 ++++++++++++ pkg/kv/kvserver/storage_services.proto | 1 + pkg/kv/kvserver/store.go | 15 +- pkg/kv/kvserver/store_raft.go | 49 ++++++ pkg/kv/kvserver/store_snapshot.go | 163 ++++++++++++++++++-- pkg/kv/kvserver/testing_knobs.go | 5 + 14 files changed, 630 insertions(+), 44 deletions(-) diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index d51d862aa25a..dd8363d7eb07 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -50,6 +50,7 @@ kv.replica_circuit_breaker.slow_replication_thresholdduration1m0sduration after which slow proposals trip the per-Replica circuit breaker (zero duration disables breakers) 
kv.replica_stats.addsst_request_size_factorinteger50000the divisor that is applied to addsstable request sizes, then recorded in a leaseholders QPS; 0 means all requests are treated as cost 1 kv.replication_reports.intervalduration1m0sthe frequency for generating the replication_constraint_stats, replication_stats_report and replication_critical_localities reports (set to 0 to disable) +kv.snapshot_delegation.enabledbooleanfalseset to true to allow snapshots from follower replicas kv.snapshot_rebalance.max_ratebyte size32 MiBthe rate limit (bytes/sec) to use for rebalance and upreplication snapshots kv.snapshot_recovery.max_ratebyte size32 MiBthe rate limit (bytes/sec) to use for recovery snapshots kv.transaction.max_intents_bytesinteger4194304maximum number of bytes used to track locks in transactions diff --git a/pkg/kv/kvserver/client_raft_helpers_test.go b/pkg/kv/kvserver/client_raft_helpers_test.go index 33000a87dcea..ae14890d7282 100644 --- a/pkg/kv/kvserver/client_raft_helpers_test.go +++ b/pkg/kv/kvserver/client_raft_helpers_test.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" "go.etcd.io/etcd/raft/v3" ) @@ -180,6 +181,19 @@ func (h *testClusterStoreRaftMessageHandler) HandleSnapshot( return store.HandleSnapshot(ctx, header, respStream) } +func (h *testClusterStoreRaftMessageHandler) HandleDelegatedSnapshot( + ctx context.Context, + req *kvserverpb.DelegateSnapshotRequest, + stream kvserver.DelegateSnapshotResponseStream, + span *tracing.Span, +) error { + store, err := h.getStore() + if err != nil { + return err + } + return store.HandleDelegatedSnapshot(ctx, req, stream, span) +} + // testClusterPartitionedRange is a convenient abstraction to create a range on a node // in a multiTestContext which can be partitioned and 
unpartitioned. type testClusterPartitionedRange struct { diff --git a/pkg/kv/kvserver/client_raft_test.go b/pkg/kv/kvserver/client_raft_test.go index aa2e6799f917..fd65c709c0d9 100644 --- a/pkg/kv/kvserver/client_raft_test.go +++ b/pkg/kv/kvserver/client_raft_test.go @@ -55,6 +55,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "github.com/stretchr/testify/assert" @@ -3452,6 +3453,15 @@ func (errorChannelTestHandler) HandleSnapshot( panic("unimplemented") } +func (errorChannelTestHandler) HandleDelegatedSnapshot( + ctx context.Context, + req *kvserverpb.DelegateSnapshotRequest, + stream kvserver.DelegateSnapshotResponseStream, + span *tracing.Span, +) error { + panic("unimplemented") +} + // This test simulates a scenario where one replica has been removed from the // range's Raft group but it is unaware of the fact. We check that this replica // coming back from the dead cannot cause elections. 
diff --git a/pkg/kv/kvserver/kvserverpb/BUILD.bazel b/pkg/kv/kvserver/kvserverpb/BUILD.bazel index 51fb08fa05ef..f5e0249eca95 100644 --- a/pkg/kv/kvserver/kvserverpb/BUILD.bazel +++ b/pkg/kv/kvserver/kvserverpb/BUILD.bazel @@ -36,6 +36,7 @@ proto_library( "//pkg/roachpb:roachpb_proto", "//pkg/storage/enginepb:enginepb_proto", "//pkg/util/hlc:hlc_proto", + "//pkg/util/tracing/tracingpb:tracingpb_proto", "@com_github_gogo_protobuf//gogoproto:gogo_proto", "@com_google_protobuf//:timestamp_proto", "@io_etcd_go_etcd_raft_v3//raftpb:raftpb_proto", @@ -55,6 +56,7 @@ go_proto_library( "//pkg/roachpb", "//pkg/storage/enginepb", "//pkg/util/hlc", + "//pkg/util/tracing/tracingpb", "//pkg/util/uuid", # keep "@com_github_gogo_protobuf//gogoproto", "@io_etcd_go_etcd_raft_v3//raftpb", diff --git a/pkg/kv/kvserver/kvserverpb/raft.proto b/pkg/kv/kvserver/kvserverpb/raft.proto index 2fa84622e03f..8fe840f354c5 100644 --- a/pkg/kv/kvserver/kvserverpb/raft.proto +++ b/pkg/kv/kvserver/kvserverpb/raft.proto @@ -13,11 +13,13 @@ package cockroach.kv.kvserver.kvserverpb; option go_package = "kvserverpb"; import "roachpb/errors.proto"; +import "roachpb/internal_raft.proto"; import "roachpb/metadata.proto"; import "kv/kvserver/liveness/livenesspb/liveness.proto"; import "kv/kvserver/kvserverpb/state.proto"; import "etcd/raft/v3/raftpb/raft.proto"; import "gogoproto/gogo.proto"; +import "util/tracing/tracingpb/recorded_span.proto"; // RaftHeartbeat is a request that contains the barebones information for a // raftpb.MsgHeartbeat raftpb.Message. RaftHeartbeats are coalesced and sent @@ -214,6 +216,47 @@ message SnapshotResponse { reserved 3; } +// DelegateSnapshotRequest is the request used to delegate send snapshot requests. 
+message DelegateSnapshotRequest { + message Header { + + uint64 range_id = 1 [(gogoproto.customname) = "RangeID", + (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.RangeID"]; + + // The replica that delegates the snapshot request, in most cases the leaseholder. + // The snapshot request should originate from the coordinator. + roachpb.ReplicaDescriptor coordinator_replica = 2 [(gogoproto.nullable) = false]; + + // The replica receiving the snapshot. + roachpb.ReplicaDescriptor recipient_replica = 3 [(gogoproto.nullable) = false]; + + // The priority of the snapshot. + SnapshotRequest.Priority priority = 4; + + // The type of the snapshot. + SnapshotRequest.Type type = 6; + + // The Raft term of the coordinator (in most cases the leaseholder) replica. + // The term is used during snapshot receiving to reject messages from an older term. + uint64 term = 8; + } + + Header header = 1; + + // The replica selected to act as the snapshot sender. + roachpb.ReplicaDescriptor delegated_sender = 2 [(gogoproto.nullable) = false]; + + // The truncated state of the Raft log on the coordinator replica. + roachpb.RaftTruncatedState truncated_state = 3; +} + +message DelegateSnapshotResponse { + SnapshotResponse snapResponse = 1; + // collected_spans stores trace spans recorded during the execution of this + // request. + repeated util.tracing.tracingpb.RecordedSpan collected_spans = 2 [(gogoproto.nullable) = false]; +} + // ConfChangeContext is encoded in the raftpb.ConfChange.Context field. 
message ConfChangeContext { string command_id = 1 [(gogoproto.customname) = "CommandID"]; diff --git a/pkg/kv/kvserver/raft_transport.go b/pkg/kv/kvserver/raft_transport.go index d0a5ab57a79a..887289837b16 100644 --- a/pkg/kv/kvserver/raft_transport.go +++ b/pkg/kv/kvserver/raft_transport.go @@ -105,6 +105,13 @@ type SnapshotResponseStream interface { Recv() (*kvserverpb.SnapshotRequest, error) } +// DelegateSnapshotResponseStream is the subset of the +// MultiRaft_DelegateRaftSnapshotServer interface that is needed for sending delegated responses. +type DelegateSnapshotResponseStream interface { + Send(request *kvserverpb.DelegateSnapshotResponse) error + Recv() (*kvserverpb.DelegateSnapshotRequest, error) +} + // RaftMessageHandler is the interface that must be implemented by // arguments to RaftTransport.Listen. type RaftMessageHandler interface { @@ -122,7 +129,20 @@ type RaftMessageHandler interface { // HandleSnapshot is called for each new incoming snapshot stream, after // parsing the initial SnapshotRequest_Header on the stream. - HandleSnapshot(ctx context.Context, header *kvserverpb.SnapshotRequest_Header, respStream SnapshotResponseStream) error + HandleSnapshot( + ctx context.Context, + header *kvserverpb.SnapshotRequest_Header, + respStream SnapshotResponseStream, + ) error + + // HandleDelegatedSnapshot is called for each incoming delegated snapshot + // request. + HandleDelegatedSnapshot( + ctx context.Context, + req *kvserverpb.DelegateSnapshotRequest, + stream DelegateSnapshotResponseStream, + span *tracing.Span, + ) error } type raftTransportStats struct { @@ -407,6 +427,72 @@ func (t *RaftTransport) RaftMessageBatch(stream MultiRaft_RaftMessageBatchServer } } +// DelegateRaftSnapshot handles incoming delegated snapshot requests and passes +// the request off to the sender store. Errors during the snapshot +// process are sent back as a response. 
+func (t *RaftTransport) DelegateRaftSnapshot(stream MultiRaft_DelegateRaftSnapshotServer) error { + errCh := make(chan error, 1) + taskCtx, cancel := t.stopper.WithCancelOnQuiesce(stream.Context()) + remoteParent, err := tracing.ExtractSpanMetaFromGRPCCtx(taskCtx, t.Tracer) + if err != nil { + log.Warningf(taskCtx, "error extracting tracing info from gRPC: %s", err) + } + taskCtx, span := t.Tracer.StartSpanCtx( + taskCtx, tracing.BatchMethodName, + tracing.WithRemoteParentFromSpanMeta(remoteParent), + tracing.WithServerSpanKind, + ) + defer cancel() + if err := t.stopper.RunAsyncTaskEx( + taskCtx, + stop.TaskOpts{ + TaskName: "storage.RaftTransport: processing snapshot delegation", + SpanOpt: stop.ChildSpan, + }, func(ctx context.Context) { + errCh <- func() error { + req, err := stream.Recv() + if err != nil { + return err + } + // Check to ensure the header is valid. + if req.Header == nil { + return stream.Send( + &kvserverpb.DelegateSnapshotResponse{ + SnapResponse: &kvserverpb.SnapshotResponse{ + Status: kvserverpb.SnapshotResponse_ERROR, + Message: "client error: no header in first delegated snapshot request message", + }, + }, + ) + } + // Get the handler of the sender store. + handler, ok := t.getHandler(req.DelegatedSender.StoreID) + if !ok { + log.Warningf( + ctx, + "unable to accept Raft message: %+v: no handler registered for"+ + " the sender store"+" %+v", + req.Header.CoordinatorReplica.StoreID, + req.DelegatedSender.StoreID, + ) + return roachpb.NewStoreNotFoundError(req.DelegatedSender.StoreID) + } + + // Pass off the snapshot request to the sender store. + return handler.HandleDelegatedSnapshot(ctx, req, stream, span) + }() + }, + ); err != nil { + return err + } + select { + case <-t.stopper.ShouldQuiesce(): + return nil + case err := <-errCh: + return err + } +} + // RaftSnapshot handles incoming streaming snapshot requests. 
func (t *RaftTransport) RaftSnapshot(stream MultiRaft_RaftSnapshotServer) error { errCh := make(chan error, 1) @@ -415,7 +501,7 @@ func (t *RaftTransport) RaftSnapshot(stream MultiRaft_RaftSnapshotServer) error if err := t.stopper.RunAsyncTaskEx( taskCtx, stop.TaskOpts{ - TaskName: "storage.RaftTransport: processing snapshot", + TaskName: "storage.RaftTransport: processing snapshot reception", SpanOpt: stop.ChildSpan, }, func(ctx context.Context) { errCh <- func() error { @@ -709,3 +795,28 @@ func (t *RaftTransport) SendSnapshot( }() return sendSnapshot(ctx, t.st, stream, storePool, header, snap, newBatch, sent, bytesSentCounter) } + +// DelegateSnapshot creates a rpc stream between the leaseholder and the +// new designated sender for delegated snapshot requests. +func (t *RaftTransport) DelegateSnapshot( + ctx context.Context, storePool *StorePool, req *kvserverpb.DelegateSnapshotRequest, +) error { + nodeID := req.DelegatedSender.NodeID + conn, err := t.dialer.Dial(ctx, nodeID, rpc.DefaultClass) + if err != nil { + return err + } + client := NewMultiRaftClient(conn) + + // Creates a rpc stream between the leaseholder and sender. 
+ stream, err := client.DelegateRaftSnapshot(ctx) + if err != nil { + return err + } + defer func() { + if err := stream.CloseSend(); err != nil { + log.Warningf(ctx, "failed to close snapshot stream: %+v", err) + } + }() + return delegateSnapshot(ctx, stream, req) +} diff --git a/pkg/kv/kvserver/raft_transport_test.go b/pkg/kv/kvserver/raft_transport_test.go index 6b6ca7c440a7..0cb3064baed6 100644 --- a/pkg/kv/kvserver/raft_transport_test.go +++ b/pkg/kv/kvserver/raft_transport_test.go @@ -96,6 +96,15 @@ func (s channelServer) HandleSnapshot( panic("unexpected HandleSnapshot") } +func (s channelServer) HandleDelegatedSnapshot( + ctx context.Context, + req *kvserverpb.DelegateSnapshotRequest, + stream kvserver.DelegateSnapshotResponseStream, + span *tracing.Span, +) error { + panic("unimplemented") +} + // raftTransportTestContext contains objects needed to test RaftTransport. // Typical usage will add multiple nodes with AddNode, attach channels // to at least one store with ListenStore, and send messages with Send. diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index 5fdeb2c41816..466e8ff06b76 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/util" @@ -2387,6 +2388,15 @@ func recordRangeEventsInLog( return nil } +// getSenderReplica returns a replica descriptor for a follower replica to act as +// the sender for snapshots. +// TODO(amy): select a follower based on locality matching. 
+func (r *Replica) getSenderReplica(ctx context.Context) (roachpb.ReplicaDescriptor, error) { + log.Fatal(ctx, "follower snapshots not implemented") + return r.GetReplicaDescriptor() +} + +// TODO(amy): update description when patch for follower snapshots are completed. // sendSnapshot sends a snapshot of the replica state to the specified replica. // Currently only invoked from replicateQueue and raftSnapshotQueue. Be careful // about adding additional calls as generating a snapshot is moderately @@ -2510,6 +2520,103 @@ func (r *Replica) sendSnapshot( r.reportSnapshotStatus(ctx, recipient.ReplicaID, retErr) }() + sender, err := r.GetReplicaDescriptor() + if err != nil { + return err + } + // Check follower snapshots cluster setting. + if followerSnapshotsEnabled.Get(&r.ClusterSettings().SV) { + sender, err = r.getSenderReplica(ctx) + if err != nil { + return err + } + } + + log.VEventf( + ctx, 2, "delegating snapshot transmission for %v to %v", recipient, sender) + desc, err := r.GetReplicaDescriptor() + if err != nil { + return err + } + status := r.RaftStatus() + if status == nil { + // This code path is sometimes hit during scatter for replicas that + // haven't woken up yet. + return &benignError{errors.Wrap(errMarkSnapshotError, "raft status not initialized")} + } + + // Create new delegate snapshot request header with only required metadata. 
+ header := &kvserverpb.DelegateSnapshotRequest_Header{ + RangeID: r.RangeID, + CoordinatorReplica: desc, + RecipientReplica: recipient, + Priority: priority, + Type: snapType, + Term: status.Term, + } + delegateRequest := &kvserverpb.DelegateSnapshotRequest{ + Header: header, DelegatedSender: sender, + } + err = contextutil.RunWithTimeout( + ctx, "delegate-snapshot", sendSnapshotTimeout, func(ctx context.Context) error { + return r.store.cfg.Transport.DelegateSnapshot( + ctx, + r.store.allocator.storePool, + delegateRequest, + ) + }, + ) + + if err != nil { + return errors.Mark(err, errMarkSnapshotError) + } + return nil +} + +// followerSnapshotsEnabled is used to enable or disable follower snapshots. +var followerSnapshotsEnabled = func() *settings.BoolSetting { + s := settings.RegisterBoolSetting( + settings.SystemOnly, + "kv.snapshot_delegation.enabled", + "set to true to allow snapshots from follower replicas", + false, + ) + s.SetVisibility(settings.Public) + return s +}() + +// followerSendSnapshot receives a delegate snapshot request and generates the +// snapshot from this replica. The entire process of generating and transmitting +// the snapshot is handled, and errors are propagated back to the leaseholder. +func (r *Replica) followerSendSnapshot( + ctx context.Context, + recipient roachpb.ReplicaDescriptor, + req *kvserverpb.DelegateSnapshotRequest, + stream DelegateSnapshotResponseStream, +) (retErr error) { + ctx = r.AnnotateCtx(ctx) + + // TODO(amy): when delegating to different senders, check raft applied state + // to determine if this follower replica is fit to send. + // Acknowledge that the request has been accepted. + if err := stream.Send( + &kvserverpb.DelegateSnapshotResponse{ + SnapResponse: &kvserverpb.SnapshotResponse{ + Status: kvserverpb.SnapshotResponse_ACCEPTED, + }, + }, + ); err != nil { + return err + } + + // Throttle snapshot sending. 
+ cleanup, err := r.store.reserveSendSnapshot(ctx, req, r.GetMVCCStats().Total()) + if err != nil { + return err + } + defer cleanup() + + snapType := req.Header.Type snap, err := r.GetSnapshot(ctx, snapType, recipient.StoreID) if err != nil { err = errors.Wrapf(err, "%s: failed to generate %s snapshot", r, snapType) @@ -2524,21 +2631,11 @@ func (r *Replica) sendSnapshot( // the leaseholder and we haven't yet applied the configuration change that's // adding the recipient to the range. if _, ok := snap.State.Desc.GetReplicaDescriptor(recipient.StoreID); !ok { - return errors.Wrapf(errMarkSnapshotError, + return errors.Wrapf( + errMarkSnapshotError, "attempting to send snapshot that does not contain the recipient as a replica; "+ - "snapshot type: %s, recipient: s%d, desc: %s", snapType, recipient, snap.State.Desc) - } - - sender, err := r.GetReplicaDescriptor() - if err != nil { - return errors.Wrapf(err, "%s: change replicas failed", r) - } - - status := r.RaftStatus() - if status == nil { - // This code path is sometimes hit during scatter for replicas that - // haven't woken up yet. - return &benignError{errors.Wrap(errMarkSnapshotError, "raft status not initialized")} + "snapshot type: %s, recipient: s%d, desc: %s", snapType, recipient, snap.State.Desc, + ) } // We avoid shipping over the past Raft log in the snapshot by changing @@ -2556,25 +2653,26 @@ func (r *Replica) sendSnapshot( // explicitly for snapshots going out to followers. snap.State.DeprecatedUsingAppliedStateKey = true - req := kvserverpb.SnapshotRequest_Header{ + // Create new snapshot request header using the delegate snapshot request. 
+ header := kvserverpb.SnapshotRequest_Header{ State: snap.State, DeprecatedUnreplicatedTruncatedState: true, RaftMessageRequest: kvserverpb.RaftMessageRequest{ - RangeID: r.RangeID, - FromReplica: sender, - ToReplica: recipient, + RangeID: req.Header.RangeID, + FromReplica: req.Header.CoordinatorReplica, + ToReplica: req.Header.RecipientReplica, Message: raftpb.Message{ Type: raftpb.MsgSnap, - To: uint64(recipient.ReplicaID), - From: uint64(sender.ReplicaID), - Term: status.Term, + From: uint64(req.Header.CoordinatorReplica.ReplicaID), + To: uint64(req.Header.RecipientReplica.ReplicaID), + Term: req.Header.Term, Snapshot: snap.RaftSnap, }, }, RangeSize: r.GetMVCCStats().Total(), - Priority: priority, + Priority: req.Header.Priority, Strategy: kvserverpb.SnapshotRequest_KV_BATCH, - Type: snapType, + Type: req.Header.Type, } newBatchFn := func() storage.Batch { return r.store.Engine().NewUnindexedBatch(true /* writeOnly */) @@ -2582,18 +2680,20 @@ func (r *Replica) sendSnapshot( sent := func() { r.store.metrics.RangeSnapshotsGenerated.Inc(1) } + err = contextutil.RunWithTimeout( ctx, "send-snapshot", sendSnapshotTimeout, func(ctx context.Context) error { return r.store.cfg.Transport.SendSnapshot( ctx, r.store.allocator.storePool, - req, + header, snap, newBatchFn, sent, r.store.metrics.RangeSnapshotSentBytes, ) - }) + }, + ) if err != nil { if errors.Is(err, errMalformedSnapshot) { tag := fmt.Sprintf("r%d_%s", r.RangeID, snap.SnapUUID.Short()) diff --git a/pkg/kv/kvserver/replica_learner_test.go b/pkg/kv/kvserver/replica_learner_test.go index 3edeb38dae50..a5a6104b9c2d 100644 --- a/pkg/kv/kvserver/replica_learner_test.go +++ b/pkg/kv/kvserver/replica_learner_test.go @@ -224,6 +224,103 @@ func TestAddRemoveNonVotingReplicasBasic(t *testing.T) { require.Len(t, desc.Replicas().NonVoterDescriptors(), 0) } +// TestAddReplicaWithReceiverThrottling tests that outgoing snapshots on the +// delegated sender will throttle if incoming snapshots on the recipients are +// 
blocked. +func TestAddReplicaWithReceiverThrottling(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + // blockIncomingSnapshots will block receiving snapshots. + blockIncomingSnapshots := make(chan struct{}) + waitForRebalanceToBlockCh := make(chan struct{}) + activateBlocking := int64(1) + var count int64 + knobs, ltk := makeReplicationTestKnobs() + ltk.storeKnobs.ReceiveSnapshot = func(h *kvserverpb.SnapshotRequest_Header) error { + if atomic.LoadInt64(&activateBlocking) > 0 { + // Signal waitForRebalanceToBlockCh to indicate the testing knob was hit. + close(waitForRebalanceToBlockCh) + blockIncomingSnapshots <- struct{}{} + } + return nil + } + ltk.storeKnobs.ThrottleEmptySnapshots = true + ltk.storeKnobs.CountSendSnapshotsThrottling = func(n int) { + atomic.AddInt64(&count, int64(n)) + } + ctx := context.Background() + tc := testcluster.StartTestCluster( + t, 3, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{Knobs: knobs}, + ReplicationMode: base.ReplicationManual, + }, + ) + + defer tc.Stopper().Stop(ctx) + scratch := tc.ScratchRange(t) + replicationChange := make(chan error, 2) + g := ctxgroup.WithContext(ctx) + + // Add a voter to the range and expect it to block on blockIncomingSnapshots. + g.GoCtx( + func(ctx context.Context) error { + desc, err := tc.LookupRange(scratch) + if err != nil { + return err + } + _, err = tc.Servers[0].DB().AdminChangeReplicas( + ctx, scratch, desc, roachpb.MakeReplicationChanges(roachpb.ADD_VOTER, tc.Target(2)), + ) + replicationChange <- err + return err + }, + ) + + select { + case <-waitForRebalanceToBlockCh: + // First replication change has hit the testing knob, continue with adding + // second voter. 
+ case <-replicationChange: + t.Fatal("did not expect the replication change to complete") + case <-time.After(15 * time.Second): + t.Fatal("timed out waiting for rebalance to block") + } + + g.GoCtx( + func(ctx context.Context) error { + desc, err := tc.LookupRange(scratch) + if err != nil { + return err + } + _, err = tc.Servers[0].DB().AdminChangeReplicas( + ctx, scratch, desc, roachpb.MakeReplicationChanges(roachpb.ADD_VOTER, tc.Target(1)), + ) + replicationChange <- err + return err + }, + ) + + require.Eventually( + t, func() bool { + // Check that there is 1 snapshot waiting on the snapshot send semaphore, + // as the other snapshot should currently be throttled in the semaphore. + return atomic.LoadInt64(&count) == int64(1) + }, testutils.DefaultSucceedsSoonDuration, 100*time.Millisecond, + ) + // Expect that the replication change is blocked on the channel, and the + // snapshot is still throttled on the send snapshot semaphore. + select { + case <-time.After(1 * time.Second): + require.Equalf(t, atomic.LoadInt64(&count), int64(1), "expected snapshot to still be blocked.") + case <-replicationChange: + t.Fatal("did not expect the replication change to complete") + } + + // Disable the testing knob for blocking recipient snapshots to finish test. 
+ atomic.StoreInt64(&activateBlocking, 0) + <-blockIncomingSnapshots +} func TestLearnerRaftConfState(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) diff --git a/pkg/kv/kvserver/storage_services.proto b/pkg/kv/kvserver/storage_services.proto index 61b762ebce25..b124c63f485f 100644 --- a/pkg/kv/kvserver/storage_services.proto +++ b/pkg/kv/kvserver/storage_services.proto @@ -19,6 +19,7 @@ import "gogoproto/gogo.proto"; service MultiRaft { rpc RaftMessageBatch (stream cockroach.kv.kvserver.kvserverpb.RaftMessageRequestBatch) returns (stream cockroach.kv.kvserver.kvserverpb.RaftMessageResponse) {} rpc RaftSnapshot (stream cockroach.kv.kvserver.kvserverpb.SnapshotRequest) returns (stream cockroach.kv.kvserver.kvserverpb.SnapshotResponse) {} + rpc DelegateRaftSnapshot(stream cockroach.kv.kvserver.kvserverpb.DelegateSnapshotRequest) returns (stream cockroach.kv.kvserver.kvserverpb.DelegateSnapshotResponse) {} } service PerReplica { diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 82793f033400..525f80256619 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -764,6 +764,10 @@ type Store struct { // Semaphore to limit concurrent non-empty snapshot application. snapshotApplySem chan struct{} + // Semaphore to limit concurrent sending of non-empty snapshots whose type is not VIA_SNAPSHOT_QUEUE. + initialSnapshotSendSem chan struct{} + // Semaphore to limit concurrent sending of non-empty VIA_SNAPSHOT_QUEUE snapshots. + raftSnapshotSendSem chan struct{} // Track newly-acquired expiration-based leases that we want to proactively // renew. An object is sent on the signal whenever a new entry is added to @@ -1037,6 +1041,10 @@ type StoreConfig struct { // to be applied concurrently. concurrentSnapshotApplyLimit int + // concurrentSnapshotSendLimit specifies the maximum number of each type of + // snapshot that are permitted to be sent concurrently. 
+ concurrentSnapshotSendLimit int + // HistogramWindowInterval is (server.Config).HistogramWindowInterval HistogramWindowInterval time.Duration @@ -1112,6 +1120,10 @@ func (sc *StoreConfig) SetDefaults() { sc.concurrentSnapshotApplyLimit = envutil.EnvOrDefaultInt("COCKROACH_CONCURRENT_SNAPSHOT_APPLY_LIMIT", 1) } + if sc.concurrentSnapshotSendLimit == 0 { + sc.concurrentSnapshotSendLimit = + envutil.EnvOrDefaultInt("COCKROACH_CONCURRENT_SNAPSHOT_SEND_LIMIT", 1) + } if sc.TestingKnobs.GossipWhenCapacityDeltaExceedsFraction == 0 { sc.TestingKnobs.GossipWhenCapacityDeltaExceedsFraction = defaultGossipWhenCapacityDeltaExceedsFraction @@ -1198,7 +1210,8 @@ func NewStore( s.txnWaitMetrics = txnwait.NewMetrics(cfg.HistogramWindowInterval) s.metrics.registry.AddMetricStruct(s.txnWaitMetrics) s.snapshotApplySem = make(chan struct{}, cfg.concurrentSnapshotApplyLimit) - + s.initialSnapshotSendSem = make(chan struct{}, cfg.concurrentSnapshotSendLimit) + s.raftSnapshotSendSem = make(chan struct{}, cfg.concurrentSnapshotSendLimit) if ch := s.cfg.TestingKnobs.LeaseRenewalSignalChan; ch != nil { s.renewableLeasesSignal = ch } else { diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go index 6ff0c8043cd8..2e3044008c8c 100644 --- a/pkg/kv/kvserver/store_raft.go +++ b/pkg/kv/kvserver/store_raft.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" "go.etcd.io/etcd/raft/v3/raftpb" ) @@ -63,6 +64,54 @@ func (q *raftRequestQueue) recycle(processed []raftRequestInfo) { } } +// HandleDelegatedSnapshot reads the incoming delegated snapshot message and +// throttles sending snapshots before passing the request to the sender replica. 
+func (s *Store) HandleDelegatedSnapshot( + ctx context.Context, + req *kvserverpb.DelegateSnapshotRequest, + stream DelegateSnapshotResponseStream, + span *tracing.Span, +) error { + ctx = s.AnnotateCtx(ctx) + const name = "storage.Store: handle snapshot delegation" + return s.stopper.RunTaskWithErr( + ctx, name, func(ctx context.Context) error { + sender, err := s.GetReplica(req.Header.RangeID) + if err != nil { + return err + } + // Pass the request to the sender replica. + err = sender.followerSendSnapshot(ctx, req.Header.RecipientReplica, req, stream) + + // Get the recording of the child RPC and serialize it in the response. + recording := span.FinishAndGetConfiguredRecording() + resp := &kvserverpb.DelegateSnapshotResponse{ + SnapResponse: &kvserverpb.SnapshotResponse{ + Status: kvserverpb.SnapshotResponse_APPLIED, + Message: "Snapshot successfully applied by recipient", + }, + } + if recording != nil { + resp.CollectedSpans = recording + } + // If an error occurred during snapshot sending, send an error response. + if err != nil { + return stream.Send( + &kvserverpb.DelegateSnapshotResponse{ + SnapResponse: &kvserverpb.SnapshotResponse{ + Status: kvserverpb.SnapshotResponse_ERROR, + Message: err.Error(), + }, + CollectedSpans: resp.CollectedSpans, + }, + ) + } + // Send a final response that snapshot sending is completed. + return stream.Send(resp) + }, + ) +} + // HandleSnapshot reads an incoming streaming snapshot and applies it if // possible. 
func (s *Store) HandleSnapshot( diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go index 1d7fba36525e..bb27c7964004 100644 --- a/pkg/kv/kvserver/store_snapshot.go +++ b/pkg/kv/kvserver/store_snapshot.go @@ -26,11 +26,13 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" + "github.com/cockroachdb/cockroach/pkg/util/buildutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "github.com/cockroachdb/redact" @@ -62,6 +64,13 @@ type outgoingSnapshotStream interface { Recv() (*kvserverpb.SnapshotResponse, error) } +// outgoingDelegatedStream is the minimal interface on a GRPC stream required +// to send a delegated snapshot request over the network. +type outgoingDelegatedStream interface { + Send(*kvserverpb.DelegateSnapshotRequest) error + Recv() (*kvserverpb.DelegateSnapshotResponse, error) +} + // snapshotStrategy is an approach to sending and receiving Range snapshots. // Each implementation corresponds to a SnapshotRequest_Strategy, and it is // expected that the implementation that matches the Strategy specified in the @@ -422,18 +431,48 @@ func (kvSS *kvBatchSnapshotStrategy) Close(ctx context.Context) { } } -// reserveSnapshot throttles incoming snapshots. The returned closure is used -// to cleanup the reservation and release its resources. +// reserveSnapshot throttles incoming snapshots. 
func (s *Store) reserveSnapshot( ctx context.Context, header *kvserverpb.SnapshotRequest_Header, ) (_cleanup func(), _err error) { - tBegin := timeutil.Now() + return s.throttleSnapshot( + ctx, s.snapshotApplySem, header.RangeSize, + header.RaftMessageRequest.RangeID, header.RaftMessageRequest.ToReplica.ReplicaID, + ) +} +// reserveSendSnapshot throttles outgoing snapshots. +func (s *Store) reserveSendSnapshot( + ctx context.Context, req *kvserverpb.DelegateSnapshotRequest, rangeSize int64, +) (_cleanup func(), _err error) { + sem := s.initialSnapshotSendSem + if req.Header.Type == kvserverpb.SnapshotRequest_VIA_SNAPSHOT_QUEUE { + sem = s.raftSnapshotSendSem + } + if fn := s.cfg.TestingKnobs.CountSendSnapshotsThrottling; fn != nil { + fn(1) + } + return s.throttleSnapshot(ctx, sem, rangeSize, + req.Header.RangeID, req.DelegatedSender.ReplicaID, + ) +} + +// throttleSnapshot is a helper function to throttle snapshot sending and +// receiving. The returned closure is used to cleanup the reservation and +// release its resources. +func (s *Store) throttleSnapshot( + ctx context.Context, + snapshotSem chan struct{}, + rangeSize int64, + rangeID roachpb.RangeID, + replicaID roachpb.ReplicaID, +) (_cleanup func(), _err error) { + tBegin := timeutil.Now() // Empty snapshots are exempt from rate limits because they're so cheap to // apply. This vastly speeds up rebalancing any empty ranges created by a // RESTORE or manual SPLIT AT, since it prevents these empty snapshots from // getting stuck behind large snapshots managed by the replicate queue. 
- if header.RangeSize != 0 { + if rangeSize != 0 || s.cfg.TestingKnobs.ThrottleEmptySnapshots { queueCtx := ctx if deadline, ok := queueCtx.Deadline(); ok { // Enforce a more strict timeout for acquiring the snapshot reservation to @@ -447,14 +486,20 @@ func (s *Store) reserveSnapshot( defer cancel() } select { - case s.snapshotApplySem <- struct{}{}: + case snapshotSem <- struct{}{}: + // Got a spot in the semaphore, continue with sending or receiving the snapshot. + if fn := s.cfg.TestingKnobs.CountSendSnapshotsThrottling; fn != nil { + fn(-1) + } case <-queueCtx.Done(): if err := ctx.Err(); err != nil { return nil, errors.Wrap(err, "acquiring snapshot reservation") } - return nil, errors.Wrapf(queueCtx.Err(), + return nil, errors.Wrapf( + queueCtx.Err(), "giving up during snapshot reservation due to %q", - snapshotReservationQueueTimeoutFraction.Key()) + snapshotReservationQueueTimeoutFraction.Key(), + ) case <-s.stopper.ShouldQuiesce(): return nil, errors.Errorf("stopped") } @@ -464,24 +509,27 @@ func (s *Store) reserveSnapshot( // Raft snapshot rate limiting of 32mb/s, we expect to spend less than 16s per snapshot. // which is what we want to log. const snapshotReservationWaitWarnThreshold = 32 * time.Second - if elapsed := timeutil.Since(tBegin); elapsed > snapshotReservationWaitWarnThreshold { - replDesc, _ := header.State.Desc.GetReplicaDescriptor(s.StoreID()) + elapsed := timeutil.Since(tBegin) + // NB: this log message is skipped in test builds as many tests do not mock + // all of the objects being logged. 
+ if elapsed > snapshotReservationWaitWarnThreshold && !buildutil.CrdbTestBuild { log.Infof( ctx, "waited for %.1fs to acquire snapshot reservation to r%d/%d", elapsed.Seconds(), - header.State.Desc.RangeID, - replDesc.ReplicaID, + rangeID, + replicaID, ) } s.metrics.ReservedReplicaCount.Inc(1) - s.metrics.Reserved.Inc(header.RangeSize) + s.metrics.Reserved.Inc(rangeSize) return func() { s.metrics.ReservedReplicaCount.Dec(1) - s.metrics.Reserved.Dec(header.RangeSize) - if header.RangeSize != 0 { - <-s.snapshotApplySem + s.metrics.Reserved.Dec(rangeSize) + + if rangeSize != 0 || s.cfg.TestingKnobs.ThrottleEmptySnapshots { + <-snapshotSem } }, nil } @@ -1226,6 +1274,89 @@ func sendSnapshot( return nil default: return errors.Errorf("%s: server sent an invalid status during finalization: %s", - to, resp.Status) + to, resp.Status, + ) + } +} + +// delegateSnapshot sends an outgoing delegated snapshot request via a +// pre-opened GRPC stream. It sends the delegated snapshot request to the +// sender and waits for confirmation that the snapshot has been applied. +func delegateSnapshot( + ctx context.Context, stream outgoingDelegatedStream, req *kvserverpb.DelegateSnapshotRequest, +) error { + + delegatedSender := req.DelegatedSender + if err := stream.Send(req); err != nil { + return err + } + // Wait for a response from the sender. + resp, err := stream.Recv() + if err != nil { + return err + } + switch resp.SnapResponse.Status { + case kvserverpb.SnapshotResponse_ERROR: + return errors.Errorf( + "%s: sender couldn't accept %s with error: %s", delegatedSender, + req, resp.SnapResponse.Message, + ) + case kvserverpb.SnapshotResponse_ACCEPTED: + // The sender accepted the request, it will continue with sending. 
+ log.VEventf( + ctx, 2, "sender %s accepted snapshot request %s", delegatedSender, + req, + ) + default: + err := errors.Errorf( + "%s: server sent an invalid status while negotiating %s: %s", + delegatedSender, req, resp.SnapResponse.Status, + ) + return err + } + + // Wait for response to see if the receiver successfully applied the snapshot. + resp, err = stream.Recv() + if err != nil { + return errors.Wrapf(err, "%s: remote failed to send snapshot", delegatedSender) + } + // Wait for EOF to ensure server side processing is complete. + if unexpectedResp, err := stream.Recv(); err != io.EOF { + if err != nil { + return errors.Wrapf( + err, "%s: expected EOF, got resp=%v with error", + delegatedSender.StoreID, unexpectedResp, + ) + } + return errors.Newf( + "%s: expected EOF, got resp=%v", delegatedSender.StoreID, + unexpectedResp, + ) + } + // Import the remotely collected spans, if any. + if len(resp.CollectedSpans) != 0 { + span := tracing.SpanFromContext(ctx) + if span == nil { + log.Warningf( + ctx, + "trying to ingest remote spans but there is no recording span set up", + ) + } else { + span.ImportRemoteSpans(resp.CollectedSpans) + } } + switch resp.SnapResponse.Status { + case kvserverpb.SnapshotResponse_ERROR: + return errors.Newf("%s", resp.SnapResponse.Message) + case kvserverpb.SnapshotResponse_APPLIED: + // This is the response we're expecting. Snapshot successfully applied. 
+ log.VEventf(ctx, 2, "%s: delegated snapshot was successfully applied", delegatedSender) + return nil + default: + return errors.Errorf( + "%s: server sent an invalid status during finalization: %s", + delegatedSender, resp.SnapResponse.Status, + ) + } + } diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go index b51df4bfbfac..4df4b12a188e 100644 --- a/pkg/kv/kvserver/testing_knobs.go +++ b/pkg/kv/kvserver/testing_knobs.go @@ -390,6 +390,11 @@ type StoreTestingKnobs struct { // IgnoreStrictGCEnforcement is used by tests to op out of strict GC // enforcement. IgnoreStrictGCEnforcement bool + // ThrottleEmptySnapshots includes empty snapshots for throttling. + ThrottleEmptySnapshots bool + // CountSendSnapshotsThrottling counts the number of snapshots currently + // waiting for the snapshot send semaphore. + CountSendSnapshotsThrottling func(int) } // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.