From c8e870b8c6a8493c9a2bad356d4d68e67aae8a21 Mon Sep 17 00:00:00 2001
From: Andrew Baptist
Date: Wed, 28 Sep 2022 09:12:23 -0400
Subject: [PATCH] kvserver: delegate snapshots to followers

Fixes: #42491

This commit allows a snapshot to be sent by a follower instead of the
leader of a range. The follower(s) are chosen based on their locality
relative to the final recipient of the snapshot. If the follower is not
able to quickly send the snapshot, the attempt is aborted and the leader
sends the snapshot instead. By choosing a delegate rather than sending
the snapshot directly, WAN traffic can be minimized. Additionally, the
snapshot will likely be delivered faster.

Two settings control this feature. The first,
`kv.snapshot_delegation.num_follower`, controls how many followers a
snapshot delegation is attempted through before falling back to sending
from the leaseholder. If set to 0, snapshot delegation is disabled. The
second, `kv.snapshot_delegation.num_requests`, controls how many requests
are allowed to queue on a delegate before further delegation requests are
rejected immediately. This prevents a delegation request from spending a
long time waiting before it is sent.

Before the snapshot is sent from the follower, checks are done to verify
that the delegate is able to send a snapshot that will be valid for the
recipient. If not, the request is rerouted to the leader.

Release note (performance improvement): Adds delegated snapshots, which
can reduce WAN traffic for snapshot movement.
---
 docs/generated/settings/settings.html        |   3 +-
 pkg/cmd/roachtest/tests/decommissionbench.go |  91 ++--
 .../allocator/storepool/store_pool.go        |  35 ++
 pkg/kv/kvserver/client_raft_helpers_test.go  |  69 ++-
 pkg/kv/kvserver/client_raft_test.go          |   6 +-
 pkg/kv/kvserver/helpers_test.go              |   2 +-
 pkg/kv/kvserver/kvserverpb/raft.go           |   7 +-
 pkg/kv/kvserver/kvserverpb/raft.proto        |  50 +-
 pkg/kv/kvserver/multiqueue/BUILD.bazel       |   4 +-
 pkg/kv/kvserver/multiqueue/multi_queue.go    |  44 +-
 .../kvserver/multiqueue/multi_queue_test.go  | 143 +++--
 pkg/kv/kvserver/raft_log_queue_test.go       |  15 +-
 pkg/kv/kvserver/raft_snapshot_queue.go       |   2 +-
 pkg/kv/kvserver/raft_transport.go            |  85 ++-
 pkg/kv/kvserver/raft_transport_test.go       |   8 +-
 pkg/kv/kvserver/replica_command.go           | 487 +++++++++++++-----
 pkg/kv/kvserver/replica_learner_test.go      | 210 +++++++-
 pkg/kv/kvserver/replica_raft.go              |  61 ++-
 pkg/kv/kvserver/replica_raftstorage.go       |  30 +-
 pkg/kv/kvserver/storage_services.proto       |  14 +-
 pkg/kv/kvserver/store_raft.go                |  42 +-
 pkg/kv/kvserver/store_snapshot.go            | 118 +----
 pkg/kv/kvserver/store_test.go                |  58 ++-
 pkg/kv/kvserver/testing_knobs.go             |   5 +-
 24 files changed, 1104 insertions(+), 485 deletions(-)

diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index 98ab0410d259..b602783c0b0a 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -60,7 +60,8 @@
 <tr><td><code>kv.replica_circuit_breaker.slow_replication_threshold</code></td><td>duration</td><td><code>1m0s</code></td><td>duration after which slow proposals trip the per-Replica circuit breaker (zero duration disables breakers)</td></tr>
 <tr><td><code>kv.replica_stats.addsst_request_size_factor</code></td><td>integer</td><td><code>50000</code></td><td>the divisor that is applied to addsstable request sizes, then recorded in a leaseholders QPS; 0 means all requests are treated as cost 1</td></tr>
 <tr><td><code>kv.replication_reports.interval</code></td><td>duration</td><td><code>1m0s</code></td><td>the frequency for generating the replication_constraint_stats, replication_stats_report and replication_critical_localities reports (set to 0 to disable)</td></tr>
-<tr><td><code>kv.snapshot_delegation.enabled</code></td><td>boolean</td><td><code>false</code></td><td>set to true to allow snapshots from follower replicas</td></tr>
+<tr><td><code>kv.snapshot_delegation.num_follower</code></td><td>integer</td><td><code>1</code></td><td>the number of delegates to try when sending snapshots, before falling back to sending from the leaseholder</td></tr>
+<tr><td><code>kv.snapshot_delegation.num_requests</code></td><td>integer</td><td><code>3</code></td><td>how many queued requests are allowed on a delegate before the request is rejected</td></tr>
 <tr><td><code>kv.snapshot_rebalance.max_rate</code></td><td>byte size</td><td><code>32 MiB</code></td><td>the rate limit (bytes/sec) to use for rebalance and upreplication snapshots</td></tr>
 <tr><td><code>kv.snapshot_recovery.max_rate</code></td><td>byte size</td><td><code>32 MiB</code></td><td>the rate limit (bytes/sec) to use for recovery snapshots</td></tr>
 <tr><td><code>kv.transaction.max_intents_bytes</code></td><td>
integer4194304maximum number of bytes used to track locks in transactions diff --git a/pkg/cmd/roachtest/tests/decommissionbench.go b/pkg/cmd/roachtest/tests/decommissionbench.go index 35c0c030146a..0c7ca78704af 100644 --- a/pkg/cmd/roachtest/tests/decommissionbench.go +++ b/pkg/cmd/roachtest/tests/decommissionbench.go @@ -61,13 +61,15 @@ const ( type decommissionBenchSpec struct { nodes int - cpus int warehouses int - load bool + noLoad bool multistore bool snapshotRate int duration time.Duration + // Whether the test cluster nodes are in multiple regions. + multiregion bool + // When true, the test will attempt to stop the node prior to decommission. whileDown bool @@ -85,6 +87,10 @@ type decommissionBenchSpec struct { // An override for the default timeout, if needed. timeout time.Duration + // An override for the decommission node to make it choose a predictable node + // instead of a random node. + decommissionNode int + skip string } @@ -95,29 +101,21 @@ func registerDecommissionBench(r registry.Registry) { // Basic benchmark configurations, to be run nightly. { nodes: 4, - cpus: 16, warehouses: 1000, - load: true, }, { nodes: 4, - cpus: 16, warehouses: 1000, - load: true, duration: 1 * time.Hour, }, { nodes: 4, - cpus: 16, warehouses: 1000, - load: true, whileDown: true, }, { nodes: 8, - cpus: 16, warehouses: 3000, - load: true, // This test can take nearly an hour to import and achieve balance, so // we extend the timeout to let it complete. timeout: 4 * time.Hour, @@ -126,9 +124,7 @@ func registerDecommissionBench(r registry.Registry) { { // Add a new node during decommission (no drain). nodes: 8, - cpus: 16, warehouses: 3000, - load: true, whileUpreplicating: true, // This test can take nearly an hour to import and achieve balance, so // we extend the timeout to let it complete. @@ -138,9 +134,7 @@ func registerDecommissionBench(r registry.Registry) { { // Drain before decommission, without adding a new node. nodes: 8, - cpus: 16, warehouses: 3000, - load: true, drainFirst: true, // This test can take nearly an hour to import and achieve balance, so // we extend the timeout to let it complete. @@ -150,9 +144,7 @@ func registerDecommissionBench(r registry.Registry) { { // Drain before decommission, and add a new node. nodes: 8, - cpus: 16, warehouses: 3000, - load: true, whileUpreplicating: true, drainFirst: true, // This test can take nearly an hour to import and achieve balance, so @@ -162,25 +154,19 @@ func registerDecommissionBench(r registry.Registry) { }, { nodes: 4, - cpus: 16, warehouses: 1000, - load: true, drainFirst: true, skip: manualBenchmarkingOnly, }, { nodes: 4, - cpus: 16, warehouses: 1000, - load: true, slowWrites: true, skip: manualBenchmarkingOnly, }, { nodes: 8, - cpus: 16, warehouses: 3000, - load: true, slowWrites: true, // This test can take nearly an hour to import and achieve balance, so // we extend the timeout to let it complete. @@ -189,9 +175,7 @@ func registerDecommissionBench(r registry.Registry) { }, { nodes: 12, - cpus: 16, warehouses: 3000, - load: true, multistore: true, // This test can take nearly an hour to import and achieve balance, so // we extend the timeout to let it complete. @@ -201,14 +185,30 @@ func registerDecommissionBench(r registry.Registry) { { // Test to compare 12 4-store nodes vs 48 single-store nodes nodes: 48, - cpus: 16, warehouses: 3000, - load: true, // This test can take nearly an hour to import and achieve balance, so // we extend the timeout to let it complete. 
timeout: 3 * time.Hour, skip: manualBenchmarkingOnly, }, + { + // Multiregion decommission, and add a new node in the same region. + nodes: 6, + warehouses: 1000, + whileUpreplicating: true, + drainFirst: true, + multiregion: true, + decommissionNode: 2, + }, + { + // Multiregion decommission, and add a new node in a different region. + nodes: 6, + warehouses: 1000, + whileUpreplicating: true, + drainFirst: true, + multiregion: true, + decommissionNode: 3, + }, } { registerDecommissionBenchSpec(r, benchSpec) } @@ -223,7 +223,7 @@ func registerDecommissionBenchSpec(r registry.Registry, benchSpec decommissionBe } extraNameParts := []string{""} addlNodeCount := 0 - specOptions := []spec.Option{spec.CPU(benchSpec.cpus)} + specOptions := []spec.Option{spec.CPU(16)} if benchSpec.snapshotRate != 0 { extraNameParts = append(extraNameParts, @@ -251,7 +251,7 @@ func registerDecommissionBenchSpec(r registry.Registry, benchSpec decommissionBe extraNameParts = append(extraNameParts, "while-upreplicating") } - if !benchSpec.load { + if benchSpec.noLoad { extraNameParts = append(extraNameParts, "no-load") } @@ -259,11 +259,23 @@ func registerDecommissionBenchSpec(r registry.Registry, benchSpec decommissionBe extraNameParts = append(extraNameParts, "hi-read-amp") } + if benchSpec.decommissionNode != 0 { + extraNameParts = append(extraNameParts, + fmt.Sprintf("target=%d", benchSpec.decommissionNode)) + } + if benchSpec.duration > 0 { timeout = benchSpec.duration * 3 extraNameParts = append(extraNameParts, fmt.Sprintf("duration=%s", benchSpec.duration)) } + if benchSpec.multiregion { + geoZones := []string{regionUsEast, regionUsWest, regionUsCentral} + specOptions = append(specOptions, spec.Zones(strings.Join(geoZones, ","))) + specOptions = append(specOptions, spec.Geo()) + extraNameParts = append(extraNameParts, "multi-region") + } + // If run with ROACHTEST_DECOMMISSION_NOSKIP=1, roachtest will enable all specs. noSkipFlag := os.Getenv(envDecommissionNoSkipFlag) if noSkipFlag != "" { @@ -273,8 +285,8 @@ func registerDecommissionBenchSpec(r registry.Registry, benchSpec decommissionBe extraName := strings.Join(extraNameParts, "/") r.Add(registry.TestSpec{ - Name: fmt.Sprintf("decommissionBench/nodes=%d/cpu=%d/warehouses=%d%s", - benchSpec.nodes, benchSpec.cpus, benchSpec.warehouses, extraName), + Name: fmt.Sprintf("decommissionBench/nodes=%d/warehouses=%d%s", + benchSpec.nodes, benchSpec.warehouses, extraName), Owner: registry.OwnerKV, Cluster: r.MakeClusterSpec( benchSpec.nodes+addlNodeCount+1, @@ -471,7 +483,7 @@ func uploadPerfArtifacts( // Get the workload perf artifacts and move them to the pinned node, so that // they can be used to display the workload operation rates during decommission. - if benchSpec.load { + if !benchSpec.noLoad { workloadStatsSrc := filepath.Join(t.PerfArtifactsDir(), "stats.json") localWorkloadStatsPath := filepath.Join(t.ArtifactsDir(), "workload_stats.json") workloadStatsDest := filepath.Join(t.PerfArtifactsDir(), "workload_stats.json") @@ -595,7 +607,7 @@ func runDecommissionBench( workloadCtx, workloadCancel := context.WithCancel(ctx) m := c.NewMonitor(workloadCtx, crdbNodes) - if benchSpec.load { + if !benchSpec.noLoad { m.Go( func(ctx context.Context) error { close(rampStarted) @@ -642,7 +654,7 @@ func runDecommissionBench( // If we are running a workload, wait until it has started and completed its // ramp time before initiating a decommission. 
- if benchSpec.load { + if !benchSpec.noLoad { <-rampStarted t.Status("Waiting for workload to ramp up...") select { @@ -666,7 +678,7 @@ func runDecommissionBench( m.ExpectDeath() defer m.ResetDeaths() - err := runSingleDecommission(ctx, h, pinnedNode, &targetNodeAtomic, benchSpec.snapshotRate, + err := runSingleDecommission(ctx, h, pinnedNode, benchSpec.decommissionNode, &targetNodeAtomic, benchSpec.snapshotRate, benchSpec.whileDown, benchSpec.drainFirst, false /* reuse */, benchSpec.whileUpreplicating, true /* estimateDuration */, benchSpec.slowWrites, tickByName, ) @@ -729,7 +741,7 @@ func runDecommissionBenchLong( workloadCtx, workloadCancel := context.WithCancel(ctx) m := c.NewMonitor(workloadCtx, crdbNodes) - if benchSpec.load { + if !benchSpec.noLoad { m.Go( func(ctx context.Context) error { close(rampStarted) @@ -773,7 +785,7 @@ func runDecommissionBenchLong( // If we are running a workload, wait until it has started and completed its // ramp time before initiating a decommission. - if benchSpec.load { + if !benchSpec.noLoad { <-rampStarted t.Status("Waiting for workload to ramp up...") select { @@ -786,7 +798,7 @@ func runDecommissionBenchLong( for tBegin := timeutil.Now(); timeutil.Since(tBegin) <= benchSpec.duration; { m.ExpectDeath() - err := runSingleDecommission(ctx, h, pinnedNode, &targetNodeAtomic, benchSpec.snapshotRate, + err := runSingleDecommission(ctx, h, pinnedNode, benchSpec.decommissionNode, &targetNodeAtomic, benchSpec.snapshotRate, benchSpec.whileDown, benchSpec.drainFirst, true /* reuse */, benchSpec.whileUpreplicating, true /* estimateDuration */, benchSpec.slowWrites, tickByName, ) @@ -827,12 +839,15 @@ func runSingleDecommission( ctx context.Context, h *decommTestHelper, pinnedNode int, + target int, targetLogicalNodeAtomic *uint32, snapshotRateMb int, stopFirst, drainFirst, reuse, noBalanceWait, estimateDuration, slowWrites bool, tickByName func(name string), ) error { - target := h.getRandNodeOtherThan(pinnedNode) + if target == 0 { + target = h.getRandNodeOtherThan(pinnedNode) + } targetLogicalNodeID, err := h.getLogicalNodeID(ctx, target) if err != nil { return err diff --git a/pkg/kv/kvserver/allocator/storepool/store_pool.go b/pkg/kv/kvserver/allocator/storepool/store_pool.go index 55beabc549ff..2f1e60b3413c 100644 --- a/pkg/kv/kvserver/allocator/storepool/store_pool.go +++ b/pkg/kv/kvserver/allocator/storepool/store_pool.go @@ -826,6 +826,23 @@ func (sp *StorePool) IsLive(storeID roachpb.StoreID) (bool, error) { return status == storeStatusAvailable, nil } +// IsStoreHealthy returns whether we believe this store can serve requests +// reliably. A healthy store can be used for follower snapshot transmission or +// follower reads. A healthy store does not imply that replicas can be moved to +// this store. +func (sp *StorePool) IsStoreHealthy(storeID roachpb.StoreID) bool { + status, err := sp.storeStatus(storeID, sp.NodeLivenessFn) + if err != nil { + return false + } + switch status { + case storeStatusAvailable, storeStatusDecommissioning, storeStatusDraining: + return true + default: + return false + } +} + func (sp *StorePool) storeStatus( storeID roachpb.StoreID, nl NodeLivenessFunc, ) (storeStatus, error) { @@ -1238,6 +1255,24 @@ func (sp *StorePool) GetLocalitiesByNode( return localities } +// GetLocalitiesPerReplica computes the localities for the provided replicas. +// It returns a map from the ReplicaDescriptor to the Locality of the Node. 
+func (sp *StorePool) GetLocalitiesPerReplica( + replicas ...roachpb.ReplicaDescriptor, +) map[roachpb.ReplicaID]roachpb.Locality { + sp.localitiesMu.RLock() + defer sp.localitiesMu.RUnlock() + localities := make(map[roachpb.ReplicaID]roachpb.Locality) + for _, replica := range replicas { + if locality, ok := sp.localitiesMu.nodeLocalities[replica.NodeID]; ok { + localities[replica.ReplicaID] = locality.locality + } else { + localities[replica.ReplicaID] = roachpb.Locality{} + } + } + return localities +} + // GetNodeLocalityString returns the locality information for the given node // in its string format. func (sp *StorePool) GetNodeLocalityString(nodeID roachpb.NodeID) string { diff --git a/pkg/kv/kvserver/client_raft_helpers_test.go b/pkg/kv/kvserver/client_raft_helpers_test.go index 84a1fff66b2d..f966f936e7df 100644 --- a/pkg/kv/kvserver/client_raft_helpers_test.go +++ b/pkg/kv/kvserver/client_raft_helpers_test.go @@ -31,8 +31,9 @@ type unreliableRaftHandlerFuncs struct { dropReq func(*kvserverpb.RaftMessageRequest) bool dropHB func(*kvserverpb.RaftHeartbeat) bool dropResp func(*kvserverpb.RaftMessageResponse) bool - // snapErr defaults to returning nil. - snapErr func(*kvserverpb.SnapshotRequest_Header) error + // snapErr and delegateErr default to returning nil. + snapErr func(*kvserverpb.SnapshotRequest_Header) error + delegateErr func(request *kvserverpb.DelegateSendSnapshotRequest) error } func noopRaftHandlerFuncs() unreliableRaftHandlerFuncs { @@ -134,6 +135,20 @@ func (h *unreliableRaftHandler) HandleSnapshot( return h.RaftMessageHandler.HandleSnapshot(ctx, header, respStream) } +func (h *unreliableRaftHandler) HandleDelegatedSnapshot( + ctx context.Context, req *kvserverpb.DelegateSendSnapshotRequest, +) *kvserverpb.DelegateSnapshotResponse { + if req.RangeID == h.rangeID && h.delegateErr != nil { + if err := h.delegateErr(req); err != nil { + return &kvserverpb.DelegateSnapshotResponse{ + Status: kvserverpb.DelegateSnapshotResponse_ERROR, + EncodedError: errors.EncodeError(context.Background(), err), + } + } + } + return h.RaftMessageHandler.HandleDelegatedSnapshot(ctx, req) +} + // testClusterStoreRaftMessageHandler exists to allows a store to be stopped and // restarted while maintaining a partition using an unreliableRaftHandler. 
type testClusterStoreRaftMessageHandler struct { @@ -181,15 +196,16 @@ func (h *testClusterStoreRaftMessageHandler) HandleSnapshot( } func (h *testClusterStoreRaftMessageHandler) HandleDelegatedSnapshot( - ctx context.Context, - req *kvserverpb.DelegateSnapshotRequest, - stream kvserver.DelegateSnapshotResponseStream, -) error { + ctx context.Context, req *kvserverpb.DelegateSendSnapshotRequest, +) *kvserverpb.DelegateSnapshotResponse { store, err := h.getStore() if err != nil { - return err + return &kvserverpb.DelegateSnapshotResponse{ + Status: kvserverpb.DelegateSnapshotResponse_ERROR, + EncodedError: errors.EncodeError(context.Background(), err), + } } - return store.HandleDelegatedSnapshot(ctx, req, stream) + return store.HandleDelegatedSnapshot(ctx, req) } // testClusterPartitionedRange is a convenient abstraction to create a range on a node @@ -198,7 +214,7 @@ type testClusterPartitionedRange struct { rangeID roachpb.RangeID mu struct { syncutil.RWMutex - partitionedNode int + partitionedNodeIdx int partitioned bool partitionedReplicas map[roachpb.ReplicaID]bool } @@ -232,7 +248,7 @@ func setupPartitionedRange( tc *testcluster.TestCluster, rangeID roachpb.RangeID, replicaID roachpb.ReplicaID, - partitionedNode int, + partitionedNodeIdx int, activated bool, funcs unreliableRaftHandlerFuncs, ) (*testClusterPartitionedRange, error) { @@ -243,14 +259,14 @@ func setupPartitionedRange( storeIdx: i, }) } - return setupPartitionedRangeWithHandlers(tc, rangeID, replicaID, partitionedNode, activated, handlers, funcs) + return setupPartitionedRangeWithHandlers(tc, rangeID, replicaID, partitionedNodeIdx, activated, handlers, funcs) } func setupPartitionedRangeWithHandlers( tc *testcluster.TestCluster, rangeID roachpb.RangeID, replicaID roachpb.ReplicaID, - partitionedNode int, + partitionedNodeIdx int, activated bool, handlers []kvserver.RaftMessageHandler, funcs unreliableRaftHandlerFuncs, @@ -260,9 +276,9 @@ func setupPartitionedRangeWithHandlers( handlers: make([]kvserver.RaftMessageHandler, 0, len(handlers)), } pr.mu.partitioned = activated - pr.mu.partitionedNode = partitionedNode + pr.mu.partitionedNodeIdx = partitionedNodeIdx if replicaID == 0 { - ts := tc.Servers[partitionedNode] + ts := tc.Servers[partitionedNodeIdx] store, err := ts.Stores().GetStore(ts.GetFirstStoreID()) if err != nil { return nil, err @@ -294,8 +310,8 @@ func setupPartitionedRangeWithHandlers( pr.mu.RLock() defer pr.mu.RUnlock() return pr.mu.partitioned && - (s == pr.mu.partitionedNode || - req.FromReplica.StoreID == roachpb.StoreID(pr.mu.partitionedNode)+1) + (s == pr.mu.partitionedNodeIdx || + req.FromReplica.StoreID == roachpb.StoreID(pr.mu.partitionedNodeIdx)+1) } } if h.dropHB == nil { @@ -305,12 +321,21 @@ func setupPartitionedRangeWithHandlers( if !pr.mu.partitioned { return false } - if s == partitionedNode { + if s == partitionedNodeIdx { return true } return pr.mu.partitionedReplicas[hb.FromReplicaID] } } + if h.dropResp == nil { + h.dropResp = func(resp *kvserverpb.RaftMessageResponse) bool { + pr.mu.RLock() + defer pr.mu.RUnlock() + return pr.mu.partitioned && + (s == pr.mu.partitionedNodeIdx || + resp.FromReplica.StoreID == roachpb.StoreID(pr.mu.partitionedNodeIdx)+1) + } + } if h.snapErr == nil { h.snapErr = func(header *kvserverpb.SnapshotRequest_Header) error { pr.mu.RLock() @@ -324,6 +349,16 @@ func setupPartitionedRangeWithHandlers( return nil } } + if h.delegateErr == nil { + h.delegateErr = func(resp *kvserverpb.DelegateSendSnapshotRequest) error { + pr.mu.RLock() + defer pr.mu.RUnlock() + if 
pr.mu.partitionedReplicas[resp.DelegatedSender.ReplicaID] { + return errors.New("partitioned") + } + return nil + } + } pr.handlers = append(pr.handlers, h) tc.Servers[s].RaftTransport().Listen(tc.Target(s).StoreID, h) } diff --git a/pkg/kv/kvserver/client_raft_test.go b/pkg/kv/kvserver/client_raft_test.go index 2a92de9b116f..35bbf48f3c61 100644 --- a/pkg/kv/kvserver/client_raft_test.go +++ b/pkg/kv/kvserver/client_raft_test.go @@ -3573,10 +3573,8 @@ func (errorChannelTestHandler) HandleSnapshot( } func (errorChannelTestHandler) HandleDelegatedSnapshot( - ctx context.Context, - req *kvserverpb.DelegateSnapshotRequest, - stream kvserver.DelegateSnapshotResponseStream, -) error { + _ context.Context, _ *kvserverpb.DelegateSendSnapshotRequest, +) *kvserverpb.DelegateSnapshotResponse { panic("unimplemented") } diff --git a/pkg/kv/kvserver/helpers_test.go b/pkg/kv/kvserver/helpers_test.go index 00564d86034a..243c8daf9d70 100644 --- a/pkg/kv/kvserver/helpers_test.go +++ b/pkg/kv/kvserver/helpers_test.go @@ -206,7 +206,7 @@ func (s *Store) ManualRaftSnapshot(repl *Replica, target roachpb.ReplicaID) erro // ReservationCount counts the number of outstanding reservations that are not // running. func (s *Store) ReservationCount() int { - return int(s.cfg.SnapshotApplyLimit) - s.snapshotApplyQueue.Len() + return int(s.cfg.SnapshotApplyLimit) - s.snapshotApplyQueue.AvailableLen() } // RaftSchedulerPriorityID returns the Raft scheduler's prioritized range. diff --git a/pkg/kv/kvserver/kvserverpb/raft.go b/pkg/kv/kvserver/kvserverpb/raft.go index 9612ce2e9d45..34785385fc36 100644 --- a/pkg/kv/kvserver/kvserverpb/raft.go +++ b/pkg/kv/kvserver/kvserverpb/raft.go @@ -23,8 +23,11 @@ func (SnapshotRequest_Type) SafeValue() {} // // The bool indicates whether this message uses the deprecated behavior of // encoding an error as a string. -func (m *DelegateSnapshotResponse) Error() (deprecated bool, _ error) { - return m.SnapResponse.Error() +func (m *DelegateSnapshotResponse) Error() error { + if m.Status != DelegateSnapshotResponse_ERROR { + return nil + } + return errors.DecodeError(context.Background(), m.EncodedError) } // Error returns the error contained in the snapshot response, if any. diff --git a/pkg/kv/kvserver/kvserverpb/raft.proto b/pkg/kv/kvserver/kvserverpb/raft.proto index 4ec5d0288aed..8f67047253c4 100644 --- a/pkg/kv/kvserver/kvserverpb/raft.proto +++ b/pkg/kv/kvserver/kvserverpb/raft.proto @@ -14,7 +14,6 @@ option go_package = "kvserverpb"; import "errorspb/errors.proto"; import "roachpb/errors.proto"; -import "roachpb/internal_raft.proto"; import "roachpb/metadata.proto"; import "kv/kvserver/liveness/livenesspb/liveness.proto"; import "kv/kvserver/kvserverpb/state.proto"; @@ -221,7 +220,7 @@ message SnapshotRequest { Header header = 1; - // A RocksDB BatchRepr. Multiple kv_batches may be sent across multiple request messages. + // A BatchRepr. Multiple kv_batches may be sent across multiple request messages. bytes kv_batch = 2 [(gogoproto.customname) = "KVBatch"]; bool final = 4; @@ -256,8 +255,15 @@ message SnapshotResponse { errorspb.EncodedError encoded_error = 5 [(gogoproto.nullable) = false]; } -// DelegateSnapshotRequest is the request used to delegate send snapshot requests. +// TODO(baptist): Extend this if necessary to separate out the request for the throttle. message DelegateSnapshotRequest { + oneof value { + DelegateSendSnapshotRequest send = 1; + } +} + +// DelegateSnapshotRequest is the request used to delegate send snapshot requests. 
+message DelegateSendSnapshotRequest { uint64 range_id = 1 [(gogoproto.customname) = "RangeID", (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.RangeID"]; @@ -272,30 +278,50 @@ message DelegateSnapshotRequest { roachpb.ReplicaDescriptor delegated_sender = 4 [(gogoproto.nullable) = false]; // The priority of the snapshot. + // TODO(abaptist): Remove this field for v23.1. SnapshotRequest.Priority priority = 5; + // The type of the snapshot. + // TODO(abaptist): Remove this field for v23.1. + SnapshotRequest.Type type = 6; + + // The Raft term of the coordinator (in most cases the leaseholder) replica. + // The term is used during snapshot receiving to reject messages from an older term. + uint64 term = 7; + + // The first index of the Raft log on the coordinator replica. + uint64 first_index = 8; + // The sending queue's name. SnapshotRequest.QueueName sender_queue_name = 9; // The sending queue's priority. double sender_queue_priority = 10; - // The type of the snapshot. - SnapshotRequest.Type type = 6; + // The generation of the leaseholders descriptor. + uint64 descriptor_generation = 11 [(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.RangeGeneration"]; - // The Raft term of the coordinator (in most cases the leaseholder) replica. - // The term is used during snapshot receiving to reject messages from an older term. - uint64 term = 7; + // Max queue length on the delegate before this request is rejected. + int64 queue_on_delegate_len = 12; - // The truncated state of the Raft log on the coordinator replica. - roachpb.RaftTruncatedState truncated_state = 8; + // Id of this snapshot which is maintained from coordinator to receiver. + bytes snap_id = 13 [ + (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID", + (gogoproto.nullable) = false]; } message DelegateSnapshotResponse { - SnapshotResponse snapResponse = 1; + enum Status { + ERROR = 0; + APPLIED = 1; + } + + Status status = 1; + errorspb.EncodedError encoded_error = 2 [(gogoproto.nullable) = false]; + // collected_spans stores trace spans recorded during the execution of this // request. - repeated util.tracing.tracingpb.RecordedSpan collected_spans = 2 [(gogoproto.nullable) = false]; + repeated util.tracing.tracingpb.RecordedSpan collected_spans = 3 [(gogoproto.nullable) = false]; } // ConfChangeContext is encoded in the raftpb.ConfChange.Context field. 
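For orientation, the short sketch below illustrates the message shapes introduced above: a DelegateSendSnapshotRequest is wrapped in the new DelegateSnapshotRequest oneof (mirroring the wrapping the transport code later in this patch performs), and a DelegateSnapshotResponse either reports APPLIED or carries an EncodedError. The package name and the classifyResponse helper are hypothetical illustrations, not code from this patch.

package snapshotdelegation // hypothetical illustration package, not part of this patch

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
	"github.com/cockroachdb/errors"
)

// wrapSend wraps a send request in the DelegateSnapshotRequest oneof, the
// same shape the RaftTransport puts on the delegate snapshot stream.
func wrapSend(req *kvserverpb.DelegateSendSnapshotRequest) *kvserverpb.DelegateSnapshotRequest {
	return &kvserverpb.DelegateSnapshotRequest{
		Value: &kvserverpb.DelegateSnapshotRequest_Send{Send: req},
	}
}

// classifyResponse converts a DelegateSnapshotResponse into a Go error:
// APPLIED means the recipient applied the snapshot; ERROR carries an
// EncodedError that is decoded back into a structured error.
func classifyResponse(ctx context.Context, resp *kvserverpb.DelegateSnapshotResponse) error {
	switch resp.Status {
	case kvserverpb.DelegateSnapshotResponse_APPLIED:
		return nil
	case kvserverpb.DelegateSnapshotResponse_ERROR:
		return errors.DecodeError(ctx, resp.EncodedError)
	default:
		return errors.Newf("unexpected delegate snapshot response status: %v", resp.Status)
	}
}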
diff --git a/pkg/kv/kvserver/multiqueue/BUILD.bazel b/pkg/kv/kvserver/multiqueue/BUILD.bazel index 5c7a1642b937..14dbae432c80 100644 --- a/pkg/kv/kvserver/multiqueue/BUILD.bazel +++ b/pkg/kv/kvserver/multiqueue/BUILD.bazel @@ -8,14 +8,16 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/util/syncutil", + "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_redact//:redact", ], ) go_test( name = "multiqueue_test", + size = "small", srcs = ["multi_queue_test.go"], - args = ["-test.timeout=295s"], + args = ["-test.timeout=55s"], embed = [":multiqueue"], deps = [ "//pkg/testutils", diff --git a/pkg/kv/kvserver/multiqueue/multi_queue.go b/pkg/kv/kvserver/multiqueue/multi_queue.go index 588c4556de46..8848612fc727 100644 --- a/pkg/kv/kvserver/multiqueue/multi_queue.go +++ b/pkg/kv/kvserver/multiqueue/multi_queue.go @@ -14,6 +14,7 @@ import ( "container/heap" "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/errors" "github.com/cockroachdb/redact" ) @@ -145,10 +146,20 @@ func (m *MultiQueue) tryRunNextLocked() { // Add returns a Task that must be closed (calling m.Release(..)) to // release the Permit. The number of types is expected to // be relatively small and not be changing over time. -func (m *MultiQueue) Add(queueType int, priority float64) *Task { +func (m *MultiQueue) Add(queueType int, priority float64, maxQueueLength int64) (*Task, error) { m.mu.Lock() defer m.mu.Unlock() + // If there are remainingRuns we can run immediately, otherwise compute the + // queue length one we are added. If the queue is too long, return an error + // immediately so the caller doesn't have to wait. + if m.remainingRuns == 0 && maxQueueLength >= 0 { + currentLen := int64(m.queueLenLocked()) + if currentLen > maxQueueLength { + return nil, errors.Newf("queue is too long %d > %d", currentLen, maxQueueLength) + } + } + // The mutex starts locked, unlock it when we are ready to run. pos, ok := m.mapping[queueType] if !ok { @@ -169,7 +180,7 @@ func (m *MultiQueue) Add(queueType int, priority float64) *Task { // Once we are done adding a task, attempt to signal the next waiting task. m.tryRunNextLocked() - return &newTask + return &newTask, nil } // Cancel will cancel a Task that may not have started yet. This is useful if it @@ -221,11 +232,32 @@ func (m *MultiQueue) releaseLocked(permit *Permit) { m.tryRunNextLocked() } -// Len returns the number of additional tasks that can be added without -// queueing. This will return 0 if there is anything queued. This method should -// only be used for testing. -func (m *MultiQueue) Len() int { +// AvailableLen returns the number of additional tasks that can be added without +// queueing. This will return 0 if there is anything queued. +func (m *MultiQueue) AvailableLen() int { m.mu.Lock() defer m.mu.Unlock() return m.remainingRuns } + +// QueueLen returns the length of the queue if one more task is added. If this +// returns 0 then a task can be added and run without queueing. +// NB: The value returned is not a guarantee that queueing will not occur when +// the request is submitted. multiple calls to QueueLen could race. +func (m *MultiQueue) QueueLen() int { + m.mu.Lock() + defer m.mu.Unlock() + return m.queueLenLocked() +} + +func (m *MultiQueue) queueLenLocked() int { + if m.remainingRuns > 0 { + return 0 + } + // Start counting from 1 since we will be the first in the queue if it gets added. 
+ count := 1 + for i := 0; i < len(m.outstanding); i++ { + count += len(m.outstanding[i]) + } + return count - m.remainingRuns +} diff --git a/pkg/kv/kvserver/multiqueue/multi_queue_test.go b/pkg/kv/kvserver/multiqueue/multi_queue_test.go index 8e693a1e210f..4c50f4ac2753 100644 --- a/pkg/kv/kvserver/multiqueue/multi_queue_test.go +++ b/pkg/kv/kvserver/multiqueue/multi_queue_test.go @@ -37,10 +37,10 @@ func TestMultiQueueAddTwiceSameQueue(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) queue := NewMultiQueue(1) - blocker := queue.Add(0, 0) + blocker, _ := queue.Add(0, 0, -1) - chan1 := queue.Add(7, 1.0) - chan2 := queue.Add(7, 2.0) + chan1, _ := queue.Add(7, 1.0, -1) + chan2, _ := queue.Add(7, 2.0, -1) permit := <-blocker.GetWaitChan() queue.Release(permit) @@ -56,13 +56,13 @@ func TestMultiQueueTwoQueues(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) queue := NewMultiQueue(1) - blocker := queue.Add(0, 0) + blocker, _ := queue.Add(0, 0, -1) - a1 := queue.Add(5, 4.0) - a2 := queue.Add(5, 5.0) + a1, _ := queue.Add(5, 4.0, -1) + a2, _ := queue.Add(5, 5.0, -1) - b1 := queue.Add(6, 1.0) - b2 := queue.Add(6, 2.0) + b1, _ := queue.Add(6, 1.0, -1) + b2, _ := queue.Add(6, 2.0, -1) permit := <-blocker.GetWaitChan() queue.Release(permit) @@ -79,15 +79,15 @@ func TestMultiQueueComplex(t *testing.T) { defer log.Scope(t).Close(t) queue := NewMultiQueue(1) - blocker := queue.Add(0, 0) + blocker, _ := queue.Add(0, 0, -1) - a2 := queue.Add(1, 4.0) - b1 := queue.Add(2, 1.1) - b2 := queue.Add(2, 2.1) - c2 := queue.Add(3, 1.2) - c3 := queue.Add(3, 2.2) - a3 := queue.Add(1, 5.0) - b3 := queue.Add(2, 6.1) + a2, _ := queue.Add(1, 4.0, -1) + b1, _ := queue.Add(2, 1.1, -1) + b2, _ := queue.Add(2, 2.1, -1) + c2, _ := queue.Add(3, 1.2, -1) + c3, _ := queue.Add(3, 2.2, -1) + a3, _ := queue.Add(1, 5.0, -1) + b3, _ := queue.Add(2, 6.1, -1) permit := <-blocker.GetWaitChan() queue.Release(permit) @@ -99,15 +99,15 @@ func TestMultiQueueRemove(t *testing.T) { defer log.Scope(t).Close(t) queue := NewMultiQueue(1) - blocker := queue.Add(0, 0) + blocker, _ := queue.Add(0, 0, -1) - a2 := queue.Add(1, 4.0) - b1 := queue.Add(2, 1.1) - b2 := queue.Add(2, 2.1) - c2 := queue.Add(3, 1.2) - c3 := queue.Add(3, 2.2) - a3 := queue.Add(1, 5.0) - b3 := queue.Add(2, 6.1) + a2, _ := queue.Add(1, 4.0, -1) + b1, _ := queue.Add(2, 1.1, -1) + b2, _ := queue.Add(2, 2.1, -1) + c2, _ := queue.Add(3, 1.2, -1) + c3, _ := queue.Add(3, 2.2, -1) + a3, _ := queue.Add(1, 5.0, -1) + b3, _ := queue.Add(2, 6.1, -1) fmt.Println("Beginning cancel") @@ -125,7 +125,7 @@ func TestMultiQueueCancelOne(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) queue := NewMultiQueue(1) - task := queue.Add(1, 1) + task, _ := queue.Add(1, 1, -1) queue.Cancel(task) } @@ -139,12 +139,12 @@ func TestMultiQueueCancelInProgress(t *testing.T) { const b = 2 const c = 3 - a3 := queue.Add(a, 5.0) - a2 := queue.Add(a, 4.0) - b1 := queue.Add(b, 1.1) - b2 := queue.Add(b, 2.1) - c3 := queue.Add(c, 2.2) - b3 := queue.Add(b, 6.1) + a3, _ := queue.Add(a, 5.0, -1) + a2, _ := queue.Add(a, 4.0, -1) + b1, _ := queue.Add(b, 1.1, -1) + b2, _ := queue.Add(b, 2.1, -1) + c3, _ := queue.Add(c, 2.2, -1) + b3, _ := queue.Add(b, 6.1, -1) queue.Cancel(b2) queue.Cancel(b1) @@ -237,7 +237,7 @@ func TestMultiQueueStress(t *testing.T) { for i := 0; i < numThreads; i++ { go func(name int) { for j := 0; j < numRequests; j++ { - curTask := queue.Add(name, float64(j)) + curTask, _ := queue.Add(name, float64(j), -1) if alsoCancel && j%99 
== 0 { queue.Cancel(curTask) } else { @@ -271,7 +271,7 @@ func TestMultiQueueReleaseTwice(t *testing.T) { queue := NewMultiQueue(1) - task := queue.Add(1, 1) + task, _ := queue.Add(1, 1, -1) p := <-task.GetWaitChan() queue.Release(p) require.Panics(t, func() { queue.Release(p) }) @@ -283,7 +283,7 @@ func TestMultiQueueReleaseAfterCancel(t *testing.T) { queue := NewMultiQueue(1) - task := queue.Add(1, 1) + task, _ := queue.Add(1, 1, -1) p := <-task.GetWaitChan() queue.Cancel(task) queue.Release(p) @@ -294,31 +294,84 @@ func TestMultiQueueLen(t *testing.T) { defer log.Scope(t).Close(t) queue := NewMultiQueue(2) - require.Equal(t, 2, queue.Len()) - - task1 := queue.Add(1, 1) - require.Equal(t, 1, queue.Len()) - task2 := queue.Add(1, 1) - require.Equal(t, 0, queue.Len()) - task3 := queue.Add(1, 1) - require.Equal(t, 0, queue.Len()) + require.Equal(t, 2, queue.AvailableLen()) + require.Equal(t, 0, queue.QueueLen()) + + task1, _ := queue.Add(1, 1, -1) + require.Equal(t, 1, queue.AvailableLen()) + require.Equal(t, 0, queue.QueueLen()) + task2, _ := queue.Add(1, 1, -1) + require.Equal(t, 0, queue.AvailableLen()) + require.Equal(t, 1, queue.QueueLen()) + task3, _ := queue.Add(1, 1, -1) + require.Equal(t, 0, queue.AvailableLen()) + require.Equal(t, 2, queue.QueueLen()) queue.Cancel(task1) // Finish task 1, but immediately start task3. - require.Equal(t, 0, queue.Len()) + require.Equal(t, 0, queue.AvailableLen()) + require.Equal(t, 1, queue.QueueLen()) p := <-task2.GetWaitChan() queue.Release(p) - require.Equal(t, 1, queue.Len()) + require.Equal(t, 1, queue.AvailableLen()) + require.Equal(t, 0, queue.QueueLen()) queue.Cancel(task3) - require.Equal(t, 2, queue.Len()) + require.Equal(t, 2, queue.AvailableLen()) + require.Equal(t, 0, queue.QueueLen()) +} + +func TestMultiQueueFull(t *testing.T) { + queue := NewMultiQueue(2) + require.Equal(t, 2, queue.AvailableLen()) + require.Equal(t, 0, queue.QueueLen()) + + // Task 1 starts immediately since there is no queue. + task1, err := queue.Add(1, 1, 0) + require.NoError(t, err) + require.Equal(t, 1, queue.AvailableLen()) + require.Equal(t, 0, queue.QueueLen()) + // Task 2 also starts immediately as the queue supports 2 concurrent. + task2, err := queue.Add(1, 1, 0) + require.NoError(t, err) + require.Equal(t, 0, queue.AvailableLen()) + require.Equal(t, 1, queue.QueueLen()) + // Task 3 would be queued so should not be added. + task3, err := queue.Add(1, 1, 0) + require.Error(t, err) + require.Nil(t, task3) + require.Equal(t, 0, queue.AvailableLen()) + require.Equal(t, 1, queue.QueueLen()) + // Task 4 uses a longer max queue length so should be added. + task4, err := queue.Add(1, 1, 1) + require.NoError(t, err) + require.Equal(t, 0, queue.AvailableLen()) + require.Equal(t, 2, queue.QueueLen()) + + queue.Cancel(task1) + queue.Cancel(task2) + require.Equal(t, 1, queue.AvailableLen()) + require.Equal(t, 0, queue.QueueLen()) + // After these tasks are done, make sure we can add another one. + task5, err := queue.Add(1, 1, 0) + require.NoError(t, err) + require.Equal(t, 0, queue.AvailableLen()) + require.Equal(t, 1, queue.QueueLen()) + + // Cancel all the remaining tasks. + queue.Cancel(task4) + queue.Cancel(task5) } // verifyOrder makes sure that the chans are called in the specified order. 
func verifyOrder(t *testing.T, queue *MultiQueue, tasks ...*Task) { // each time, verify that the only available channel is the "next" one in order for i, task := range tasks { + if task == nil { + require.Fail(t, "Task is nil", "%d", task) + } + var found *Permit for j, t2 := range tasks[i+1:] { select { diff --git a/pkg/kv/kvserver/raft_log_queue_test.go b/pkg/kv/kvserver/raft_log_queue_test.go index c4d9d0beb968..db127557bab7 100644 --- a/pkg/kv/kvserver/raft_log_queue_test.go +++ b/pkg/kv/kvserver/raft_log_queue_test.go @@ -645,17 +645,19 @@ func TestSnapshotLogTruncationConstraints(t *testing.T) { index2 = 60 ) + r.mu.state.RaftAppliedIndex = index1 // Add first constraint. - r.addSnapshotLogTruncationConstraintLocked(ctx, id1, index1, storeID) + _, cleanup1 := r.addSnapshotLogTruncationConstraint(ctx, id1, storeID) exp1 := map[uuid.UUID]snapTruncationInfo{id1: {index: index1}} // Make sure it registered. assert.Equal(t, r.mu.snapshotLogTruncationConstraints, exp1) + r.mu.state.RaftAppliedIndex = index2 // Add another constraint with the same id. Extremely unlikely in practice // but we want to make sure it doesn't blow anything up. Collisions are // handled by ignoring the colliding update. - r.addSnapshotLogTruncationConstraintLocked(ctx, id1, index2, storeID) + _, cleanup2 := r.addSnapshotLogTruncationConstraint(ctx, id1, storeID) assert.Equal(t, r.mu.snapshotLogTruncationConstraints, exp1) // Helper that grabs the min constraint index (which can trigger GC as a @@ -672,19 +674,22 @@ func TestSnapshotLogTruncationConstraints(t *testing.T) { // colliding update at index2 is not represented. assertMin(index1, time.Time{}) + r.mu.state.RaftAppliedIndex = index2 // Add another, higher, index. We're not going to notice it's around // until the lower one disappears. - r.addSnapshotLogTruncationConstraintLocked(ctx, id2, index2, storeID) + _, cleanup3 := r.addSnapshotLogTruncationConstraint(ctx, id2, storeID) now := timeutil.Now() // The colliding snapshot comes back. Or the original, we can't tell. - r.completeSnapshotLogTruncationConstraint(id1) + cleanup1() + // This won't do anything since we had a collision, but make sure it's ok. + cleanup2() // The index should show up when its deadline isn't hit. 
assertMin(index2, now) assertMin(index2, now.Add(1)) assertMin(index2, time.Time{}) - r.completeSnapshotLogTruncationConstraint(id2) + cleanup3() assertMin(0, now) assertMin(0, now.Add(2)) diff --git a/pkg/kv/kvserver/raft_snapshot_queue.go b/pkg/kv/kvserver/raft_snapshot_queue.go index 0ba36ab6c487..f8dcdeac6549 100644 --- a/pkg/kv/kvserver/raft_snapshot_queue.go +++ b/pkg/kv/kvserver/raft_snapshot_queue.go @@ -142,7 +142,7 @@ func (rq *raftSnapshotQueue) processRaftSnapshot( } } - err := repl.sendSnapshot(ctx, repDesc, snapType, kvserverpb.SnapshotRequest_RECOVERY, kvserverpb.SnapshotRequest_RAFT_SNAPSHOT_QUEUE, raftSnapshotPriority) + err := repl.sendSnapshotUsingDelegate(ctx, repDesc, snapType, kvserverpb.SnapshotRequest_RECOVERY, kvserverpb.SnapshotRequest_RAFT_SNAPSHOT_QUEUE, raftSnapshotPriority) // NB: if the snapshot fails because of an overlapping replica on the // recipient which is also waiting for a snapshot, the "smart" thing is to diff --git a/pkg/kv/kvserver/raft_transport.go b/pkg/kv/kvserver/raft_transport.go index 283643e551c1..4d299d90b8d0 100644 --- a/pkg/kv/kvserver/raft_transport.go +++ b/pkg/kv/kvserver/raft_transport.go @@ -98,13 +98,6 @@ type SnapshotResponseStream interface { Recv() (*kvserverpb.SnapshotRequest, error) } -// DelegateSnapshotResponseStream is the subset of the -// MultiRaft_RaftSnapshotServer interface that is needed for sending delegated responses. -type DelegateSnapshotResponseStream interface { - Send(request *kvserverpb.DelegateSnapshotResponse) error - Recv() (*kvserverpb.DelegateSnapshotRequest, error) -} - // RaftMessageHandler is the interface that must be implemented by // arguments to RaftTransport.Listen. type RaftMessageHandler interface { @@ -132,9 +125,8 @@ type RaftMessageHandler interface { // request. HandleDelegatedSnapshot( ctx context.Context, - req *kvserverpb.DelegateSnapshotRequest, - stream DelegateSnapshotResponseStream, - ) error + req *kvserverpb.DelegateSendSnapshotRequest, + ) *kvserverpb.DelegateSnapshotResponse } // RaftTransport handles the rpc messages for raft. @@ -327,14 +319,24 @@ func (t *RaftTransport) DelegateRaftSnapshot(stream MultiRaft_DelegateRaftSnapsh if err != nil { return err } - // Check to ensure the header is valid. + resp := t.InternalDelegateRaftSnapshot(ctx, req.GetSend()) + err = stream.Send(resp) + if err != nil { + return err + } + return nil +} + +// InternalDelegateRaftSnapshot processes requests in a request/response fashion for normal DelegateSnapshotRequests +func (t *RaftTransport) InternalDelegateRaftSnapshot( + ctx context.Context, req *kvserverpb.DelegateSendSnapshotRequest, +) *kvserverpb.DelegateSnapshotResponse { if req == nil { err := errors.New("client error: no message in first delegated snapshot request") - return stream.Send( - &kvserverpb.DelegateSnapshotResponse{ - SnapResponse: snapRespErr(err), - }, - ) + return &kvserverpb.DelegateSnapshotResponse{ + Status: kvserverpb.DelegateSnapshotResponse_ERROR, + EncodedError: errors.EncodeError(context.Background(), err), + } } // Get the handler of the sender store. 
handler, ok := t.getHandler(req.DelegatedSender.StoreID) @@ -346,11 +348,15 @@ func (t *RaftTransport) DelegateRaftSnapshot(stream MultiRaft_DelegateRaftSnapsh req.CoordinatorReplica.StoreID, req.DelegatedSender.StoreID, ) - return roachpb.NewStoreNotFoundError(req.DelegatedSender.StoreID) + err := errors.New("unable to accept Raft message: no handler registered for the sender store") + return &kvserverpb.DelegateSnapshotResponse{ + Status: kvserverpb.DelegateSnapshotResponse_ERROR, + EncodedError: errors.EncodeError(context.Background(), err), + } } // Pass off the snapshot request to the sender store. - return handler.HandleDelegatedSnapshot(ctx, req, stream) + return handler.HandleDelegatedSnapshot(ctx, req) } // RaftSnapshot handles incoming streaming snapshot requests. @@ -640,10 +646,10 @@ func (t *RaftTransport) SendSnapshot( return sendSnapshot(ctx, t.st, t.tracer, stream, storePool, header, snap, newBatch, sent, recordBytesSent) } -// DelegateSnapshot creates a rpc stream between the leaseholder and the -// new designated sender for delegated snapshot requests. +// DelegateSnapshot sends a DelegateSnapshotRequest to a remote store +// and determines if it encountered any errors when sending the snapshot. func (t *RaftTransport) DelegateSnapshot( - ctx context.Context, req *kvserverpb.DelegateSnapshotRequest, + ctx context.Context, req *kvserverpb.DelegateSendSnapshotRequest, ) error { nodeID := req.DelegatedSender.NodeID conn, err := t.dialer.Dial(ctx, nodeID, rpc.DefaultClass) @@ -659,8 +665,41 @@ func (t *RaftTransport) DelegateSnapshot( } defer func() { if err := stream.CloseSend(); err != nil { - log.Warningf(ctx, "failed to close snapshot stream: %+v", err) + log.Warningf(ctx, "failed to close delegate snapshot stream: %+v", err) } }() - return delegateSnapshot(ctx, stream, req) + + // Send the request. + wrappedRequest := &kvserverpb.DelegateSnapshotRequest{Value: &kvserverpb.DelegateSnapshotRequest_Send{Send: req}} + if err := stream.Send(wrappedRequest); err != nil { + return errors.Mark(err, errMarkSnapshotError) + } + // Wait for response to see if the receiver successfully applied the snapshot. + resp, err := stream.Recv() + if err != nil { + return errors.Mark( + errors.Wrapf(err, "%v: remote failed to send snapshot", req), errMarkSnapshotError, + ) + } + + if len(resp.CollectedSpans) != 0 { + span := tracing.SpanFromContext(ctx) + if span == nil { + log.Warningf(ctx, "trying to ingest remote spans but there is no recording span set up") + } else { + span.ImportRemoteRecording(resp.CollectedSpans) + } + } + + switch resp.Status { + case kvserverpb.DelegateSnapshotResponse_ERROR: + return errors.Mark( + errors.Wrapf(resp.Error(), "error sending couldn't accept %v", req), errMarkSnapshotError) + case kvserverpb.DelegateSnapshotResponse_APPLIED: + // This is the response we're expecting. Snapshot successfully applied. 
+ log.VEventf(ctx, 2, "%s: delegated snapshot was successfully applied", resp) + return nil + default: + return err + } } diff --git a/pkg/kv/kvserver/raft_transport_test.go b/pkg/kv/kvserver/raft_transport_test.go index 380c741bdfde..4bc40eb8a919 100644 --- a/pkg/kv/kvserver/raft_transport_test.go +++ b/pkg/kv/kvserver/raft_transport_test.go @@ -97,11 +97,9 @@ func (s channelServer) HandleSnapshot( } func (s channelServer) HandleDelegatedSnapshot( - ctx context.Context, - req *kvserverpb.DelegateSnapshotRequest, - stream kvserver.DelegateSnapshotResponseStream, -) error { - panic("unimplemented") + ctx context.Context, req *kvserverpb.DelegateSendSnapshotRequest, +) *kvserverpb.DelegateSnapshotResponse { + panic("unexpected HandleDelegatedSnapshot") } // raftTransportTestContext contains objects needed to test RaftTransport. diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index 7ef521c1993f..98ba14c2e55f 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -1693,6 +1693,7 @@ func (r *Replica) initializeRaftLearners( default: log.Fatalf(ctx, "unexpected replicaType %s", replicaType) } + // Lock learner snapshots even before we run the ConfChange txn to add them // to prevent a race with the raft snapshot queue trying to send it first. // @@ -1824,7 +1825,7 @@ func (r *Replica) initializeRaftLearners( // orphaned learner. Second, this tickled some bugs in etcd/raft around // switching between StateSnapshot and StateProbe. Even if we worked through // these, it would be susceptible to future similar issues. - if err := r.sendSnapshot( + if err := r.sendSnapshotUsingDelegate( ctx, rDesc, kvserverpb.SnapshotRequest_INITIAL, priority, senderName, senderQueuePriority, ); err != nil { return nil, err @@ -1835,9 +1836,9 @@ func (r *Replica) initializeRaftLearners( // lockLearnerSnapshot stops the raft snapshot queue from sending snapshots to // the soon-to-be added learner replicas to prevent duplicate snapshots from -// being sent. This lock is best effort because it times out and it is a node -// local lock while the raft snapshot queue might be running on a different -// node. An idempotent unlock function is returned. +// being sent. This lock is a node local lock while the raft snapshot queue +// might be running on a different node. An idempotent unlock function is +// returned. func (r *Replica) lockLearnerSnapshot( ctx context.Context, additions []roachpb.ReplicationTarget, ) (unlock func()) { @@ -1845,15 +1846,15 @@ func (r *Replica) lockLearnerSnapshot( // in 19.2 to work around a commit in etcd/raft that made this race more // likely. It'd be nice if all learner snapshots could be sent from a single // place. - var lockUUIDs []uuid.UUID + var cleanups []func() for _, addition := range additions { lockUUID := uuid.MakeV4() - lockUUIDs = append(lockUUIDs, lockUUID) - r.addSnapshotLogTruncationConstraint(ctx, lockUUID, 1, addition.StoreID) + _, cleanup := r.addSnapshotLogTruncationConstraint(ctx, lockUUID, addition.StoreID) + cleanups = append(cleanups, cleanup) } return func() { - for _, lockUUID := range lockUUIDs { - r.completeSnapshotLogTruncationConstraint(lockUUID) + for _, cleanup := range cleanups { + cleanup() } } } @@ -2517,34 +2518,140 @@ func recordRangeEventsInLog( return nil } -// getSenderReplica returns a replica descriptor for a follower replica to act as -// the sender for snapshots. -// TODO(amy): select a follower based on locality matching. 
-func (r *Replica) getSenderReplica(ctx context.Context) (roachpb.ReplicaDescriptor, error) { - log.Fatal(ctx, "follower snapshots not implemented") - return r.GetReplicaDescriptor() +// getSenderReplicas returns an ordered list of replica descriptor for a +// follower replica to act as the sender for delegated snapshots. The replicas +// should be tried in order, and typically the coordinator is the last entry on +// the list. +func (r *Replica) getSenderReplicas( + ctx context.Context, recipient roachpb.ReplicaDescriptor, +) ([]roachpb.ReplicaDescriptor, error) { + + coordinator, err := r.GetReplicaDescriptor() + if err != nil { + // If there is no local replica descriptor, return an empty list. + return nil, err + } + + // Unless all nodes are on V23.1, don't delegate. This prevents sending to a + // node that doesn't understand the request. + if !r.store.ClusterSettings().Version.IsActive(ctx, clusterversion.V23_1) { + return []roachpb.ReplicaDescriptor{coordinator}, nil + } + + // Check follower snapshots, if zero just self-delegate. + numFollowers := int(NumDelegateLimit.Get(&r.ClusterSettings().SV)) + if numFollowers == 0 { + return []roachpb.ReplicaDescriptor{coordinator}, nil + } + + // Get range descriptor and store pool. + storePool := r.store.cfg.StorePool + rangeDesc := r.Desc() + + if fn := r.store.cfg.TestingKnobs.SelectDelegateSnapshotSender; fn != nil { + sender := fn(rangeDesc) + // If a TestingKnob is specified use it whatever it is. + if sender != nil { + return sender, nil + } + } + + // Include voter and non-voter replicas on healthy stores as candidates. + nonRecipientReplicas := rangeDesc.Replicas().Filter( + func(rDesc roachpb.ReplicaDescriptor) bool { + return rDesc.ReplicaID != recipient.ReplicaID && storePool.IsStoreHealthy(rDesc.StoreID) + }, + ) + candidates := nonRecipientReplicas.VoterAndNonVoterDescriptors() + if len(candidates) == 0 { + // Not clear when the coordinator would be considered dead, but if it does + // happen, just return the coordinator. + return []roachpb.ReplicaDescriptor{coordinator}, nil + } + + // Get the localities of the candidate replicas, including the original sender. + localities := storePool.GetLocalitiesPerReplica(candidates...) + recipientLocality := storePool.GetLocalitiesPerReplica(recipient)[recipient.ReplicaID] + + // Construct a map from replica to its diversity score compared to the + // recipient. Also track the best score we see. + replicaDistance := make(map[roachpb.ReplicaID]float64, len(localities)) + closestStore := roachpb.MaxDiversityScore + for desc, locality := range localities { + score := recipientLocality.DiversityScore(locality) + if score < closestStore { + closestStore = score + } + replicaDistance[desc] = score + } + + // Find all replicas that tie as the most optimal sender other than the + // coordinator. The coordinator will always be added to the end of the list + // regardless of score. + var tiedReplicas []roachpb.ReplicaID + for replID, score := range replicaDistance { + if score == closestStore { + // If the coordinator is tied for closest, always use it. + // TODO(baptist): Consider using other replicas at the same distance once + // this is integrated with admission control. + if replID == coordinator.ReplicaID { + return []roachpb.ReplicaDescriptor{coordinator}, nil + } + tiedReplicas = append(tiedReplicas, replID) + } + } + + // Use a psuedo random source that is consistent across runs of this method + // for the same coordinator. 
Shuffle the replicas to prevent always choosing + // them in the same order. + pRand := rand.New(rand.NewSource(int64(coordinator.ReplicaID))) + pRand.Shuffle(len(tiedReplicas), func(i, j int) { tiedReplicas[i], tiedReplicas[j] = tiedReplicas[j], tiedReplicas[i] }) + + // Only keep the top numFollowers replicas. + if len(tiedReplicas) > numFollowers { + tiedReplicas = tiedReplicas[:numFollowers] + } + + // Convert to replica descriptors before returning. The list of tiedReplicas + // is typically only one element. + replicaList := make([]roachpb.ReplicaDescriptor, len(tiedReplicas)+1) + for n, replicaId := range tiedReplicas { + found := false + replDesc, found := rangeDesc.Replicas().GetReplicaDescriptorByID(replicaId) + if !found { + return nil, errors.Errorf("unable to find replica for replicaId %d", replicaId) + } + replicaList[n] = replDesc + } + // Set the last replica to be the coordinator. + replicaList[len(replicaList)-1] = coordinator + return replicaList, nil } -// TODO(amy): update description when patch for follower snapshots are completed. -// sendSnapshot sends a snapshot of the replica state to the specified replica. -// Currently only invoked from replicateQueue and raftSnapshotQueue. Be careful -// about adding additional calls as generating a snapshot is moderately -// expensive. +// sendSnapshotUsingDelegate sends a snapshot of the replica state to the specified +// replica through a delegate. Currently, only invoked from replicateQueue and +// raftSnapshotQueue. Be careful about adding additional calls as generating a +// snapshot is moderately expensive. // // A snapshot is a bulk transfer of all data in a range. It consists of a // consistent view of all the state needed to run some replica of a range as of -// some applied index (not as of some mvcc-time). Snapshots are used by Raft -// when a follower is far enough behind the leader that it can no longer be -// caught up using incremental diffs (because the leader has already garbage -// collected the diffs, in this case because it truncated the Raft log past -// where the follower is). +// some applied index (not as of some mvcc-time). There are two primary cases +// when a Snapshot is used. +// +// The first case is use by Raft when a voter or non-voter follower is far +// enough behind the leader that it can no longer be caught up using incremental +// diffs. This occurs because the leader has already garbage collected diffs +// past where the follower is. The quota pool is responsible for keeping a +// leader from getting too far ahead of any of the followers, so normally +// followers don't need a snapshot, however there are a number of cases where +// this can happen (restarts, paused followers, ...). // -// We also proactively send a snapshot when adding a new replica to bootstrap it +// The second case is adding a new replica to a replica set, to bootstrap it // (this is called a "learner" snapshot and is a special case of a Raft // snapshot, we just speed the process along). It's called a learner snapshot -// because it's sent to what Raft terms a learner replica. As of 19.2, when we -// add a new replica, it's first added as a learner using a Raft ConfChange, -// which means it accepts Raft traffic but doesn't vote or affect quorum. Then +// because it's sent to what Raft terms a learner replica. When we +// add a new replica, it's first added as a learner using a Raft ConfChange. +// A learner accepts Raft traffic but doesn't vote or affect quorum. 
Then // we immediately send it a snapshot to catch it up. After the snapshot // successfully applies, we turn it into a normal voting replica using another // ConfChange. It then uses the normal mechanisms to catch up with whatever got @@ -2552,13 +2659,20 @@ func (r *Replica) getSenderReplica(ctx context.Context) (roachpb.ReplicaDescript // the voting replica directly, this avoids a period of fragility when the // replica would be a full member, but very far behind. // -// Snapshots are expensive and mostly unexpected (except learner snapshots -// during rebalancing). The quota pool is responsible for keeping a leader from -// getting too far ahead of any of the followers, so ideally they'd never be far -// enough behind to need a snapshot. +// The snapshot process itself is broken into 4 parts: delegating the request, +// generating the snapshot, transmitting it, and applying it. // -// The snapshot process itself is broken into 3 parts: generating the snapshot, -// transmitting it, and applying it. +// Delegating the request: Since a snapshot is expensive to transfer from a +// network, CPU and IO perspective, the coordinator attempts to delegate the +// request to a healthy delegate who is both closer to the final destination. +// This is done by sending a DelegateSnapshotRequest to a replica. The replica +// can either reject the delegation request or process it. It will reject if it +// is too far behind, unhealthy, or has too long of a queue of snapshots to +// send. If the delegate accepts the delegation request, then the remaining +// three steps occur on that delegate. If the delegate does not decide to +// process the request, it sends an error back to the coordinator and the +// coordinator either chooses a different delegate or itself as the "delegate of +// last resort". // // Generating the snapshot: The data contained in a snapshot is a full copy of // the replicated data plus everything the replica needs to be a healthy member @@ -2566,7 +2680,7 @@ func (r *Replica) getSenderReplica(ctx context.Context) (roachpb.ReplicaDescript // instead of keeping it all in memory at once. The `(Replica).GetSnapshot` // method does the necessary locking and gathers the various Raft state needed // to run a replica. It also creates an iterator for the range's data as it -// looked under those locks (this is powered by a RocksDB snapshot, which is a +// looked under those locks (this is powered by a Pebble snapshot, which is a // different thing but a similar idea). Notably, GetSnapshot does not do the // data iteration. // @@ -2589,7 +2703,7 @@ func (r *Replica) getSenderReplica(ctx context.Context) (roachpb.ReplicaDescript // returns true, this is communicated back to the sender, which then proceeds to // call `kvBatchSnapshotStrategy.Send`. This uses the iterator captured earlier // to send the data in chunks, each chunk a streaming grpc message. The sender -// then sends a final message with an indicaton that it's done and blocks again, +// then sends a final message with an indication that it's done and blocks again, // waiting for a second and final response from the recipient which indicates if // the snapshot was a success. // @@ -2601,7 +2715,6 @@ func (r *Replica) getSenderReplica(ctx context.Context) (roachpb.ReplicaDescript // RocksDB. Each of the SSTs also has a range deletion tombstone to delete the // existing data in the range. 
// - // Applying the snapshot: After the recipient has received the message // indicating it has all the data, it hands it all to // `(Store).processRaftSnapshotRequest` to be applied. First, this re-checks @@ -2637,7 +2750,7 @@ func (r *Replica) getSenderReplica(ctx context.Context) (roachpb.ReplicaDescript // callers of `shouldAcceptSnapshotData` return an error so that we no longer // have to worry about racing with a second snapshot. See the comment on // ReplicaPlaceholder for details. -func (r *Replica) sendSnapshot( +func (r *Replica) sendSnapshotUsingDelegate( ctx context.Context, recipient roachpb.ReplicaDescriptor, snapType kvserverpb.SnapshotRequest_Type, @@ -2671,12 +2784,12 @@ func (r *Replica) sendSnapshot( ) } - // Check follower snapshots cluster setting. - if followerSnapshotsEnabled.Get(&r.ClusterSettings().SV) { - sender, err = r.getSenderReplica(ctx) - if err != nil { - return err - } + status := r.RaftStatus() + if status == nil { + // This code path is sometimes hit during scatter for replicas that + // haven't woken up yet. + retErr = &benignError{errors.Wrap(errMarkSnapshotError, "raft status not initialized")} + return } // Don't send a queue name or priority if the receiver may not understand @@ -2688,52 +2801,196 @@ func (r *Replica) sendSnapshot( senderQueueName = 0 senderQueuePriority = 0 } + snapUUID := uuid.MakeV4() + appliedIndex, cleanup := r.addSnapshotLogTruncationConstraint(ctx, snapUUID, recipient.StoreID) + // The cleanup function needs to be called regardless of success or failure of + // sending to release the log truncation constraint. + defer cleanup() - log.VEventf( - ctx, 2, "delegating snapshot transmission for %v to %v", recipient, sender, - ) - status := r.RaftStatus() + // Create new delegate snapshot request without specifying the delegate sender. + delegateRequest := &kvserverpb.DelegateSendSnapshotRequest{ + RangeID: r.RangeID, + CoordinatorReplica: sender, + RecipientReplica: recipient, + Priority: priority, + SenderQueueName: senderQueueName, + SenderQueuePriority: senderQueuePriority, + Type: snapType, + Term: status.Term, + DelegatedSender: sender, + FirstIndex: appliedIndex, + DescriptorGeneration: r.Desc().Generation, + QueueOnDelegateLen: MaxQueueOnDelegateLimit.Get(&r.ClusterSettings().SV), + SnapId: snapUUID, + } + + // Get the list of senders in order. + senders, err := r.getSenderReplicas(ctx, recipient) + if err != nil { + return err + } + + if len(senders) == 0 { + return errors.Errorf("no sender found to send a snapshot from for %v", r) + } + + for n, sender := range senders { + delegateRequest.DelegatedSender = sender + log.VEventf( + ctx, 2, "delegating snapshot transmission attempt %v for %v to %v", n+1, recipient, sender, + ) + + // On the last attempt, always queue on the delegate to time out naturally. + if n == len(senders)-1 { + delegateRequest.QueueOnDelegateLen = -1 + } + + retErr = contextutil.RunWithTimeout( + ctx, "send-snapshot", sendSnapshotTimeout, func(ctx context.Context) error { + // Sending snapshot + return r.store.cfg.Transport.DelegateSnapshot(ctx, delegateRequest) + }, + ) + // Return once we have success. + if retErr == nil { + return + } else { + log.Warningf(ctx, "attempt %d: delegate snapshot %+v request failed %v", n+1, delegateRequest, retErr) + } + } + return +} + +// validateSnapshotDelegationRequest will validate that this replica can send +// the snapshot that the coordinator requested. 
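The attempt loop above leans on two conventions: every delegate except the last is asked to fail fast rather than queue, and the final candidate (the coordinator itself) is allowed to queue indefinitely. A minimal sketch of that shape, with sendTo standing in for the DelegateSnapshot RPC and all names invented:

package main

import (
	"errors"
	"fmt"
)

// request mirrors the two fields that matter here: who should send, and how
// long a queue the delegate may park the request in (-1 means "wait as long
// as it takes").
type request struct {
	sender          int
	queueOnDelegate int64
}

// trySenders walks the ordered candidate list. Every candidate but the last is
// asked to reject the request instead of queueing behind other snapshots; the
// final candidate queues indefinitely so the snapshot is eventually sent even
// when every delegate is busy.
func trySenders(senders []int, maxQueue int64, sendTo func(request) error) error {
	var lastErr error
	for i, s := range senders {
		req := request{sender: s, queueOnDelegate: maxQueue}
		if i == len(senders)-1 {
			req.queueOnDelegate = -1 // last resort: wait in the queue
		}
		if err := sendTo(req); err != nil {
			lastErr = err
			fmt.Printf("attempt %d via sender %d failed: %v\n", i+1, s, err)
			continue
		}
		return nil
	}
	return lastErr
}

func main() {
	busy := errors.New("delegate queue full")
	err := trySenders([]int{3, 1}, 2, func(r request) error {
		if r.sender == 3 && r.queueOnDelegate >= 0 {
			return busy // a non-final delegate rejects rather than queue
		}
		return nil // the coordinator (sender 1) queues and succeeds
	})
	fmt.Println("result:", err)
}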
The main reasons a request can't
+// be delegated are that the Generation or Term of this replica does not match
+// the Generation or Term in the coordinator's request, or that the applied index
+// on this replica is behind the truncated index of the coordinator. Note that the
+// request is validated twice, once before "queueing" and once after. This reduces
+// the chance of false positives (snapshots which are sent but can't be used), but
+// it is difficult to eliminate them completely. Between the time of sending the
+// original request and the delegate processing it, the leaseholder could truncate
+// its log, transfer the lease, or split or merge the range.
+func (r *Replica) validateSnapshotDelegationRequest(
+	ctx context.Context, req *kvserverpb.DelegateSendSnapshotRequest,
+) error {
+	desc := r.Desc()
+	// If the generation has changed, this snapshot may be useless, so don't
+	// attempt to send it.
+	// NB: This is an overly strict check. If other delegates are added to this
+	// snapshot, we don't necessarily need to reject sending the snapshot, however
+	// if there are merges or splits, it is safer to reject.
+	if desc.Generation != req.DescriptorGeneration {
+		log.VEventf(ctx, 2,
+			"%s: generation has changed since snapshot was generated %s != %s",
+			r, req.DescriptorGeneration, desc.Generation,
+		)
+		return errors.Errorf(
+			"%s: generation has changed since snapshot was generated %s != %s",
+			r, req.DescriptorGeneration, desc.Generation,
+		)
+	}
+
+	// Check that the snapshot we generated has a descriptor that includes the
+	// recipient. If it doesn't, the recipient will reject it, so it's better to
+	// not send it in the first place. It's possible to hit this case if we're not
+	// the leaseholder, and we haven't yet applied the configuration change that's
+	// adding the recipient to the range, or we are the leaseholder but have
+	// removed the recipient between starting to send the snapshot and this point.
+	if _, ok := desc.GetReplicaDescriptorByID(req.RecipientReplica.ReplicaID); !ok {
+		// Recipient replica not found in the current range descriptor.
+		// The sender replica's descriptor may be lagging behind the coordinator's.
+		log.VEventf(ctx, 2,
+			"%s: couldn't find receiver replica %s in sender descriptor %s",
+			r, req.RecipientReplica, r.Desc(),
+		)
+		return errors.Errorf(
+			"%s: couldn't find receiver replica %s in sender descriptor %s",
+			r, req.RecipientReplica, r.Desc(),
+		)
+	}
+
+	// Check the raft applied state index and term to determine whether this
+	// replica is too far behind the leaseholder. If the delegate is so far behind
+	// that it also needs a snapshot, then any snapshot it sends will be useless.
+	r.mu.RLock()
+	replIdx := r.mu.state.RaftAppliedIndex + 1
+
+	status := r.raftStatusRLocked()
 	if status == nil {
 		// This code path is sometimes hit during scatter for replicas that
 		// haven't woken up yet.
-		return &benignError{errors.Wrap(errMarkSnapshotError, "raft status not initialized")}
-	}
-
-	// Create new delegate snapshot request with only required metadata.
- delegateRequest := &kvserverpb.DelegateSnapshotRequest{ - RangeID: r.RangeID, - CoordinatorReplica: sender, - RecipientReplica: recipient, - Priority: priority, - SenderQueueName: senderQueueName, - SenderQueuePriority: senderQueuePriority, - Type: snapType, - Term: status.Term, - DelegatedSender: sender, - } - err = contextutil.RunWithTimeout( - ctx, "delegate-snapshot", sendSnapshotTimeout, func(ctx context.Context) error { - return r.store.cfg.Transport.DelegateSnapshot( - ctx, - delegateRequest, - ) - }, - ) - // Only mark explicitly as snapshot error (which is retriable) if we timed out. - // Otherwise, it's up to the remote to add this mark where appropriate. - if errors.HasType(err, (*contextutil.TimeoutError)(nil)) { - err = errors.Mark(err, errMarkSnapshotError) + return errors.Errorf("raft status not initialized") + } + replTerm := status.Term + r.mu.RUnlock() + + // Delegate has a different term than the coordinator. This typically means + // the lease has been transferred, and we should not process this request. + // There is a potential race where the leaseholder sends a delegate request + // and then the term changes before this request is processed. In that + // case this code path will not be checked and the snapshot will still be + // sent. + if replTerm != req.Term { + log.Infof( + ctx, + "sender: %v is not fit to send snapshot for %v; sender term: %v coordinator term: %v", + req.DelegatedSender, req.CoordinatorReplica, replTerm, req.Term, + ) + return errors.Errorf( + "sender: %v is not fit to send snapshot for %v; sender term: %v, coordinator term: %v", + req.DelegatedSender, req.CoordinatorReplica, replTerm, req.Term, + ) } - return err + + // Sender replica's snapshot will be rejected if the sender replica's raft + // applied index is lower than or equal to the log truncated constraint on the + // leaseholder, as this replica's snapshot will be wasted. Note that it is + // possible that we can enforce strictly lesser than if etcd does not require + // previous raft log entries for appending. + if replIdx <= req.FirstIndex { + log.Infof( + ctx, "sender: %v is not fit to send snapshot;"+ + " sender first index: %v, "+ + "coordinator log truncation constraint: %v", req.DelegatedSender, replIdx, req.FirstIndex, + ) + return errors.Mark(errors.Errorf( + "sender: %v is not fit to send snapshot;"+ + " sender first index: %v, "+ + "coordinator log truncation constraint: %v", req.DelegatedSender, replIdx, req.FirstIndex, + ), errMarkSnapshotError) + } + return nil } -// followerSnapshotsEnabled is used to enable or disable follower snapshots. -var followerSnapshotsEnabled = func() *settings.BoolSetting { - s := settings.RegisterBoolSetting( +// NumDelegateLimit is used to control the number of delegate followers +// to use for snapshots. To disable follower snapshots, set this to 0. If +// enabled, the leaseholder / leader will attempt to find a closer delegate than +// itself to send the snapshot through. This can save on network bandwidth at a +// cost in some cases to snapshot send latency. +var NumDelegateLimit = func() *settings.IntSetting { + s := settings.RegisterIntSetting( + settings.SystemOnly, + "kv.snapshot_delegation.num_follower", + "the number of delegates to try when sending snapshots, before falling back to sending from the leaseholder", + 1, + ) + s.SetVisibility(settings.Public) + return s +}() + +// MaxQueueOnDelegateLimit is used to control how long the outgoing snapshot +// queue can be before we reject delegation requests. 
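Stripped of locking and logging, the delegate-side validity checks reduce to three comparisons: descriptor generation, raft term, and applied index against the coordinator's truncation constraint. The following stand-alone sketch restates them with invented field names; the real code also re-runs these checks after waiting for the send semaphore:

package main

import (
	"errors"
	"fmt"
)

// delegateState is what the prospective sender knows about itself.
type delegateState struct {
	generation   int64
	term         uint64
	appliedIndex uint64
}

// coordinatorReq carries the coordinator's view that the delegate must match.
type coordinatorReq struct {
	generation int64  // descriptor generation when the request was built
	term       uint64 // raft term at the coordinator
	firstIndex uint64 // log-truncation constraint set by the coordinator
}

// validate mirrors the delegate-side checks: a snapshot from a replica whose
// descriptor generation or raft term differs from the coordinator's, or whose
// applied index sits at or below the coordinator's truncation constraint,
// would be rejected by the recipient, so refuse to send it.
func validate(d delegateState, req coordinatorReq) error {
	if d.generation != req.generation {
		return fmt.Errorf("generation changed: %d != %d", d.generation, req.generation)
	}
	if d.term != req.term {
		return fmt.Errorf("term mismatch: delegate %d, coordinator %d", d.term, req.term)
	}
	if d.appliedIndex+1 <= req.firstIndex {
		return errors.New("delegate is behind the coordinator's log truncation constraint")
	}
	return nil
}

func main() {
	d := delegateState{generation: 7, term: 5, appliedIndex: 100}
	fmt.Println(validate(d, coordinatorReq{generation: 7, term: 5, firstIndex: 90}))
	fmt.Println(validate(d, coordinatorReq{generation: 8, term: 5, firstIndex: 90}))
	fmt.Println(validate(d, coordinatorReq{generation: 7, term: 5, firstIndex: 150}))
}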
Setting to -1 allows +// unlimited requests. The purpose of this setting is to prevent a long snapshot +// queue from delaying a delegated snapshot from being sent. Once the queue +// length is longer than the configured value, an additional delegation requests +// will be rejected with an error. +var MaxQueueOnDelegateLimit = func() *settings.IntSetting { + s := settings.RegisterIntSetting( settings.SystemOnly, - "kv.snapshot_delegation.enabled", - "set to true to allow snapshots from follower replicas", - false, + "kv.snapshot_delegation.num_requests", + "how many queued requests are allowed on a delegate before the request is rejected", + 3, ) s.SetVisibility(settings.Public) return s @@ -2754,9 +3011,8 @@ var traceSnapshotThreshold = settings.RegisterDurationSetting( func (r *Replica) followerSendSnapshot( ctx context.Context, recipient roachpb.ReplicaDescriptor, - req *kvserverpb.DelegateSnapshotRequest, - stream DelegateSnapshotResponseStream, -) (retErr error) { + req *kvserverpb.DelegateSendSnapshotRequest, +) error { ctx = r.AnnotateCtx(ctx) sendThreshold := traceSnapshotThreshold.Get(&r.ClusterSettings().SV) if sendThreshold > 0 { @@ -2776,55 +3032,46 @@ func (r *Replica) followerSendSnapshot( }() } - // TODO(amy): when delegating to different senders, check raft applied state - // to determine if this follower replica is fit to send. - // Acknowledge that the request has been accepted. - if err := stream.Send( - &kvserverpb.DelegateSnapshotResponse{ - SnapResponse: &kvserverpb.SnapshotResponse{ - Status: kvserverpb.SnapshotResponse_ACCEPTED, - }, - }, - ); err != nil { + // Check the validity conditions twice, once before and once after we obtain + // the send semaphore. We check after to make sure the snapshot request still + // makes sense (e.g the range hasn't split under us). The check is + // lightweight. Even if the second check succeeds, the snapshot we send might + // still not be usable due to the leaseholder making a change that we don't + // know about, but we try to minimize that possibility since snapshots are + // expensive to send. + err := r.validateSnapshotDelegationRequest(ctx, req) + if err != nil { return err } - // Throttle snapshot sending. + // Throttle snapshot sending. Obtain the send semaphore and determine the rate limit. rangeSize := r.GetMVCCStats().Total() cleanup, err := r.store.reserveSendSnapshot(ctx, req, rangeSize) if err != nil { - return err + return errors.Wrap(err, "Unable to reserve space for sending this snapshot") } defer cleanup() + // Check validity again, it is possible that the pending request should not be + // sent after we are doing waiting. + err = r.validateSnapshotDelegationRequest(ctx, req) + if err != nil { + return err + } + snapType := req.Type - snap, err := r.GetSnapshot(ctx, snapType, recipient.StoreID) + snap, err := r.GetSnapshot(ctx, snapType, req.SnapId) if err != nil { - err = errors.Wrapf(err, "%s: failed to generate %s snapshot", r, snapType) - return errors.Mark(err, errMarkSnapshotError) + return errors.Wrapf(err, "%s: failed to generate %s snapshot", r, snapType) } defer snap.Close() log.Event(ctx, "generated snapshot") - // Check that the snapshot we generated has a descriptor that includes the - // recipient. If it doesn't, the recipient will reject it, so it's better to - // not send it in the first place. It's possible to hit this case if we're not - // the leaseholder and we haven't yet applied the configuration change that's - // adding the recipient to the range. 
- if _, ok := snap.State.Desc.GetReplicaDescriptor(recipient.StoreID); !ok { - return errors.Wrapf( - errMarkSnapshotError, - "attempting to send snapshot that does not contain the recipient as a replica; "+ - "snapshot type: %s, recipient: s%d, desc: %s", snapType, recipient, snap.State.Desc, - ) - } - - // We avoid shipping over the past Raft log in the snapshot by changing - // the truncated state (we're allowed to -- it's an unreplicated key and not - // subject to mapping across replicas). The actual sending happens here: - _ = (*kvBatchSnapshotStrategy)(nil).Send - // and results in no log entries being sent at all. Note that - // Metadata.Index is really the applied index of the replica. + // We avoid shipping over the past Raft log in the snapshot by changing the + // truncated state (we're allowed to -- it's an unreplicated key and not + // subject to mapping across replicas). The actual sending happens in + // kvBatchSnapshotStrategy.Send and results in no log entries being sent at + // all. Note that Metadata.Index is really the applied index of the replica. snap.State.TruncatedState = &roachpb.RaftTruncatedState{ Index: snap.RaftSnap.Metadata.Index, Term: snap.RaftSnap.Metadata.Term, diff --git a/pkg/kv/kvserver/replica_learner_test.go b/pkg/kv/kvserver/replica_learner_test.go index c4844c389032..6b1507833e7a 100644 --- a/pkg/kv/kvserver/replica_learner_test.go +++ b/pkg/kv/kvserver/replica_learner_test.go @@ -30,6 +30,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/server/serverpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" @@ -40,8 +41,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb" + "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" @@ -271,6 +274,13 @@ func TestAddReplicaWithReceiverThrottling(t *testing.T) { ) defer tc.Stopper().Stop(ctx) + + // Disable delegating snapshots to different senders, which would otherwise + // fail this test as snapshots could queue on different stores. + settings := cluster.MakeTestingClusterSettings() + sv := &settings.SV + kvserver.NumDelegateLimit.Override(ctx, sv, 0) + scratch := tc.ScratchRange(t) replicationChange := make(chan error, 2) g := ctxgroup.WithContext(ctx) @@ -338,6 +348,195 @@ func TestAddReplicaWithReceiverThrottling(t *testing.T) { require.NoError(t, g.Wait()) } +// TestDelegateSnapshot verifies that the correct delegate is chosen when +// sending snapshots to stores. +// TODO: It is currently disabled because with raft snapshots sometimes being +// required and not going through snapshot delegation, the results are +// unpredictable. +func TestDelegateSnapshot(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + skip.UnderStress(t, "Occasionally fails until 87553 is resolved") + + ctx := context.Background() + + // Synchronize on the moment before the snapshot gets sent to measure the + // state at that time. 
+ requestChannel := make(chan *kvserverpb.DelegateSendSnapshotRequest, 10) + + setupFn := func(t *testing.T) ( + *testcluster.TestCluster, + roachpb.Key, + ) { + knobs, ltk := makeReplicationTestKnobs() + ltk.storeKnobs.DisableRaftSnapshotQueue = true + + ltk.storeKnobs.SendSnapshot = func(request *kvserverpb.DelegateSendSnapshotRequest) { + requestChannel <- request + } + + localityA := roachpb.Locality{Tiers: []roachpb.Tier{{Key: "region", Value: "a"}}} + localityB := roachpb.Locality{Tiers: []roachpb.Tier{{Key: "region", Value: "b"}}} + + localityServerArgs := make(map[int]base.TestServerArgs) + localityServerArgs[0] = base.TestServerArgs{Knobs: knobs, Locality: localityA} + localityServerArgs[1] = base.TestServerArgs{Knobs: knobs, Locality: localityA} + localityServerArgs[2] = base.TestServerArgs{Knobs: knobs, Locality: localityB} + localityServerArgs[3] = base.TestServerArgs{Knobs: knobs, Locality: localityB} + + tc := testcluster.StartTestCluster( + t, 4, base.TestClusterArgs{ + ServerArgsPerNode: localityServerArgs, + ReplicationMode: base.ReplicationManual, + }, + ) + scratchKey := tc.ScratchRange(t) + return tc, scratchKey + } + + tc, scratchKey := setupFn(t) + defer tc.Stopper().Stop(ctx) + + // Node 3 (loc B) can only get the data from node 1 as its the only one that has it. + _ = tc.AddVotersOrFatal(t, scratchKey, tc.Targets(2)...) + request := <-requestChannel + require.Equal(t, request.DelegatedSender.StoreID, roachpb.StoreID(1)) + // Drain the channel. Unfortunately there are occasionally spurious raft snapshots sent. + for len(requestChannel) > 0 { + <-requestChannel + } + + // Node 4 (loc B) should get the snapshot from node 3 as its the same locality. + _ = tc.AddVotersOrFatal(t, scratchKey, tc.Targets(3)...) + request = <-requestChannel + require.Equal(t, request.DelegatedSender.StoreID, roachpb.StoreID(3)) + for len(requestChannel) > 0 { + <-requestChannel + } + + // Node 2 (loc A) should get the snapshot from node 1 as it is the same locality. + _ = tc.AddVotersOrFatal(t, scratchKey, tc.Targets(1)...) + request = <-requestChannel + require.Equal(t, request.DelegatedSender.StoreID, roachpb.StoreID(1)) +} + +// TestDelegateSnapshotFails is a test that ensure we fail fast when the +// sender or receiver store crashes during delegated snapshot sending. +func TestDelegateSnapshotFails(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + var senders struct { + mu syncutil.Mutex + desc []roachpb.ReplicaDescriptor + } + + setupFn := func(t *testing.T) ( + *testcluster.TestCluster, + roachpb.Key, + ) { + senders.desc = nil + knobs, ltk := makeReplicationTestKnobs() + ltk.storeKnobs.ThrottleEmptySnapshots = true + + ltk.storeKnobs.SelectDelegateSnapshotSender = + func(descriptor *roachpb.RangeDescriptor) []roachpb.ReplicaDescriptor { + senders.mu.Lock() + defer senders.mu.Unlock() + return senders.desc + } + + tc := testcluster.StartTestCluster( + t, 4, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{Knobs: knobs}, + ReplicationMode: base.ReplicationManual, + }, + ) + + scratchKey := tc.ScratchRange(t) + return tc, scratchKey + } + + // Add a learner replica that will need a snapshot, kill the server + // the learner is on. Assert that the failure is detected and change replicas + // fails fast. 
+ t.Run("receiver", func(t *testing.T) { + tc, scratchKey := setupFn(t) + defer tc.Stopper().Stop(ctx) + + desc, err := tc.LookupRange(scratchKey) + require.NoError(t, err, "Unable to lookup the range") + + _, err = setupPartitionedRange(tc, desc.RangeID, 0, 0, true, unreliableRaftHandlerFuncs{}) + require.NoError(t, err) + + _, err = tc.Servers[0].DB().AdminChangeReplicas( + ctx, scratchKey, desc, roachpb.MakeReplicationChanges(roachpb.ADD_VOTER, tc.Target(1)), + ) + + require.True(t, testutils.IsError(err, "partitioned"), `expected partitioned error got: %+v`, err) + }) + + // Add a follower replica to act as the snapshot sender, and kill the server + // the sender is on. Assert that the failure is detected and change replicas + // fails fast. + t.Run("sender_no_fallback", func(t *testing.T) { + tc, scratchKey := setupFn(t) + defer tc.Stopper().Stop(ctx) + + // Add a replica that will be the delegated sender, and another so we have + // quorum with this node down + desc := tc.AddVotersOrFatal(t, scratchKey, tc.Targets(2, 3)...) + + replicaDesc, ok := desc.GetReplicaDescriptor(3) + require.True(t, ok) + // Always use node 3 (index 2) as the only delegate. + senders.mu.Lock() + senders.desc = append(senders.desc, replicaDesc) + senders.mu.Unlock() + + // Now stop accepting traffic to node 3 (index 2). + _, err := setupPartitionedRange(tc, desc.RangeID, 0, 2, true, unreliableRaftHandlerFuncs{}) + require.NoError(t, err) + + _, err = tc.Servers[0].DB().AdminChangeReplicas( + ctx, scratchKey, desc, roachpb.MakeReplicationChanges(roachpb.ADD_VOTER, tc.Target(1)), + ) + log.Infof(ctx, "Err=%v", err) + require.True(t, testutils.IsError(err, "partitioned"), `expected partitioned error got: %+v`, err) + }) + + // Identical setup as the previous test, but allow a fallback to the leaseholder. + t.Run("sender_with_fallback", func(t *testing.T) { + tc, scratchKey := setupFn(t) + defer tc.Stopper().Stop(ctx) + + // Add a replica that will be the delegated sender, and another so we have + // quorum with this node down + desc := tc.AddVotersOrFatal(t, scratchKey, tc.Targets(2, 3)...) + + replicaDesc, ok := desc.GetReplicaDescriptor(3) + require.True(t, ok) + leaseholderDesc, ok := desc.GetReplicaDescriptor(1) + require.True(t, ok) + // First try to use node 3 (index 2) as the delegate, but fall back to the leaseholder on failure. + senders.mu.Lock() + senders.desc = append(senders.desc, replicaDesc) + senders.desc = append(senders.desc, leaseholderDesc) + senders.mu.Unlock() + + // Now stop accepting traffic to node 3 (index 2). + _, err := setupPartitionedRange(tc, desc.RangeID, 0, 2, true, unreliableRaftHandlerFuncs{}) + require.NoError(t, err) + + _, err = tc.Servers[0].DB().AdminChangeReplicas( + ctx, scratchKey, desc, roachpb.MakeReplicationChanges(roachpb.ADD_VOTER, tc.Target(1)), + ) + require.NoError(t, err) + }) +} + func TestLearnerRaftConfState(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -502,7 +701,7 @@ func testRaftSnapshotsToNonVoters(t *testing.T, drainReceivingNode bool) { // the state at that time & gather metrics. blockUntilSnapshotSendCh := make(chan struct{}) blockSnapshotSendCh := make(chan struct{}) - ltk.storeKnobs.SendSnapshot = func() { + ltk.storeKnobs.SendSnapshot = func(request *kvserverpb.DelegateSendSnapshotRequest) { close(blockUntilSnapshotSendCh) select { case <-blockSnapshotSendCh: @@ -592,7 +791,7 @@ func testRaftSnapshotsToNonVoters(t *testing.T, drainReceivingNode bool) { // snapshot being sent. 
<-blockUntilSnapshotSendCh store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey) - snapshotLength, err := getExpectedSnapshotSizeBytes(ctx, store, repl, kvserverpb.SnapshotRequest_VIA_SNAPSHOT_QUEUE, tc.Server(1).GetFirstStoreID()) + snapshotLength, err := getExpectedSnapshotSizeBytes(ctx, store, repl, kvserverpb.SnapshotRequest_VIA_SNAPSHOT_QUEUE) require.NoError(t, err) close(blockSnapshotSendCh) @@ -1766,9 +1965,8 @@ func getExpectedSnapshotSizeBytes( originStore *kvserver.Store, originRepl *kvserver.Replica, snapType kvserverpb.SnapshotRequest_Type, - recipientStoreID roachpb.StoreID, ) (int64, error) { - snap, err := originRepl.GetSnapshot(ctx, snapType, recipientStoreID) + snap, err := originRepl.GetSnapshot(ctx, snapType, uuid.MakeV4()) if err != nil { return 0, err } @@ -1831,7 +2029,7 @@ func TestRebalancingSnapshotMetrics(t *testing.T) { // the state at that time. blockUntilSnapshotSendCh := make(chan struct{}) blockSnapshotSendCh := make(chan struct{}) - ltk.storeKnobs.SendSnapshot = func() { + ltk.storeKnobs.SendSnapshot = func(request *kvserverpb.DelegateSendSnapshotRequest) { close(blockUntilSnapshotSendCh) select { case <-blockSnapshotSendCh: @@ -1865,7 +2063,7 @@ func TestRebalancingSnapshotMetrics(t *testing.T) { // snapshot being sent. <-blockUntilSnapshotSendCh store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey) - snapshotLength, err := getExpectedSnapshotSizeBytes(ctx, store, repl, kvserverpb.SnapshotRequest_INITIAL, tc.Server(1).GetFirstStoreID()) + snapshotLength, err := getExpectedSnapshotSizeBytes(ctx, store, repl, kvserverpb.SnapshotRequest_INITIAL) require.NoError(t, err) close(blockSnapshotSendCh) diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index 5b11de2dcd98..b82292812ec6 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -1617,17 +1617,26 @@ type snapTruncationInfo struct { recipientStore roachpb.StoreID } +// addSnapshotLogTruncation creates a log truncation record which will prevent +// the raft log from being truncated past this point until the cleanup function +// is called. This function will return the index that the truncation constraint +// is set at and a cleanup function to remove the constraint. We will fetch the +// applied index again in GetSnapshot and that is likely a different but higher +// index. The appliedIndex fetched here is narrowly used for adding a log +// truncation constraint to prevent log entries > appliedIndex from being +// removed. Note that the appliedIndex maintained in Replica actually lags the +// one in the engine, since replicaAppBatch.ApplyToStateMachine commits the +// engine batch and then acquires mu to update the RaftAppliedIndex. The use of +// a possibly stale value here is harmless since the values increases +// monotonically. The actual snapshot index, may preserve more from a log +// truncation perspective. func (r *Replica) addSnapshotLogTruncationConstraint( - ctx context.Context, snapUUID uuid.UUID, index uint64, recipientStore roachpb.StoreID, -) { + ctx context.Context, snapUUID uuid.UUID, recipientStore roachpb.StoreID, +) (uint64, func()) { r.mu.Lock() defer r.mu.Unlock() - r.addSnapshotLogTruncationConstraintLocked(ctx, snapUUID, index, recipientStore) -} - -func (r *Replica) addSnapshotLogTruncationConstraintLocked( - ctx context.Context, snapUUID uuid.UUID, index uint64, recipientStore roachpb.StoreID, -) { + appliedIndex := r.mu.state.RaftAppliedIndex + // Cleared when OutgoingSnapshot closes. 
if r.mu.snapshotLogTruncationConstraints == nil { r.mu.snapshotLogTruncationConstraints = make(map[uuid.UUID]snapTruncationInfo) } @@ -1637,32 +1646,30 @@ func (r *Replica) addSnapshotLogTruncationConstraintLocked( // fed into this method twice) or a UUID collision. We discard the update // (which is benign) but log it loudly. If the index is the same, it's // likely the former, otherwise the latter. - log.Warningf(ctx, "UUID collision at %s for %+v (index %d)", snapUUID, item, index) - return + log.Warningf(ctx, "UUID collision at %s for %+v (index %d)", snapUUID, item, appliedIndex) + return appliedIndex, func() {} } r.mu.snapshotLogTruncationConstraints[snapUUID] = snapTruncationInfo{ - index: index, + index: appliedIndex, recipientStore: recipientStore, } -} -// completeSnapshotLogTruncationConstraint marks the given snapshot as finished, -// releasing the lock on raft log truncation after a grace period. -func (r *Replica) completeSnapshotLogTruncationConstraint(snapUUID uuid.UUID) { - r.mu.Lock() - defer r.mu.Unlock() + return appliedIndex, func() { + r.mu.Lock() + defer r.mu.Unlock() - _, ok := r.mu.snapshotLogTruncationConstraints[snapUUID] - if !ok { - // UUID collision while adding the snapshot in originally. Nothing - // else to do. - return - } - delete(r.mu.snapshotLogTruncationConstraints, snapUUID) - if len(r.mu.snapshotLogTruncationConstraints) == 0 { - // Save a little bit of memory. - r.mu.snapshotLogTruncationConstraints = nil + _, ok := r.mu.snapshotLogTruncationConstraints[snapUUID] + if !ok { + // UUID collision while adding the snapshot in originally. Nothing + // else to do. + return + } + delete(r.mu.snapshotLogTruncationConstraints, snapUUID) + if len(r.mu.snapshotLogTruncationConstraints) == 0 { + // Save a little bit of memory. + r.mu.snapshotLogTruncationConstraints = nil + } } } diff --git a/pkg/kv/kvserver/replica_raftstorage.go b/pkg/kv/kvserver/replica_raftstorage.go index ae01168ee7a4..00cc06e19c5e 100644 --- a/pkg/kv/kvserver/replica_raftstorage.go +++ b/pkg/kv/kvserver/replica_raftstorage.go @@ -220,44 +220,17 @@ func (r *Replica) raftSnapshotLocked() (raftpb.Snapshot, error) { // replica. If this method returns without error, callers must eventually call // OutgoingSnapshot.Close. func (r *Replica) GetSnapshot( - ctx context.Context, snapType kvserverpb.SnapshotRequest_Type, recipientStore roachpb.StoreID, + ctx context.Context, snapType kvserverpb.SnapshotRequest_Type, snapUUID uuid.UUID, ) (_ *OutgoingSnapshot, err error) { - snapUUID := uuid.MakeV4() // Get a snapshot while holding raftMu to make sure we're not seeing "half // an AddSSTable" (i.e. a state in which an SSTable has been linked in, but // the corresponding Raft command not applied yet). r.raftMu.Lock() snap := r.store.engine.NewSnapshot() - { - r.mu.Lock() - // We will fetch the applied index later again, from snap. The - // appliedIndex fetched here is narrowly used for adding a log truncation - // constraint to prevent log entries > appliedIndex from being removed. - // Note that the appliedIndex maintained in Replica actually lags the one - // in the engine, since replicaAppBatch.ApplyToStateMachine commits the - // engine batch and then acquires Replica.mu to update - // Replica.mu.state.RaftAppliedIndex. The use of a possibly stale value - // here is harmless since using a lower index in this constraint, than the - // actual snapshot index, preserves more from a log truncation - // perspective. 
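The register-and-return-a-cleanup pattern used by addSnapshotLogTruncationConstraint is a common Go idiom. A minimal sketch of it, with invented names and none of the Replica locking details:

package main

import (
	"fmt"
	"sync"
)

// constraintSet tracks outstanding "don't truncate past this index" records,
// keyed by an opaque snapshot ID.
type constraintSet struct {
	mu          sync.Mutex
	constraints map[string]uint64
}

// add registers a truncation constraint at the current applied index and
// returns that index plus a cleanup func. The caller defers the cleanup so the
// constraint is released whether or not the snapshot send succeeds.
func (c *constraintSet) add(id string, appliedIndex uint64) (uint64, func()) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.constraints == nil {
		c.constraints = make(map[string]uint64)
	}
	c.constraints[id] = appliedIndex
	return appliedIndex, func() {
		c.mu.Lock()
		defer c.mu.Unlock()
		delete(c.constraints, id)
		if len(c.constraints) == 0 {
			c.constraints = nil // drop the map once it is empty
		}
	}
}

// minProtectedIndex reports the lowest protected index, i.e. how far log
// truncation may proceed while snapshots are outstanding.
func (c *constraintSet) minProtectedIndex() (uint64, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	var lowest uint64
	found := false
	for _, idx := range c.constraints {
		if !found || idx < lowest {
			lowest, found = idx, true
		}
	}
	return lowest, found
}

func main() {
	var cs constraintSet
	idx, cleanup := cs.add("snap-1", 42)
	defer cleanup() // released even if sending fails
	fmt.Println(idx)
	fmt.Println(cs.minProtectedIndex())
}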
- // - // TODO(sumeer): despite the above justification, this is unnecessarily - // complicated. Consider loading the RaftAppliedIndex from the snap for - // this use case. - appliedIndex := r.mu.state.RaftAppliedIndex - // Cleared when OutgoingSnapshot closes. - r.addSnapshotLogTruncationConstraintLocked(ctx, snapUUID, appliedIndex, recipientStore) - r.mu.Unlock() - } r.raftMu.Unlock() - release := func() { - r.completeSnapshotLogTruncationConstraint(snapUUID) - } - defer func() { if err != nil { - release() snap.Close() } }() @@ -284,7 +257,6 @@ func (r *Replica) GetSnapshot( log.Errorf(ctx, "error generating snapshot: %+v", err) return nil, err } - snapData.onClose = release return &snapData, nil } diff --git a/pkg/kv/kvserver/storage_services.proto b/pkg/kv/kvserver/storage_services.proto index a876b768dbeb..63c7dc2dc4ff 100644 --- a/pkg/kv/kvserver/storage_services.proto +++ b/pkg/kv/kvserver/storage_services.proto @@ -27,15 +27,11 @@ service MultiRaft { // ERROR, including any collected traces from processing. rpc RaftSnapshot (stream cockroach.kv.kvserver.kvserverpb.SnapshotRequest) returns (stream cockroach.kv.kvserver.kvserverpb.SnapshotResponse) {} // DelegateRaftSnapshot asks the server to send a range snapshot to a target - // (so the client delegates the sending of the snapshot to the server). The - // server responds in two phases. - // - // TODO(nvanbenschoten): This RPC is bi-directional streaming (as opposed to - // only server-streaming) because of future aspirations; at the moment the - // request is unary. In the future, we wanted to pause all log truncation, - // then handshake with the delegated sender, then weaken log truncation - // protection to just below the index that the sender was sending the - // snapshot at. + // (so the client delegates the sending of the snapshot to the server). This + // is a "single-shot" stream today that sends a single request and returns a + // single response, however the expectation is that in the future the + // throttling/permit reservation will be separated out from the actual + // sending. rpc DelegateRaftSnapshot(stream cockroach.kv.kvserver.kvserverpb.DelegateSnapshotRequest) returns (stream cockroach.kv.kvserver.kvserverpb.DelegateSnapshotResponse) {} } diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go index 1615a2136ab5..cbd9e99fcce2 100644 --- a/pkg/kv/kvserver/store_raft.go +++ b/pkg/kv/kvserver/store_raft.go @@ -160,41 +160,41 @@ func (qs *raftReceiveQueues) Delete(rangeID roachpb.RangeID) { // HandleDelegatedSnapshot reads the incoming delegated snapshot message and // throttles sending snapshots before passing the request to the sender replica. func (s *Store) HandleDelegatedSnapshot( - ctx context.Context, - req *kvserverpb.DelegateSnapshotRequest, - stream DelegateSnapshotResponseStream, -) error { + ctx context.Context, req *kvserverpb.DelegateSendSnapshotRequest, +) *kvserverpb.DelegateSnapshotResponse { ctx = s.AnnotateCtx(ctx) if fn := s.cfg.TestingKnobs.SendSnapshot; fn != nil { - fn() + fn(req) } + sp := tracing.SpanFromContext(ctx) + + // This can happen if the delegate doesn't know about the range yet. Return an + // error immediately. 
sender, err := s.GetReplica(req.RangeID) if err != nil { - return err + return &kvserverpb.DelegateSnapshotResponse{ + Status: kvserverpb.DelegateSnapshotResponse_ERROR, + EncodedError: errors.EncodeError(context.Background(), err), + CollectedSpans: sp.GetConfiguredRecording(), + } } - sp := tracing.SpanFromContext(ctx) // Pass the request to the sender replica. - if err := sender.followerSendSnapshot(ctx, req.RecipientReplica, req, stream); err != nil { - return stream.Send( - &kvserverpb.DelegateSnapshotResponse{ - SnapResponse: snapRespErr(err), - CollectedSpans: sp.GetConfiguredRecording(), - }, - ) + if err := sender.followerSendSnapshot(ctx, req.RecipientReplica, req); err != nil { + // If an error occurred during snapshot sending, send an error response. + return &kvserverpb.DelegateSnapshotResponse{ + Status: kvserverpb.DelegateSnapshotResponse_ERROR, + EncodedError: errors.EncodeError(context.Background(), err), + CollectedSpans: sp.GetConfiguredRecording(), + } } - resp := &kvserverpb.DelegateSnapshotResponse{ - SnapResponse: &kvserverpb.SnapshotResponse{ - Status: kvserverpb.SnapshotResponse_APPLIED, - DeprecatedMessage: "Snapshot successfully applied by recipient", - }, + return &kvserverpb.DelegateSnapshotResponse{ + Status: kvserverpb.DelegateSnapshotResponse_APPLIED, CollectedSpans: sp.GetConfiguredRecording(), } - // Send a final response that snapshot sending is completed. - return stream.Send(resp) } // HandleSnapshot reads an incoming streaming snapshot and applies it if diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go index 830faf132846..6768b2efcded 100644 --- a/pkg/kv/kvserver/store_snapshot.go +++ b/pkg/kv/kvserver/store_snapshot.go @@ -91,13 +91,6 @@ type outgoingSnapshotStream interface { Recv() (*kvserverpb.SnapshotResponse, error) } -// outgoingSnapshotStream is the minimal interface on a GRPC stream required -// to send a snapshot over the network. -type outgoingDelegatedStream interface { - Send(*kvserverpb.DelegateSnapshotRequest) error - Recv() (*kvserverpb.DelegateSnapshotResponse, error) -} - // snapshotRecordMetrics is a wrapper function that increments a set of metrics // related to the number of snapshot bytes sent/received. The definer of the // function specifies which metrics are incremented. @@ -674,7 +667,11 @@ func (kvSS *kvBatchSnapshotStrategy) Close(ctx context.Context) { } } -// reserveReceiveSnapshot throttles incoming snapshots. +// reserveReceiveSnapshot reserves space for this snapshot which will attempt to +// prevent overload of system resources as this snapshot is being sent. +// Snapshots are often sent in bulk (due to operations like store decommission) +// so it is necessary to prevent snapshot transfers from overly impacting +// foreground traffic. func (s *Store) reserveReceiveSnapshot( ctx context.Context, header *kvserverpb.SnapshotRequest_Header, ) (_cleanup func(), _err error) { @@ -683,8 +680,9 @@ func (s *Store) reserveReceiveSnapshot( return s.throttleSnapshot(ctx, s.snapshotApplyQueue, int(header.SenderQueueName), header.SenderQueuePriority, + -1, header.RangeSize, - header.RaftMessageRequest.RangeID, header.RaftMessageRequest.ToReplica.ReplicaID, + header.RaftMessageRequest.RangeID, s.metrics.RangeSnapshotRecvQueueLength, s.metrics.RangeSnapshotRecvInProgress, s.metrics.RangeSnapshotRecvTotalInProgress, ) @@ -692,7 +690,7 @@ func (s *Store) reserveReceiveSnapshot( // reserveSendSnapshot throttles outgoing snapshots. 
func (s *Store) reserveSendSnapshot( - ctx context.Context, req *kvserverpb.DelegateSnapshotRequest, rangeSize int64, + ctx context.Context, req *kvserverpb.DelegateSendSnapshotRequest, rangeSize int64, ) (_cleanup func(), _err error) { ctx, sp := tracing.EnsureChildSpan(ctx, s.cfg.Tracer(), "reserveSendSnapshot") defer sp.Finish() @@ -701,9 +699,11 @@ func (s *Store) reserveSendSnapshot( } return s.throttleSnapshot(ctx, s.snapshotSendQueue, - int(req.SenderQueueName), req.SenderQueuePriority, + int(req.SenderQueueName), + req.SenderQueuePriority, + req.QueueOnDelegateLen, rangeSize, - req.RangeID, req.DelegatedSender.ReplicaID, + req.RangeID, s.metrics.RangeSnapshotSendQueueLength, s.metrics.RangeSnapshotSendInProgress, s.metrics.RangeSnapshotSendTotalInProgress, ) @@ -717,11 +717,12 @@ func (s *Store) throttleSnapshot( snapshotQueue *multiqueue.MultiQueue, requestSource int, requestPriority float64, + maxQueueLength int64, rangeSize int64, rangeID roachpb.RangeID, - replicaID roachpb.ReplicaID, waitingSnapshotMetric, inProgressSnapshotMetric, totalInProgressSnapshotMetric *metric.Gauge, -) (cleanup func(), err error) { +) (cleanup func(), funcErr error) { + tBegin := timeutil.Now() var permit *multiqueue.Permit // Empty snapshots are exempt from rate limits because they're so cheap to @@ -729,9 +730,14 @@ func (s *Store) throttleSnapshot( // RESTORE or manual SPLIT AT, since it prevents these empty snapshots from // getting stuck behind large snapshots managed by the replicate queue. if rangeSize != 0 || s.cfg.TestingKnobs.ThrottleEmptySnapshots { - task := snapshotQueue.Add(requestSource, requestPriority) + task, err := snapshotQueue.Add(requestSource, requestPriority, maxQueueLength) + if err != nil { + return nil, err + } + // After this point, the task is on the queue, so any future errors need to + // be handled by cancelling the task to release the permit. defer func() { - if err != nil { + if funcErr != nil { snapshotQueue.Cancel(task) } }() @@ -787,10 +793,9 @@ func (s *Store) throttleSnapshot( if elapsed > snapshotReservationWaitWarnThreshold && !buildutil.CrdbTestBuild { log.Infof( ctx, - "waited for %.1fs to acquire snapshot reservation to r%d/%d", + "waited for %.1fs to acquire snapshot reservation to r%d", elapsed.Seconds(), rangeID, - replicaID, ) } @@ -1127,7 +1132,7 @@ func maybeHandleDeprecatedSnapErr(deprecated bool, err error) error { return errors.Mark(err, errMarkSnapshotError) } -// SnapshotStorePool narrows StorePool to make sendSnapshot easier to test. +// SnapshotStorePool narrows StorePool to make sendSnapshotUsingDelegate easier to test. type SnapshotStorePool interface { Throttle(reason storepool.ThrottleReason, why string, toStoreID roachpb.StoreID) } @@ -1501,7 +1506,7 @@ func sendSnapshot( recordBytesSent snapshotRecordMetrics, ) error { if recordBytesSent == nil { - // NB: Some tests and an offline tool (ResetQuorum) call into `sendSnapshot` + // NB: Some tests and an offline tool (ResetQuorum) call into `sendSnapshotUsingDelegate` // with a nil metrics tracking function. We pass in a fake metrics tracking function here that isn't // hooked up to anything. recordBytesSent = func(inc int64) {} @@ -1626,76 +1631,3 @@ func sendSnapshot( ) } } - -// delegateSnapshot sends an outgoing delegated snapshot request via a -// pre-opened GRPC stream. It sends the delegated snapshot request to the -// sender and waits for confirmation that the snapshot has been applied. 
-func delegateSnapshot( - ctx context.Context, stream outgoingDelegatedStream, req *kvserverpb.DelegateSnapshotRequest, -) error { - - delegatedSender := req.DelegatedSender - if err := stream.Send(req); err != nil { - return err - } - // Wait for a response from the sender. - resp, err := stream.Recv() - if err != nil { - return err - } - switch resp.SnapResponse.Status { - case kvserverpb.SnapshotResponse_ERROR: - return errors.Wrapf( - maybeHandleDeprecatedSnapErr(resp.Error()), - "%s: sender couldn't accept %s", delegatedSender, req) - case kvserverpb.SnapshotResponse_ACCEPTED: - // The sender accepted the request, it will continue with sending. - log.VEventf( - ctx, 2, "sender %s accepted snapshot request %s", delegatedSender, - req, - ) - default: - err := errors.Errorf( - "%s: server sent an invalid status while negotiating %s: %s", - delegatedSender, req, resp.SnapResponse.Status, - ) - return err - } - - // Wait for response to see if the receiver successfully applied the snapshot. - resp, err = stream.Recv() - if err != nil { - return errors.Mark( - errors.Wrapf(err, "%s: remote failed to send snapshot", delegatedSender), errMarkSnapshotError, - ) - } - // Wait for EOF to ensure server side processing is complete. - if unexpectedResp, err := stream.Recv(); err != io.EOF { - if err != nil { - return errors.Mark(errors.Wrapf( - err, "%s: expected EOF, got resp=%v with error", - delegatedSender.StoreID, unexpectedResp), errMarkSnapshotError) - } - return errors.Mark(errors.Newf( - "%s: expected EOF, got resp=%v", delegatedSender.StoreID, - unexpectedResp), errMarkSnapshotError) - } - sp := tracing.SpanFromContext(ctx) - if sp != nil { - sp.ImportRemoteRecording(resp.CollectedSpans) - } - switch resp.SnapResponse.Status { - case kvserverpb.SnapshotResponse_ERROR: - return maybeHandleDeprecatedSnapErr(resp.Error()) - case kvserverpb.SnapshotResponse_APPLIED: - // This is the response we're expecting. Snapshot successfully applied. - log.VEventf(ctx, 2, "%s: delegated snapshot was successfully applied", delegatedSender) - return nil - default: - return errors.Errorf( - "%s: server sent an invalid status during finalization: %s", - delegatedSender, resp.SnapResponse.Status, - ) - } - -} diff --git a/pkg/kv/kvserver/store_test.go b/pkg/kv/kvserver/store_test.go index 31ce2b1b5071..887730df6f25 100644 --- a/pkg/kv/kvserver/store_test.go +++ b/pkg/kv/kvserver/store_test.go @@ -2994,7 +2994,7 @@ func (sp *fakeStorePool) Throttle( } // TestSendSnapshotThrottling tests the store pool throttling behavior of -// store.sendSnapshot, ensuring that it properly updates the StorePool on +// store.sendSnapshotUsingDelegate, ensuring that it properly updates the StorePool on // various exceptional conditions and new capacity estimates. func TestSendSnapshotThrottling(t *testing.T) { defer leaktest.AfterTest(t)() @@ -3065,60 +3065,82 @@ func TestSendSnapshotConcurrency(t *testing.T) { s := tc.store // Checking this now makes sure that if the defaults change this test will also. 
- require.Equal(t, 2, s.snapshotSendQueue.Len()) - cleanup1, err := s.reserveSendSnapshot(ctx, &kvserverpb.DelegateSnapshotRequest{ + require.Equal(t, 2, s.snapshotSendQueue.AvailableLen()) + require.Equal(t, 0, s.snapshotSendQueue.QueueLen()) + cleanup1, err := s.reserveSendSnapshot(ctx, &kvserverpb.DelegateSendSnapshotRequest{ SenderQueueName: kvserverpb.SnapshotRequest_REPLICATE_QUEUE, SenderQueuePriority: 1, }, 1) - require.Nil(t, err) - require.Equal(t, 1, s.snapshotSendQueue.Len()) - cleanup2, err := s.reserveSendSnapshot(ctx, &kvserverpb.DelegateSnapshotRequest{ + require.NoError(t, err) + cleanup2, err := s.reserveSendSnapshot(ctx, &kvserverpb.DelegateSendSnapshotRequest{ SenderQueueName: kvserverpb.SnapshotRequest_REPLICATE_QUEUE, SenderQueuePriority: 1, }, 1) - require.Nil(t, err) - require.Equal(t, 0, s.snapshotSendQueue.Len()) + require.NoError(t, err) + require.Equal(t, 0, s.snapshotSendQueue.AvailableLen()) + require.Equal(t, 1, s.snapshotSendQueue.QueueLen()) + // At this point both the first two tasks will be holding reservations and - // waiting for cleanup, a third task will block. + // waiting for cleanup, a third task will block or fail First send one with + // the queue length set to 0 - this will fail since the first tasks are still + // running. + _, err = s.reserveSendSnapshot(ctx, &kvserverpb.DelegateSendSnapshotRequest{ + SenderQueueName: kvserverpb.SnapshotRequest_REPLICATE_QUEUE, + SenderQueuePriority: 1, + QueueOnDelegateLen: 0, + }, 1) + require.Error(t, err) + require.Equal(t, 0, s.snapshotSendQueue.AvailableLen()) + require.Equal(t, 1, s.snapshotSendQueue.QueueLen()) + + // Now add a task that will wait indefinitely for another task to finish. var wg sync.WaitGroup wg.Add(2) go func() { before := timeutil.Now() - cleanup3, err := s.reserveSendSnapshot(ctx, &kvserverpb.DelegateSnapshotRequest{ + cleanup3, err := s.reserveSendSnapshot(ctx, &kvserverpb.DelegateSendSnapshotRequest{ SenderQueueName: kvserverpb.SnapshotRequest_REPLICATE_QUEUE, SenderQueuePriority: 1, + QueueOnDelegateLen: -1, }, 1) after := timeutil.Now() + require.NoError(t, err) + require.GreaterOrEqual(t, after.Sub(before), 10*time.Millisecond) cleanup3() wg.Done() - require.Nil(t, err) - require.GreaterOrEqual(t, after.Sub(before), 10*time.Millisecond) }() - // This task will not block for more than a few MS, but we want to wait for - // it to complete to make sure it frees the permit. + // Now add one that will queue up and wait for another task to finish. This + // task will not block for more than 8ms, but we want to wait for it to + // complete to make sure it frees the permit. go func() { - deadlineCtx, cancel := context.WithTimeout(ctx, 1*time.Millisecond) + deadlineCtx, cancel := context.WithTimeout(ctx, 20*time.Millisecond) defer cancel() // This will time out since the deadline is set artificially low. Make sure // the permit is released. - _, err := s.reserveSendSnapshot(deadlineCtx, &kvserverpb.DelegateSnapshotRequest{ + _, err := s.reserveSendSnapshot(deadlineCtx, &kvserverpb.DelegateSendSnapshotRequest{ SenderQueueName: kvserverpb.SnapshotRequest_REPLICATE_QUEUE, SenderQueuePriority: 1, + QueueOnDelegateLen: -1, }, 1) + require.Error(t, err) wg.Done() - require.NotNil(t, err) }() // Wait a little time before calling signaling the first two as complete. time.Sleep(100 * time.Millisecond) + require.Equal(t, 0, s.snapshotSendQueue.AvailableLen()) + // One remaining task are queued at this point. 
+ require.Equal(t, 2, s.snapshotSendQueue.QueueLen()) + cleanup1() cleanup2() // Wait until all cleanup run before checking the number of permits. wg.Wait() - require.Equal(t, 2, s.snapshotSendQueue.Len()) + require.Equal(t, 2, s.snapshotSendQueue.AvailableLen()) + require.Equal(t, 0, s.snapshotSendQueue.QueueLen()) } func TestReserveSnapshotThrottling(t *testing.T) { diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go index ab998562d9af..e5ebda32f678 100644 --- a/pkg/kv/kvserver/testing_knobs.go +++ b/pkg/kv/kvserver/testing_knobs.go @@ -280,7 +280,7 @@ type StoreTestingKnobs struct { // SendSnapshot is run after receiving a DelegateRaftSnapshot request but // before any throttling or sending logic. - SendSnapshot func() + SendSnapshot func(*kvserverpb.DelegateSendSnapshotRequest) // ReceiveSnapshot is run after receiving a snapshot header but before // acquiring snapshot quota or doing shouldAcceptSnapshotData checks. If an // error is returned from the hook, it's sent as an ERROR SnapshotResponse. @@ -423,6 +423,9 @@ type StoreTestingKnobs struct { // AfterSendSnapshotThrottle intercepts replicas after receiving a spot in the // send snapshot semaphore. AfterSendSnapshotThrottle func() + // SelectDelegateSnapshotSender returns an ordered list of replica which will + // be used as delegates for sending a snapshot. + SelectDelegateSnapshotSender func(*roachpb.RangeDescriptor) []roachpb.ReplicaDescriptor // EnqueueReplicaInterceptor intercepts calls to `store.Enqueue()`. EnqueueReplicaInterceptor func(queueName string, replica *Replica)
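The queueing behavior exercised by TestSendSnapshotConcurrency (a fixed number of send permits, callers that either wait behind them or fail fast when the waiting line already exceeds their QueueOnDelegateLen, and -1 meaning queue regardless) can be approximated with a counting semaphore plus a waiter cap. This is an illustrative sketch with invented names, not the multiqueue implementation, which additionally orders waiters by queue name and priority:

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// sendQueue hands out a fixed number of permits; callers beyond that either
// wait or are rejected, depending on maxQueueLen.
type sendQueue struct {
	mu      sync.Mutex
	permits chan struct{}
	waiting int
}

func newSendQueue(permits int) *sendQueue {
	q := &sendQueue{permits: make(chan struct{}, permits)}
	for i := 0; i < permits; i++ {
		q.permits <- struct{}{}
	}
	return q
}

// acquire blocks for a permit. If no permit is immediately free and at least
// maxQueueLen callers are already waiting, it fails fast instead (so 0 means
// "never queue"); maxQueueLen < 0 disables the cap. The returned func
// releases the permit. The length check is deliberately simplified and not
// atomic with the permit receive.
func (q *sendQueue) acquire(ctx context.Context, maxQueueLen int) (func(), error) {
	q.mu.Lock()
	if maxQueueLen >= 0 && len(q.permits) == 0 && q.waiting >= maxQueueLen {
		q.mu.Unlock()
		return nil, errors.New("delegate send queue too long")
	}
	q.waiting++
	q.mu.Unlock()
	defer func() { q.mu.Lock(); q.waiting--; q.mu.Unlock() }()

	select {
	case <-q.permits:
		return func() { q.permits <- struct{}{} }, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

func main() {
	q := newSendQueue(2)
	ctx := context.Background()

	rel1, _ := q.acquire(ctx, 0)
	rel2, _ := q.acquire(ctx, 0)
	if _, err := q.acquire(ctx, 0); err != nil {
		fmt.Println("third caller rejected:", err) // refuses to queue at all
	}

	// A caller with no cap waits until a permit frees up.
	done := make(chan struct{})
	go func() {
		rel3, err := q.acquire(ctx, -1)
		fmt.Println("waiter finally acquired, err =", err)
		rel3()
		close(done)
	}()
	time.Sleep(10 * time.Millisecond)
	rel1()
	<-done
	rel2()
}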