diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 98ab0410d259..b602783c0b0a 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -60,7 +60,8 @@
kv.replica_circuit_breaker.slow_replication_threshold
duration1m0sduration after which slow proposals trip the per-Replica circuit breaker (zero duration disables breakers)
kv.replica_stats.addsst_request_size_factor
integer50000the divisor that is applied to addsstable request sizes, then recorded in a leaseholders QPS; 0 means all requests are treated as cost 1
kv.replication_reports.interval
duration1m0sthe frequency for generating the replication_constraint_stats, replication_stats_report and replication_critical_localities reports (set to 0 to disable) -
kv.snapshot_delegation.enabled
booleanfalseset to true to allow snapshots from follower replicas +
kv.snapshot_delegation.num_follower
integer1the number of delegates to try when sending snapshots, before falling back to sending from the leaseholder +
kv.snapshot_delegation.num_requests
integer3how many queued requests are allowed on a delegate before the request is rejected
kv.snapshot_rebalance.max_rate
byte size32 MiBthe rate limit (bytes/sec) to use for rebalance and upreplication snapshots
kv.snapshot_recovery.max_rate
byte size32 MiBthe rate limit (bytes/sec) to use for recovery snapshots
kv.transaction.max_intents_bytes
integer4194304maximum number of bytes used to track locks in transactions diff --git a/pkg/cmd/roachtest/tests/decommissionbench.go b/pkg/cmd/roachtest/tests/decommissionbench.go index 35c0c030146a..0c7ca78704af 100644 --- a/pkg/cmd/roachtest/tests/decommissionbench.go +++ b/pkg/cmd/roachtest/tests/decommissionbench.go @@ -61,13 +61,15 @@ const ( type decommissionBenchSpec struct { nodes int - cpus int warehouses int - load bool + noLoad bool multistore bool snapshotRate int duration time.Duration + // Whether the test cluster nodes are in multiple regions. + multiregion bool + // When true, the test will attempt to stop the node prior to decommission. whileDown bool @@ -85,6 +87,10 @@ type decommissionBenchSpec struct { // An override for the default timeout, if needed. timeout time.Duration + // An override for the decommission node to make it choose a predictable node + // instead of a random node. + decommissionNode int + skip string } @@ -95,29 +101,21 @@ func registerDecommissionBench(r registry.Registry) { // Basic benchmark configurations, to be run nightly. { nodes: 4, - cpus: 16, warehouses: 1000, - load: true, }, { nodes: 4, - cpus: 16, warehouses: 1000, - load: true, duration: 1 * time.Hour, }, { nodes: 4, - cpus: 16, warehouses: 1000, - load: true, whileDown: true, }, { nodes: 8, - cpus: 16, warehouses: 3000, - load: true, // This test can take nearly an hour to import and achieve balance, so // we extend the timeout to let it complete. timeout: 4 * time.Hour, @@ -126,9 +124,7 @@ func registerDecommissionBench(r registry.Registry) { { // Add a new node during decommission (no drain). nodes: 8, - cpus: 16, warehouses: 3000, - load: true, whileUpreplicating: true, // This test can take nearly an hour to import and achieve balance, so // we extend the timeout to let it complete. @@ -138,9 +134,7 @@ func registerDecommissionBench(r registry.Registry) { { // Drain before decommission, without adding a new node. 
nodes: 8, - cpus: 16, warehouses: 3000, - load: true, drainFirst: true, // This test can take nearly an hour to import and achieve balance, so // we extend the timeout to let it complete. @@ -150,9 +144,7 @@ func registerDecommissionBench(r registry.Registry) { { // Drain before decommission, and add a new node. nodes: 8, - cpus: 16, warehouses: 3000, - load: true, whileUpreplicating: true, drainFirst: true, // This test can take nearly an hour to import and achieve balance, so @@ -162,25 +154,19 @@ func registerDecommissionBench(r registry.Registry) { }, { nodes: 4, - cpus: 16, warehouses: 1000, - load: true, drainFirst: true, skip: manualBenchmarkingOnly, }, { nodes: 4, - cpus: 16, warehouses: 1000, - load: true, slowWrites: true, skip: manualBenchmarkingOnly, }, { nodes: 8, - cpus: 16, warehouses: 3000, - load: true, slowWrites: true, // This test can take nearly an hour to import and achieve balance, so // we extend the timeout to let it complete. @@ -189,9 +175,7 @@ func registerDecommissionBench(r registry.Registry) { }, { nodes: 12, - cpus: 16, warehouses: 3000, - load: true, multistore: true, // This test can take nearly an hour to import and achieve balance, so // we extend the timeout to let it complete. @@ -201,14 +185,30 @@ func registerDecommissionBench(r registry.Registry) { { // Test to compare 12 4-store nodes vs 48 single-store nodes nodes: 48, - cpus: 16, warehouses: 3000, - load: true, // This test can take nearly an hour to import and achieve balance, so // we extend the timeout to let it complete. timeout: 3 * time.Hour, skip: manualBenchmarkingOnly, }, + { + // Multiregion decommission, and add a new node in the same region. + nodes: 6, + warehouses: 1000, + whileUpreplicating: true, + drainFirst: true, + multiregion: true, + decommissionNode: 2, + }, + { + // Multiregion decommission, and add a new node in a different region. 
+ nodes: 6, + warehouses: 1000, + whileUpreplicating: true, + drainFirst: true, + multiregion: true, + decommissionNode: 3, + }, } { registerDecommissionBenchSpec(r, benchSpec) } @@ -223,7 +223,7 @@ func registerDecommissionBenchSpec(r registry.Registry, benchSpec decommissionBe } extraNameParts := []string{""} addlNodeCount := 0 - specOptions := []spec.Option{spec.CPU(benchSpec.cpus)} + specOptions := []spec.Option{spec.CPU(16)} if benchSpec.snapshotRate != 0 { extraNameParts = append(extraNameParts, @@ -251,7 +251,7 @@ func registerDecommissionBenchSpec(r registry.Registry, benchSpec decommissionBe extraNameParts = append(extraNameParts, "while-upreplicating") } - if !benchSpec.load { + if benchSpec.noLoad { extraNameParts = append(extraNameParts, "no-load") } @@ -259,11 +259,23 @@ func registerDecommissionBenchSpec(r registry.Registry, benchSpec decommissionBe extraNameParts = append(extraNameParts, "hi-read-amp") } + if benchSpec.decommissionNode != 0 { + extraNameParts = append(extraNameParts, + fmt.Sprintf("target=%d", benchSpec.decommissionNode)) + } + if benchSpec.duration > 0 { timeout = benchSpec.duration * 3 extraNameParts = append(extraNameParts, fmt.Sprintf("duration=%s", benchSpec.duration)) } + if benchSpec.multiregion { + geoZones := []string{regionUsEast, regionUsWest, regionUsCentral} + specOptions = append(specOptions, spec.Zones(strings.Join(geoZones, ","))) + specOptions = append(specOptions, spec.Geo()) + extraNameParts = append(extraNameParts, "multi-region") + } + // If run with ROACHTEST_DECOMMISSION_NOSKIP=1, roachtest will enable all specs. 
noSkipFlag := os.Getenv(envDecommissionNoSkipFlag) if noSkipFlag != "" { @@ -273,8 +285,8 @@ func registerDecommissionBenchSpec(r registry.Registry, benchSpec decommissionBe extraName := strings.Join(extraNameParts, "/") r.Add(registry.TestSpec{ - Name: fmt.Sprintf("decommissionBench/nodes=%d/cpu=%d/warehouses=%d%s", - benchSpec.nodes, benchSpec.cpus, benchSpec.warehouses, extraName), + Name: fmt.Sprintf("decommissionBench/nodes=%d/warehouses=%d%s", + benchSpec.nodes, benchSpec.warehouses, extraName), Owner: registry.OwnerKV, Cluster: r.MakeClusterSpec( benchSpec.nodes+addlNodeCount+1, @@ -471,7 +483,7 @@ func uploadPerfArtifacts( // Get the workload perf artifacts and move them to the pinned node, so that // they can be used to display the workload operation rates during decommission. - if benchSpec.load { + if !benchSpec.noLoad { workloadStatsSrc := filepath.Join(t.PerfArtifactsDir(), "stats.json") localWorkloadStatsPath := filepath.Join(t.ArtifactsDir(), "workload_stats.json") workloadStatsDest := filepath.Join(t.PerfArtifactsDir(), "workload_stats.json") @@ -595,7 +607,7 @@ func runDecommissionBench( workloadCtx, workloadCancel := context.WithCancel(ctx) m := c.NewMonitor(workloadCtx, crdbNodes) - if benchSpec.load { + if !benchSpec.noLoad { m.Go( func(ctx context.Context) error { close(rampStarted) @@ -642,7 +654,7 @@ func runDecommissionBench( // If we are running a workload, wait until it has started and completed its // ramp time before initiating a decommission. 
- if benchSpec.load { + if !benchSpec.noLoad { <-rampStarted t.Status("Waiting for workload to ramp up...") select { @@ -666,7 +678,7 @@ func runDecommissionBench( m.ExpectDeath() defer m.ResetDeaths() - err := runSingleDecommission(ctx, h, pinnedNode, &targetNodeAtomic, benchSpec.snapshotRate, + err := runSingleDecommission(ctx, h, pinnedNode, benchSpec.decommissionNode, &targetNodeAtomic, benchSpec.snapshotRate, benchSpec.whileDown, benchSpec.drainFirst, false /* reuse */, benchSpec.whileUpreplicating, true /* estimateDuration */, benchSpec.slowWrites, tickByName, ) @@ -729,7 +741,7 @@ func runDecommissionBenchLong( workloadCtx, workloadCancel := context.WithCancel(ctx) m := c.NewMonitor(workloadCtx, crdbNodes) - if benchSpec.load { + if !benchSpec.noLoad { m.Go( func(ctx context.Context) error { close(rampStarted) @@ -773,7 +785,7 @@ func runDecommissionBenchLong( // If we are running a workload, wait until it has started and completed its // ramp time before initiating a decommission. 
- if benchSpec.load { + if !benchSpec.noLoad { <-rampStarted t.Status("Waiting for workload to ramp up...") select { @@ -786,7 +798,7 @@ func runDecommissionBenchLong( for tBegin := timeutil.Now(); timeutil.Since(tBegin) <= benchSpec.duration; { m.ExpectDeath() - err := runSingleDecommission(ctx, h, pinnedNode, &targetNodeAtomic, benchSpec.snapshotRate, + err := runSingleDecommission(ctx, h, pinnedNode, benchSpec.decommissionNode, &targetNodeAtomic, benchSpec.snapshotRate, benchSpec.whileDown, benchSpec.drainFirst, true /* reuse */, benchSpec.whileUpreplicating, true /* estimateDuration */, benchSpec.slowWrites, tickByName, ) @@ -827,12 +839,15 @@ func runSingleDecommission( ctx context.Context, h *decommTestHelper, pinnedNode int, + target int, targetLogicalNodeAtomic *uint32, snapshotRateMb int, stopFirst, drainFirst, reuse, noBalanceWait, estimateDuration, slowWrites bool, tickByName func(name string), ) error { - target := h.getRandNodeOtherThan(pinnedNode) + if target == 0 { + target = h.getRandNodeOtherThan(pinnedNode) + } targetLogicalNodeID, err := h.getLogicalNodeID(ctx, target) if err != nil { return err diff --git a/pkg/kv/kvserver/allocator/storepool/store_pool.go b/pkg/kv/kvserver/allocator/storepool/store_pool.go index 55beabc549ff..2f1e60b3413c 100644 --- a/pkg/kv/kvserver/allocator/storepool/store_pool.go +++ b/pkg/kv/kvserver/allocator/storepool/store_pool.go @@ -826,6 +826,23 @@ func (sp *StorePool) IsLive(storeID roachpb.StoreID) (bool, error) { return status == storeStatusAvailable, nil } +// IsStoreHealthy returns whether we believe this store can serve requests +// reliably. A healthy store can be used for follower snapshot transmission or +// follower reads. A healthy store does not imply that replicas can be moved to +// this store. 
+func (sp *StorePool) IsStoreHealthy(storeID roachpb.StoreID) bool { + status, err := sp.storeStatus(storeID, sp.NodeLivenessFn) + if err != nil { + return false + } + switch status { + case storeStatusAvailable, storeStatusDecommissioning, storeStatusDraining: + return true + default: + return false + } +} + func (sp *StorePool) storeStatus( storeID roachpb.StoreID, nl NodeLivenessFunc, ) (storeStatus, error) { @@ -1238,6 +1255,24 @@ func (sp *StorePool) GetLocalitiesByNode( return localities } +// GetLocalitiesPerReplica computes the localities for the provided replicas. +// It returns a map from each replica's ReplicaID to the Locality of its node. +func (sp *StorePool) GetLocalitiesPerReplica( + replicas ...roachpb.ReplicaDescriptor, +) map[roachpb.ReplicaID]roachpb.Locality { + sp.localitiesMu.RLock() + defer sp.localitiesMu.RUnlock() + localities := make(map[roachpb.ReplicaID]roachpb.Locality) + for _, replica := range replicas { + if locality, ok := sp.localitiesMu.nodeLocalities[replica.NodeID]; ok { + localities[replica.ReplicaID] = locality.locality + } else { + localities[replica.ReplicaID] = roachpb.Locality{} + } + } + return localities +} + // GetNodeLocalityString returns the locality information for the given node // in its string format. func (sp *StorePool) GetNodeLocalityString(nodeID roachpb.NodeID) string { diff --git a/pkg/kv/kvserver/client_raft_helpers_test.go b/pkg/kv/kvserver/client_raft_helpers_test.go index 84a1fff66b2d..f966f936e7df 100644 --- a/pkg/kv/kvserver/client_raft_helpers_test.go +++ b/pkg/kv/kvserver/client_raft_helpers_test.go @@ -31,8 +31,9 @@ type unreliableRaftHandlerFuncs struct { dropReq func(*kvserverpb.RaftMessageRequest) bool dropHB func(*kvserverpb.RaftHeartbeat) bool dropResp func(*kvserverpb.RaftMessageResponse) bool - // snapErr defaults to returning nil. - snapErr func(*kvserverpb.SnapshotRequest_Header) error + // snapErr and delegateErr default to returning nil.
+ snapErr func(*kvserverpb.SnapshotRequest_Header) error + delegateErr func(request *kvserverpb.DelegateSendSnapshotRequest) error } func noopRaftHandlerFuncs() unreliableRaftHandlerFuncs { @@ -134,6 +135,20 @@ func (h *unreliableRaftHandler) HandleSnapshot( return h.RaftMessageHandler.HandleSnapshot(ctx, header, respStream) } +func (h *unreliableRaftHandler) HandleDelegatedSnapshot( + ctx context.Context, req *kvserverpb.DelegateSendSnapshotRequest, +) *kvserverpb.DelegateSnapshotResponse { + if req.RangeID == h.rangeID && h.delegateErr != nil { + if err := h.delegateErr(req); err != nil { + return &kvserverpb.DelegateSnapshotResponse{ + Status: kvserverpb.DelegateSnapshotResponse_ERROR, + EncodedError: errors.EncodeError(context.Background(), err), + } + } + } + return h.RaftMessageHandler.HandleDelegatedSnapshot(ctx, req) +} + // testClusterStoreRaftMessageHandler exists to allows a store to be stopped and // restarted while maintaining a partition using an unreliableRaftHandler. type testClusterStoreRaftMessageHandler struct { @@ -181,15 +196,16 @@ func (h *testClusterStoreRaftMessageHandler) HandleSnapshot( } func (h *testClusterStoreRaftMessageHandler) HandleDelegatedSnapshot( - ctx context.Context, - req *kvserverpb.DelegateSnapshotRequest, - stream kvserver.DelegateSnapshotResponseStream, -) error { + ctx context.Context, req *kvserverpb.DelegateSendSnapshotRequest, +) *kvserverpb.DelegateSnapshotResponse { store, err := h.getStore() if err != nil { - return err + return &kvserverpb.DelegateSnapshotResponse{ + Status: kvserverpb.DelegateSnapshotResponse_ERROR, + EncodedError: errors.EncodeError(context.Background(), err), + } } - return store.HandleDelegatedSnapshot(ctx, req, stream) + return store.HandleDelegatedSnapshot(ctx, req) } // testClusterPartitionedRange is a convenient abstraction to create a range on a node @@ -198,7 +214,7 @@ type testClusterPartitionedRange struct { rangeID roachpb.RangeID mu struct { syncutil.RWMutex - partitionedNode int + 
partitionedNodeIdx int partitioned bool partitionedReplicas map[roachpb.ReplicaID]bool } @@ -232,7 +248,7 @@ func setupPartitionedRange( tc *testcluster.TestCluster, rangeID roachpb.RangeID, replicaID roachpb.ReplicaID, - partitionedNode int, + partitionedNodeIdx int, activated bool, funcs unreliableRaftHandlerFuncs, ) (*testClusterPartitionedRange, error) { @@ -243,14 +259,14 @@ func setupPartitionedRange( storeIdx: i, }) } - return setupPartitionedRangeWithHandlers(tc, rangeID, replicaID, partitionedNode, activated, handlers, funcs) + return setupPartitionedRangeWithHandlers(tc, rangeID, replicaID, partitionedNodeIdx, activated, handlers, funcs) } func setupPartitionedRangeWithHandlers( tc *testcluster.TestCluster, rangeID roachpb.RangeID, replicaID roachpb.ReplicaID, - partitionedNode int, + partitionedNodeIdx int, activated bool, handlers []kvserver.RaftMessageHandler, funcs unreliableRaftHandlerFuncs, @@ -260,9 +276,9 @@ func setupPartitionedRangeWithHandlers( handlers: make([]kvserver.RaftMessageHandler, 0, len(handlers)), } pr.mu.partitioned = activated - pr.mu.partitionedNode = partitionedNode + pr.mu.partitionedNodeIdx = partitionedNodeIdx if replicaID == 0 { - ts := tc.Servers[partitionedNode] + ts := tc.Servers[partitionedNodeIdx] store, err := ts.Stores().GetStore(ts.GetFirstStoreID()) if err != nil { return nil, err @@ -294,8 +310,8 @@ func setupPartitionedRangeWithHandlers( pr.mu.RLock() defer pr.mu.RUnlock() return pr.mu.partitioned && - (s == pr.mu.partitionedNode || - req.FromReplica.StoreID == roachpb.StoreID(pr.mu.partitionedNode)+1) + (s == pr.mu.partitionedNodeIdx || + req.FromReplica.StoreID == roachpb.StoreID(pr.mu.partitionedNodeIdx)+1) } } if h.dropHB == nil { @@ -305,12 +321,21 @@ func setupPartitionedRangeWithHandlers( if !pr.mu.partitioned { return false } - if s == partitionedNode { + if s == partitionedNodeIdx { return true } return pr.mu.partitionedReplicas[hb.FromReplicaID] } } + if h.dropResp == nil { + h.dropResp = func(resp 
*kvserverpb.RaftMessageResponse) bool { + pr.mu.RLock() + defer pr.mu.RUnlock() + return pr.mu.partitioned && + (s == pr.mu.partitionedNodeIdx || + resp.FromReplica.StoreID == roachpb.StoreID(pr.mu.partitionedNodeIdx)+1) + } + } if h.snapErr == nil { h.snapErr = func(header *kvserverpb.SnapshotRequest_Header) error { pr.mu.RLock() @@ -324,6 +349,16 @@ func setupPartitionedRangeWithHandlers( return nil } } + if h.delegateErr == nil { + h.delegateErr = func(resp *kvserverpb.DelegateSendSnapshotRequest) error { + pr.mu.RLock() + defer pr.mu.RUnlock() + if pr.mu.partitionedReplicas[resp.DelegatedSender.ReplicaID] { + return errors.New("partitioned") + } + return nil + } + } pr.handlers = append(pr.handlers, h) tc.Servers[s].RaftTransport().Listen(tc.Target(s).StoreID, h) } diff --git a/pkg/kv/kvserver/client_raft_test.go b/pkg/kv/kvserver/client_raft_test.go index 2a92de9b116f..35bbf48f3c61 100644 --- a/pkg/kv/kvserver/client_raft_test.go +++ b/pkg/kv/kvserver/client_raft_test.go @@ -3573,10 +3573,8 @@ func (errorChannelTestHandler) HandleSnapshot( } func (errorChannelTestHandler) HandleDelegatedSnapshot( - ctx context.Context, - req *kvserverpb.DelegateSnapshotRequest, - stream kvserver.DelegateSnapshotResponseStream, -) error { + _ context.Context, _ *kvserverpb.DelegateSendSnapshotRequest, +) *kvserverpb.DelegateSnapshotResponse { panic("unimplemented") } diff --git a/pkg/kv/kvserver/helpers_test.go b/pkg/kv/kvserver/helpers_test.go index 00564d86034a..243c8daf9d70 100644 --- a/pkg/kv/kvserver/helpers_test.go +++ b/pkg/kv/kvserver/helpers_test.go @@ -206,7 +206,7 @@ func (s *Store) ManualRaftSnapshot(repl *Replica, target roachpb.ReplicaID) erro // ReservationCount counts the number of outstanding reservations that are not // running. 
func (s *Store) ReservationCount() int { - return int(s.cfg.SnapshotApplyLimit) - s.snapshotApplyQueue.Len() + return int(s.cfg.SnapshotApplyLimit) - s.snapshotApplyQueue.AvailableLen() } // RaftSchedulerPriorityID returns the Raft scheduler's prioritized range. diff --git a/pkg/kv/kvserver/kvserverpb/raft.go b/pkg/kv/kvserver/kvserverpb/raft.go index 9612ce2e9d45..34785385fc36 100644 --- a/pkg/kv/kvserver/kvserverpb/raft.go +++ b/pkg/kv/kvserver/kvserverpb/raft.go @@ -23,8 +23,11 @@ func (SnapshotRequest_Type) SafeValue() {} // // The bool indicates whether this message uses the deprecated behavior of // encoding an error as a string. -func (m *DelegateSnapshotResponse) Error() (deprecated bool, _ error) { - return m.SnapResponse.Error() +func (m *DelegateSnapshotResponse) Error() error { + if m.Status != DelegateSnapshotResponse_ERROR { + return nil + } + return errors.DecodeError(context.Background(), m.EncodedError) } // Error returns the error contained in the snapshot response, if any. diff --git a/pkg/kv/kvserver/kvserverpb/raft.proto b/pkg/kv/kvserver/kvserverpb/raft.proto index 4ec5d0288aed..8f67047253c4 100644 --- a/pkg/kv/kvserver/kvserverpb/raft.proto +++ b/pkg/kv/kvserver/kvserverpb/raft.proto @@ -14,7 +14,6 @@ option go_package = "kvserverpb"; import "errorspb/errors.proto"; import "roachpb/errors.proto"; -import "roachpb/internal_raft.proto"; import "roachpb/metadata.proto"; import "kv/kvserver/liveness/livenesspb/liveness.proto"; import "kv/kvserver/kvserverpb/state.proto"; @@ -221,7 +220,7 @@ message SnapshotRequest { Header header = 1; - // A RocksDB BatchRepr. Multiple kv_batches may be sent across multiple request messages. + // A BatchRepr. Multiple kv_batches may be sent across multiple request messages. 
bytes kv_batch = 2 [(gogoproto.customname) = "KVBatch"]; bool final = 4; @@ -256,8 +255,15 @@ message SnapshotResponse { errorspb.EncodedError encoded_error = 5 [(gogoproto.nullable) = false]; } -// DelegateSnapshotRequest is the request used to delegate send snapshot requests. +// TODO(baptist): Extend this if necessary to separate out the request for the throttle. message DelegateSnapshotRequest { + oneof value { + DelegateSendSnapshotRequest send = 1; + } +} + +// DelegateSendSnapshotRequest is the request used to delegate send snapshot requests. +message DelegateSendSnapshotRequest { uint64 range_id = 1 [(gogoproto.customname) = "RangeID", (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.RangeID"]; @@ -272,30 +278,50 @@ message DelegateSnapshotRequest { roachpb.ReplicaDescriptor delegated_sender = 4 [(gogoproto.nullable) = false]; // The priority of the snapshot. + // TODO(abaptist): Remove this field for v23.1. SnapshotRequest.Priority priority = 5; + // The type of the snapshot. + // TODO(abaptist): Remove this field for v23.1. + SnapshotRequest.Type type = 6; + + // The Raft term of the coordinator (in most cases the leaseholder) replica. + // The term is used during snapshot receiving to reject messages from an older term. + uint64 term = 7; + + // The first index of the Raft log on the coordinator replica. + uint64 first_index = 8; + // The sending queue's name. SnapshotRequest.QueueName sender_queue_name = 9; // The sending queue's priority. double sender_queue_priority = 10; - // The type of the snapshot. - SnapshotRequest.Type type = 6; + // The generation of the leaseholder's descriptor. + uint64 descriptor_generation = 11 [(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.RangeGeneration"]; - // The Raft term of the coordinator (in most cases the leaseholder) replica. - // The term is used during snapshot receiving to reject messages from an older term.
- uint64 term = 7; + // Max queue length on the delegate before this request is rejected. + int64 queue_on_delegate_len = 12; - // The truncated state of the Raft log on the coordinator replica. - roachpb.RaftTruncatedState truncated_state = 8; + // Id of this snapshot which is maintained from coordinator to receiver. + bytes snap_id = 13 [ + (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID", + (gogoproto.nullable) = false]; } message DelegateSnapshotResponse { - SnapshotResponse snapResponse = 1; + enum Status { + ERROR = 0; + APPLIED = 1; + } + + Status status = 1; + errorspb.EncodedError encoded_error = 2 [(gogoproto.nullable) = false]; + // collected_spans stores trace spans recorded during the execution of this // request. - repeated util.tracing.tracingpb.RecordedSpan collected_spans = 2 [(gogoproto.nullable) = false]; + repeated util.tracing.tracingpb.RecordedSpan collected_spans = 3 [(gogoproto.nullable) = false]; } // ConfChangeContext is encoded in the raftpb.ConfChange.Context field. diff --git a/pkg/kv/kvserver/metrics.go b/pkg/kv/kvserver/metrics.go index 22070dedafb5..d4bec362a464 100644 --- a/pkg/kv/kvserver/metrics.go +++ b/pkg/kv/kvserver/metrics.go @@ -759,6 +759,7 @@ var ( Measurement: "Snapshots", Unit: metric.Unit_COUNT, } + metaRangeRaftLeaderTransfers = metric.Metadata{ Name: "range.raftleadertransfers", Help: "Number of raft leader transfers", @@ -776,6 +777,40 @@ is located starts following the recovery.`, Unit: metric.Unit_COUNT, } + metaRangeSnapshotSendLatency = metric.Metadata{ + Name: "range.snapshot.send.latency", + Help: `Latency histogram for sending a snapshot from the coordinators perspective. + +This metric measures the time between deciding a snapshot needs to be sent until +it is fully processed and accepted by the end recipient. It includes any +queueing and retries along the way. 
Note that this snapshot is intentionally +not normalized by snapshot size since frequently the majority of the time spent waiting +is on queues and they are independent of size. +`, + Measurement: "Latency", + Unit: metric.Unit_NANOSECONDS, + } + + metaDelegateSnapshotSuccesses = metric.Metadata{ + Name: "range.snapshot.delegate.successes", + Help: `Number of snapshots that were delegated to a different node and +resulted in success on that delegate. This does not count self delegated snapshots. +`, + Measurement: "Snapshots", + Unit: metric.Unit_COUNT, + } + + metaDelegateSnapshotFailures = metric.Metadata{ + Name: "range.snapshot.delegate.failures", + Help: `Number of snapshots that were delegated to a different node and +resulted in failure on that delegate. There are numerous reasons a failure can +occur on a delegate such as timeout, the delegate Raft log being too far behind +or the delegate being too busy to send. +`, + Measurement: "Snapshots", + Unit: metric.Unit_COUNT, + } + // Quota pool metrics. metaRaftQuotaPoolPercentUsed = metric.Metadata{ Name: "raft.quota_pool.percent_used", @@ -1839,6 +1874,7 @@ type StoreMetrics struct { RangeSnapshotRecoverySentBytes *metric.Counter RangeSnapshotRebalancingRcvdBytes *metric.Counter RangeSnapshotRebalancingSentBytes *metric.Counter + RangeSnapshotSendLatency *metric.Histogram // Range snapshot queue metrics. RangeSnapshotSendQueueLength *metric.Gauge @@ -1848,6 +1884,10 @@ type StoreMetrics struct { RangeSnapshotSendTotalInProgress *metric.Gauge RangeSnapshotRecvTotalInProgress *metric.Gauge + // Delegate snapshot metrics. These don't count self-delegated snapshots. + DelegateSnapshotSuccesses *metric.Counter + DelegateSnapshotFailures *metric.Counter + // Raft processing metrics. 
RaftTicks *metric.Counter RaftQuotaPoolPercentUsed *metric.Histogram @@ -2367,6 +2407,11 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics { RangeSnapshotRecvTotalInProgress: metric.NewGauge(metaRangeSnapshotRecvTotalInProgress), RangeRaftLeaderTransfers: metric.NewCounter(metaRangeRaftLeaderTransfers), RangeLossOfQuorumRecoveries: metric.NewCounter(metaRangeLossOfQuorumRecoveries), + RangeSnapshotSendLatency: metric.NewHistogram( + metaRangeSnapshotSendLatency, histogramWindow, metric.BatchProcessLatencyBuckets), + + DelegateSnapshotSuccesses: metric.NewCounter(metaDelegateSnapshotSuccesses), + DelegateSnapshotFailures: metric.NewCounter(metaDelegateSnapshotFailures), // Raft processing metrics. RaftTicks: metric.NewCounter(metaRaftTicks), diff --git a/pkg/kv/kvserver/multiqueue/BUILD.bazel b/pkg/kv/kvserver/multiqueue/BUILD.bazel index 5c7a1642b937..14dbae432c80 100644 --- a/pkg/kv/kvserver/multiqueue/BUILD.bazel +++ b/pkg/kv/kvserver/multiqueue/BUILD.bazel @@ -8,14 +8,16 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/util/syncutil", + "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_redact//:redact", ], ) go_test( name = "multiqueue_test", + size = "small", srcs = ["multi_queue_test.go"], - args = ["-test.timeout=295s"], + args = ["-test.timeout=55s"], embed = [":multiqueue"], deps = [ "//pkg/testutils", diff --git a/pkg/kv/kvserver/multiqueue/multi_queue.go b/pkg/kv/kvserver/multiqueue/multi_queue.go index 588c4556de46..8848612fc727 100644 --- a/pkg/kv/kvserver/multiqueue/multi_queue.go +++ b/pkg/kv/kvserver/multiqueue/multi_queue.go @@ -14,6 +14,7 @@ import ( "container/heap" "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/errors" "github.com/cockroachdb/redact" ) @@ -145,10 +146,20 @@ func (m *MultiQueue) tryRunNextLocked() { // Add returns a Task that must be closed (calling m.Release(..)) to // release the Permit. 
The number of types is expected to // be relatively small and not be changing over time. -func (m *MultiQueue) Add(queueType int, priority float64) *Task { +func (m *MultiQueue) Add(queueType int, priority float64, maxQueueLength int64) (*Task, error) { m.mu.Lock() defer m.mu.Unlock() + // If there are remainingRuns we can run immediately, otherwise compute the + // queue length once we are added. If the queue is too long, return an error + // immediately so the caller doesn't have to wait. + if m.remainingRuns == 0 && maxQueueLength >= 0 { + currentLen := int64(m.queueLenLocked()) + if currentLen > maxQueueLength { + return nil, errors.Newf("queue is too long %d > %d", currentLen, maxQueueLength) + } + } + // The mutex starts locked, unlock it when we are ready to run. pos, ok := m.mapping[queueType] if !ok { @@ -169,7 +180,7 @@ func (m *MultiQueue) Add(queueType int, priority float64) *Task { // Once we are done adding a task, attempt to signal the next waiting task. m.tryRunNextLocked() - return &newTask + return &newTask, nil } // Cancel will cancel a Task that may not have started yet. This is useful if it @@ -221,11 +232,32 @@ func (m *MultiQueue) releaseLocked(permit *Permit) { m.tryRunNextLocked() } -// Len returns the number of additional tasks that can be added without -// queueing. This will return 0 if there is anything queued. This method should -// only be used for testing. -func (m *MultiQueue) Len() int { +// AvailableLen returns the number of additional tasks that can be added without +// queueing. This will return 0 if there is anything queued. +func (m *MultiQueue) AvailableLen() int { m.mu.Lock() defer m.mu.Unlock() return m.remainingRuns } + +// QueueLen returns the length of the queue if one more task is added. If this +// returns 0 then a task can be added and run without queueing. +// NB: The value returned is not a guarantee that queueing will not occur when +// the request is submitted. Multiple calls to QueueLen could race.
+func (m *MultiQueue) QueueLen() int { + m.mu.Lock() + defer m.mu.Unlock() + return m.queueLenLocked() +} + +func (m *MultiQueue) queueLenLocked() int { + if m.remainingRuns > 0 { + return 0 + } + // Start counting from 1 since we will be the first in the queue if it gets added. + count := 1 + for i := 0; i < len(m.outstanding); i++ { + count += len(m.outstanding[i]) + } + return count - m.remainingRuns +} diff --git a/pkg/kv/kvserver/multiqueue/multi_queue_test.go b/pkg/kv/kvserver/multiqueue/multi_queue_test.go index 8e693a1e210f..4c50f4ac2753 100644 --- a/pkg/kv/kvserver/multiqueue/multi_queue_test.go +++ b/pkg/kv/kvserver/multiqueue/multi_queue_test.go @@ -37,10 +37,10 @@ func TestMultiQueueAddTwiceSameQueue(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) queue := NewMultiQueue(1) - blocker := queue.Add(0, 0) + blocker, _ := queue.Add(0, 0, -1) - chan1 := queue.Add(7, 1.0) - chan2 := queue.Add(7, 2.0) + chan1, _ := queue.Add(7, 1.0, -1) + chan2, _ := queue.Add(7, 2.0, -1) permit := <-blocker.GetWaitChan() queue.Release(permit) @@ -56,13 +56,13 @@ func TestMultiQueueTwoQueues(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) queue := NewMultiQueue(1) - blocker := queue.Add(0, 0) + blocker, _ := queue.Add(0, 0, -1) - a1 := queue.Add(5, 4.0) - a2 := queue.Add(5, 5.0) + a1, _ := queue.Add(5, 4.0, -1) + a2, _ := queue.Add(5, 5.0, -1) - b1 := queue.Add(6, 1.0) - b2 := queue.Add(6, 2.0) + b1, _ := queue.Add(6, 1.0, -1) + b2, _ := queue.Add(6, 2.0, -1) permit := <-blocker.GetWaitChan() queue.Release(permit) @@ -79,15 +79,15 @@ func TestMultiQueueComplex(t *testing.T) { defer log.Scope(t).Close(t) queue := NewMultiQueue(1) - blocker := queue.Add(0, 0) + blocker, _ := queue.Add(0, 0, -1) - a2 := queue.Add(1, 4.0) - b1 := queue.Add(2, 1.1) - b2 := queue.Add(2, 2.1) - c2 := queue.Add(3, 1.2) - c3 := queue.Add(3, 2.2) - a3 := queue.Add(1, 5.0) - b3 := queue.Add(2, 6.1) + a2, _ := queue.Add(1, 4.0, -1) + b1, _ := queue.Add(2, 
1.1, -1) + b2, _ := queue.Add(2, 2.1, -1) + c2, _ := queue.Add(3, 1.2, -1) + c3, _ := queue.Add(3, 2.2, -1) + a3, _ := queue.Add(1, 5.0, -1) + b3, _ := queue.Add(2, 6.1, -1) permit := <-blocker.GetWaitChan() queue.Release(permit) @@ -99,15 +99,15 @@ func TestMultiQueueRemove(t *testing.T) { defer log.Scope(t).Close(t) queue := NewMultiQueue(1) - blocker := queue.Add(0, 0) + blocker, _ := queue.Add(0, 0, -1) - a2 := queue.Add(1, 4.0) - b1 := queue.Add(2, 1.1) - b2 := queue.Add(2, 2.1) - c2 := queue.Add(3, 1.2) - c3 := queue.Add(3, 2.2) - a3 := queue.Add(1, 5.0) - b3 := queue.Add(2, 6.1) + a2, _ := queue.Add(1, 4.0, -1) + b1, _ := queue.Add(2, 1.1, -1) + b2, _ := queue.Add(2, 2.1, -1) + c2, _ := queue.Add(3, 1.2, -1) + c3, _ := queue.Add(3, 2.2, -1) + a3, _ := queue.Add(1, 5.0, -1) + b3, _ := queue.Add(2, 6.1, -1) fmt.Println("Beginning cancel") @@ -125,7 +125,7 @@ func TestMultiQueueCancelOne(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) queue := NewMultiQueue(1) - task := queue.Add(1, 1) + task, _ := queue.Add(1, 1, -1) queue.Cancel(task) } @@ -139,12 +139,12 @@ func TestMultiQueueCancelInProgress(t *testing.T) { const b = 2 const c = 3 - a3 := queue.Add(a, 5.0) - a2 := queue.Add(a, 4.0) - b1 := queue.Add(b, 1.1) - b2 := queue.Add(b, 2.1) - c3 := queue.Add(c, 2.2) - b3 := queue.Add(b, 6.1) + a3, _ := queue.Add(a, 5.0, -1) + a2, _ := queue.Add(a, 4.0, -1) + b1, _ := queue.Add(b, 1.1, -1) + b2, _ := queue.Add(b, 2.1, -1) + c3, _ := queue.Add(c, 2.2, -1) + b3, _ := queue.Add(b, 6.1, -1) queue.Cancel(b2) queue.Cancel(b1) @@ -237,7 +237,7 @@ func TestMultiQueueStress(t *testing.T) { for i := 0; i < numThreads; i++ { go func(name int) { for j := 0; j < numRequests; j++ { - curTask := queue.Add(name, float64(j)) + curTask, _ := queue.Add(name, float64(j), -1) if alsoCancel && j%99 == 0 { queue.Cancel(curTask) } else { @@ -271,7 +271,7 @@ func TestMultiQueueReleaseTwice(t *testing.T) { queue := NewMultiQueue(1) - task := queue.Add(1, 1) + task, 
_ := queue.Add(1, 1, -1) p := <-task.GetWaitChan() queue.Release(p) require.Panics(t, func() { queue.Release(p) }) @@ -283,7 +283,7 @@ func TestMultiQueueReleaseAfterCancel(t *testing.T) { queue := NewMultiQueue(1) - task := queue.Add(1, 1) + task, _ := queue.Add(1, 1, -1) p := <-task.GetWaitChan() queue.Cancel(task) queue.Release(p) @@ -294,31 +294,84 @@ func TestMultiQueueLen(t *testing.T) { defer log.Scope(t).Close(t) queue := NewMultiQueue(2) - require.Equal(t, 2, queue.Len()) - - task1 := queue.Add(1, 1) - require.Equal(t, 1, queue.Len()) - task2 := queue.Add(1, 1) - require.Equal(t, 0, queue.Len()) - task3 := queue.Add(1, 1) - require.Equal(t, 0, queue.Len()) + require.Equal(t, 2, queue.AvailableLen()) + require.Equal(t, 0, queue.QueueLen()) + + task1, _ := queue.Add(1, 1, -1) + require.Equal(t, 1, queue.AvailableLen()) + require.Equal(t, 0, queue.QueueLen()) + task2, _ := queue.Add(1, 1, -1) + require.Equal(t, 0, queue.AvailableLen()) + require.Equal(t, 1, queue.QueueLen()) + task3, _ := queue.Add(1, 1, -1) + require.Equal(t, 0, queue.AvailableLen()) + require.Equal(t, 2, queue.QueueLen()) queue.Cancel(task1) // Finish task 1, but immediately start task3. - require.Equal(t, 0, queue.Len()) + require.Equal(t, 0, queue.AvailableLen()) + require.Equal(t, 1, queue.QueueLen()) p := <-task2.GetWaitChan() queue.Release(p) - require.Equal(t, 1, queue.Len()) + require.Equal(t, 1, queue.AvailableLen()) + require.Equal(t, 0, queue.QueueLen()) queue.Cancel(task3) - require.Equal(t, 2, queue.Len()) + require.Equal(t, 2, queue.AvailableLen()) + require.Equal(t, 0, queue.QueueLen()) +} + +func TestMultiQueueFull(t *testing.T) { + queue := NewMultiQueue(2) + require.Equal(t, 2, queue.AvailableLen()) + require.Equal(t, 0, queue.QueueLen()) + + // Task 1 starts immediately since there is no queue. 
+ task1, err := queue.Add(1, 1, 0) + require.NoError(t, err) + require.Equal(t, 1, queue.AvailableLen()) + require.Equal(t, 0, queue.QueueLen()) + // Task 2 also starts immediately as the queue supports 2 concurrent. + task2, err := queue.Add(1, 1, 0) + require.NoError(t, err) + require.Equal(t, 0, queue.AvailableLen()) + require.Equal(t, 1, queue.QueueLen()) + // Task 3 would be queued so should not be added. + task3, err := queue.Add(1, 1, 0) + require.Error(t, err) + require.Nil(t, task3) + require.Equal(t, 0, queue.AvailableLen()) + require.Equal(t, 1, queue.QueueLen()) + // Task 4 uses a longer max queue length so should be added. + task4, err := queue.Add(1, 1, 1) + require.NoError(t, err) + require.Equal(t, 0, queue.AvailableLen()) + require.Equal(t, 2, queue.QueueLen()) + + queue.Cancel(task1) + queue.Cancel(task2) + require.Equal(t, 1, queue.AvailableLen()) + require.Equal(t, 0, queue.QueueLen()) + // After these tasks are done, make sure we can add another one. + task5, err := queue.Add(1, 1, 0) + require.NoError(t, err) + require.Equal(t, 0, queue.AvailableLen()) + require.Equal(t, 1, queue.QueueLen()) + + // Cancel all the remaining tasks. + queue.Cancel(task4) + queue.Cancel(task5) } // verifyOrder makes sure that the chans are called in the specified order. func verifyOrder(t *testing.T, queue *MultiQueue, tasks ...*Task) { // each time, verify that the only available channel is the "next" one in order for i, task := range tasks { + if task == nil { + require.Fail(t, "Task is nil", "%d", task) + } + var found *Permit for j, t2 := range tasks[i+1:] { select { diff --git a/pkg/kv/kvserver/raft_log_queue_test.go b/pkg/kv/kvserver/raft_log_queue_test.go index c4d9d0beb968..db127557bab7 100644 --- a/pkg/kv/kvserver/raft_log_queue_test.go +++ b/pkg/kv/kvserver/raft_log_queue_test.go @@ -645,17 +645,19 @@ func TestSnapshotLogTruncationConstraints(t *testing.T) { index2 = 60 ) + r.mu.state.RaftAppliedIndex = index1 // Add first constraint. 
- r.addSnapshotLogTruncationConstraintLocked(ctx, id1, index1, storeID) + _, cleanup1 := r.addSnapshotLogTruncationConstraint(ctx, id1, storeID) exp1 := map[uuid.UUID]snapTruncationInfo{id1: {index: index1}} // Make sure it registered. assert.Equal(t, r.mu.snapshotLogTruncationConstraints, exp1) + r.mu.state.RaftAppliedIndex = index2 // Add another constraint with the same id. Extremely unlikely in practice // but we want to make sure it doesn't blow anything up. Collisions are // handled by ignoring the colliding update. - r.addSnapshotLogTruncationConstraintLocked(ctx, id1, index2, storeID) + _, cleanup2 := r.addSnapshotLogTruncationConstraint(ctx, id1, storeID) assert.Equal(t, r.mu.snapshotLogTruncationConstraints, exp1) // Helper that grabs the min constraint index (which can trigger GC as a @@ -672,19 +674,22 @@ func TestSnapshotLogTruncationConstraints(t *testing.T) { // colliding update at index2 is not represented. assertMin(index1, time.Time{}) + r.mu.state.RaftAppliedIndex = index2 // Add another, higher, index. We're not going to notice it's around // until the lower one disappears. - r.addSnapshotLogTruncationConstraintLocked(ctx, id2, index2, storeID) + _, cleanup3 := r.addSnapshotLogTruncationConstraint(ctx, id2, storeID) now := timeutil.Now() // The colliding snapshot comes back. Or the original, we can't tell. - r.completeSnapshotLogTruncationConstraint(id1) + cleanup1() + // This won't do anything since we had a collision, but make sure it's ok. + cleanup2() // The index should show up when its deadline isn't hit. 
assertMin(index2, now) assertMin(index2, now.Add(1)) assertMin(index2, time.Time{}) - r.completeSnapshotLogTruncationConstraint(id2) + cleanup3() assertMin(0, now) assertMin(0, now.Add(2)) diff --git a/pkg/kv/kvserver/raft_snapshot_queue.go b/pkg/kv/kvserver/raft_snapshot_queue.go index 0ba36ab6c487..f8dcdeac6549 100644 --- a/pkg/kv/kvserver/raft_snapshot_queue.go +++ b/pkg/kv/kvserver/raft_snapshot_queue.go @@ -142,7 +142,7 @@ func (rq *raftSnapshotQueue) processRaftSnapshot( } } - err := repl.sendSnapshot(ctx, repDesc, snapType, kvserverpb.SnapshotRequest_RECOVERY, kvserverpb.SnapshotRequest_RAFT_SNAPSHOT_QUEUE, raftSnapshotPriority) + err := repl.sendSnapshotUsingDelegate(ctx, repDesc, snapType, kvserverpb.SnapshotRequest_RECOVERY, kvserverpb.SnapshotRequest_RAFT_SNAPSHOT_QUEUE, raftSnapshotPriority) // NB: if the snapshot fails because of an overlapping replica on the // recipient which is also waiting for a snapshot, the "smart" thing is to diff --git a/pkg/kv/kvserver/raft_transport.go b/pkg/kv/kvserver/raft_transport.go index 283643e551c1..4d299d90b8d0 100644 --- a/pkg/kv/kvserver/raft_transport.go +++ b/pkg/kv/kvserver/raft_transport.go @@ -98,13 +98,6 @@ type SnapshotResponseStream interface { Recv() (*kvserverpb.SnapshotRequest, error) } -// DelegateSnapshotResponseStream is the subset of the -// MultiRaft_RaftSnapshotServer interface that is needed for sending delegated responses. -type DelegateSnapshotResponseStream interface { - Send(request *kvserverpb.DelegateSnapshotResponse) error - Recv() (*kvserverpb.DelegateSnapshotRequest, error) -} - // RaftMessageHandler is the interface that must be implemented by // arguments to RaftTransport.Listen. type RaftMessageHandler interface { @@ -132,9 +125,8 @@ type RaftMessageHandler interface { // request. 
HandleDelegatedSnapshot( ctx context.Context, - req *kvserverpb.DelegateSnapshotRequest, - stream DelegateSnapshotResponseStream, - ) error + req *kvserverpb.DelegateSendSnapshotRequest, + ) *kvserverpb.DelegateSnapshotResponse } // RaftTransport handles the rpc messages for raft. @@ -327,14 +319,24 @@ func (t *RaftTransport) DelegateRaftSnapshot(stream MultiRaft_DelegateRaftSnapsh if err != nil { return err } - // Check to ensure the header is valid. + resp := t.InternalDelegateRaftSnapshot(ctx, req.GetSend()) + err = stream.Send(resp) + if err != nil { + return err + } + return nil +} + +// InternalDelegateRaftSnapshot processes requests in a request/response fashion for normal DelegateSnapshotRequests +func (t *RaftTransport) InternalDelegateRaftSnapshot( + ctx context.Context, req *kvserverpb.DelegateSendSnapshotRequest, +) *kvserverpb.DelegateSnapshotResponse { if req == nil { err := errors.New("client error: no message in first delegated snapshot request") - return stream.Send( - &kvserverpb.DelegateSnapshotResponse{ - SnapResponse: snapRespErr(err), - }, - ) + return &kvserverpb.DelegateSnapshotResponse{ + Status: kvserverpb.DelegateSnapshotResponse_ERROR, + EncodedError: errors.EncodeError(context.Background(), err), + } } // Get the handler of the sender store. handler, ok := t.getHandler(req.DelegatedSender.StoreID) @@ -346,11 +348,15 @@ func (t *RaftTransport) DelegateRaftSnapshot(stream MultiRaft_DelegateRaftSnapsh req.CoordinatorReplica.StoreID, req.DelegatedSender.StoreID, ) - return roachpb.NewStoreNotFoundError(req.DelegatedSender.StoreID) + err := errors.New("unable to accept Raft message: no handler registered for the sender store") + return &kvserverpb.DelegateSnapshotResponse{ + Status: kvserverpb.DelegateSnapshotResponse_ERROR, + EncodedError: errors.EncodeError(context.Background(), err), + } } // Pass off the snapshot request to the sender store. 
- return handler.HandleDelegatedSnapshot(ctx, req, stream) + return handler.HandleDelegatedSnapshot(ctx, req) } // RaftSnapshot handles incoming streaming snapshot requests. @@ -640,10 +646,10 @@ func (t *RaftTransport) SendSnapshot( return sendSnapshot(ctx, t.st, t.tracer, stream, storePool, header, snap, newBatch, sent, recordBytesSent) } -// DelegateSnapshot creates a rpc stream between the leaseholder and the -// new designated sender for delegated snapshot requests. +// DelegateSnapshot sends a DelegateSnapshotRequest to a remote store +// and determines if it encountered any errors when sending the snapshot. func (t *RaftTransport) DelegateSnapshot( - ctx context.Context, req *kvserverpb.DelegateSnapshotRequest, + ctx context.Context, req *kvserverpb.DelegateSendSnapshotRequest, ) error { nodeID := req.DelegatedSender.NodeID conn, err := t.dialer.Dial(ctx, nodeID, rpc.DefaultClass) @@ -659,8 +665,41 @@ func (t *RaftTransport) DelegateSnapshot( } defer func() { if err := stream.CloseSend(); err != nil { - log.Warningf(ctx, "failed to close snapshot stream: %+v", err) + log.Warningf(ctx, "failed to close delegate snapshot stream: %+v", err) } }() - return delegateSnapshot(ctx, stream, req) + + // Send the request. + wrappedRequest := &kvserverpb.DelegateSnapshotRequest{Value: &kvserverpb.DelegateSnapshotRequest_Send{Send: req}} + if err := stream.Send(wrappedRequest); err != nil { + return errors.Mark(err, errMarkSnapshotError) + } + // Wait for response to see if the receiver successfully applied the snapshot. 
+ resp, err := stream.Recv() + if err != nil { + return errors.Mark( + errors.Wrapf(err, "%v: remote failed to send snapshot", req), errMarkSnapshotError, + ) + } + + if len(resp.CollectedSpans) != 0 { + span := tracing.SpanFromContext(ctx) + if span == nil { + log.Warningf(ctx, "trying to ingest remote spans but there is no recording span set up") + } else { + span.ImportRemoteRecording(resp.CollectedSpans) + } + } + + switch resp.Status { + case kvserverpb.DelegateSnapshotResponse_ERROR: + return errors.Mark( + errors.Wrapf(resp.Error(), "error sending couldn't accept %v", req), errMarkSnapshotError) + case kvserverpb.DelegateSnapshotResponse_APPLIED: + // This is the response we're expecting. Snapshot successfully applied. + log.VEventf(ctx, 2, "%s: delegated snapshot was successfully applied", resp) + return nil + default: + return err + } } diff --git a/pkg/kv/kvserver/raft_transport_test.go b/pkg/kv/kvserver/raft_transport_test.go index 380c741bdfde..4bc40eb8a919 100644 --- a/pkg/kv/kvserver/raft_transport_test.go +++ b/pkg/kv/kvserver/raft_transport_test.go @@ -97,11 +97,9 @@ func (s channelServer) HandleSnapshot( } func (s channelServer) HandleDelegatedSnapshot( - ctx context.Context, - req *kvserverpb.DelegateSnapshotRequest, - stream kvserver.DelegateSnapshotResponseStream, -) error { - panic("unimplemented") + ctx context.Context, req *kvserverpb.DelegateSendSnapshotRequest, +) *kvserverpb.DelegateSnapshotResponse { + panic("unexpected HandleDelegatedSnapshot") } // raftTransportTestContext contains objects needed to test RaftTransport. 
diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index 7ef521c1993f..0cfa9f42b847 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -1693,6 +1693,7 @@ func (r *Replica) initializeRaftLearners( default: log.Fatalf(ctx, "unexpected replicaType %s", replicaType) } + // Lock learner snapshots even before we run the ConfChange txn to add them // to prevent a race with the raft snapshot queue trying to send it first. // @@ -1824,7 +1825,7 @@ func (r *Replica) initializeRaftLearners( // orphaned learner. Second, this tickled some bugs in etcd/raft around // switching between StateSnapshot and StateProbe. Even if we worked through // these, it would be susceptible to future similar issues. - if err := r.sendSnapshot( + if err := r.sendSnapshotUsingDelegate( ctx, rDesc, kvserverpb.SnapshotRequest_INITIAL, priority, senderName, senderQueuePriority, ); err != nil { return nil, err @@ -1835,9 +1836,9 @@ func (r *Replica) initializeRaftLearners( // lockLearnerSnapshot stops the raft snapshot queue from sending snapshots to // the soon-to-be added learner replicas to prevent duplicate snapshots from -// being sent. This lock is best effort because it times out and it is a node -// local lock while the raft snapshot queue might be running on a different -// node. An idempotent unlock function is returned. +// being sent. This lock is a node local lock while the raft snapshot queue +// might be running on a different node. An idempotent unlock function is +// returned. func (r *Replica) lockLearnerSnapshot( ctx context.Context, additions []roachpb.ReplicationTarget, ) (unlock func()) { @@ -1845,15 +1846,15 @@ func (r *Replica) lockLearnerSnapshot( // in 19.2 to work around a commit in etcd/raft that made this race more // likely. It'd be nice if all learner snapshots could be sent from a single // place. 
- var lockUUIDs []uuid.UUID + var cleanups []func() for _, addition := range additions { lockUUID := uuid.MakeV4() - lockUUIDs = append(lockUUIDs, lockUUID) - r.addSnapshotLogTruncationConstraint(ctx, lockUUID, 1, addition.StoreID) + _, cleanup := r.addSnapshotLogTruncationConstraint(ctx, lockUUID, addition.StoreID) + cleanups = append(cleanups, cleanup) } return func() { - for _, lockUUID := range lockUUIDs { - r.completeSnapshotLogTruncationConstraint(lockUUID) + for _, cleanup := range cleanups { + cleanup() } } } @@ -2517,34 +2518,140 @@ func recordRangeEventsInLog( return nil } -// getSenderReplica returns a replica descriptor for a follower replica to act as -// the sender for snapshots. -// TODO(amy): select a follower based on locality matching. -func (r *Replica) getSenderReplica(ctx context.Context) (roachpb.ReplicaDescriptor, error) { - log.Fatal(ctx, "follower snapshots not implemented") - return r.GetReplicaDescriptor() +// getSenderReplicas returns an ordered list of replica descriptor for a +// follower replica to act as the sender for delegated snapshots. The replicas +// should be tried in order, and typically the coordinator is the last entry on +// the list. +func (r *Replica) getSenderReplicas( + ctx context.Context, recipient roachpb.ReplicaDescriptor, +) ([]roachpb.ReplicaDescriptor, error) { + + coordinator, err := r.GetReplicaDescriptor() + if err != nil { + // If there is no local replica descriptor, return an empty list. + return nil, err + } + + // Unless all nodes are on V23.1, don't delegate. This prevents sending to a + // node that doesn't understand the request. + if !r.store.ClusterSettings().Version.IsActive(ctx, clusterversion.V23_1) { + return []roachpb.ReplicaDescriptor{coordinator}, nil + } + + // Check follower snapshots, if zero just self-delegate. 
+ numFollowers := int(NumDelegateLimit.Get(&r.ClusterSettings().SV)) + if numFollowers == 0 { + return []roachpb.ReplicaDescriptor{coordinator}, nil + } + + // Get range descriptor and store pool. + storePool := r.store.cfg.StorePool + rangeDesc := r.Desc() + + if fn := r.store.cfg.TestingKnobs.SelectDelegateSnapshotSender; fn != nil { + sender := fn(rangeDesc) + // If a TestingKnob is specified use it whatever it is. + if sender != nil { + return sender, nil + } + } + + // Include voter and non-voter replicas on healthy stores as candidates. + nonRecipientReplicas := rangeDesc.Replicas().Filter( + func(rDesc roachpb.ReplicaDescriptor) bool { + return rDesc.ReplicaID != recipient.ReplicaID && storePool.IsStoreHealthy(rDesc.StoreID) + }, + ) + candidates := nonRecipientReplicas.VoterAndNonVoterDescriptors() + if len(candidates) == 0 { + // Not clear when the coordinator would be considered dead, but if it does + // happen, just return the coordinator. + return []roachpb.ReplicaDescriptor{coordinator}, nil + } + + // Get the localities of the candidate replicas, including the original sender. + localities := storePool.GetLocalitiesPerReplica(candidates...) + recipientLocality := storePool.GetLocalitiesPerReplica(recipient)[recipient.ReplicaID] + + // Construct a map from replica to its diversity score compared to the + // recipient. Also track the best score we see. + replicaDistance := make(map[roachpb.ReplicaID]float64, len(localities)) + closestStore := roachpb.MaxDiversityScore + for desc, locality := range localities { + score := recipientLocality.DiversityScore(locality) + if score < closestStore { + closestStore = score + } + replicaDistance[desc] = score + } + + // Find all replicas that tie as the most optimal sender other than the + // coordinator. The coordinator will always be added to the end of the list + // regardless of score. 
+ var tiedReplicas []roachpb.ReplicaID + for replID, score := range replicaDistance { + if score == closestStore { + // If the coordinator is tied for closest, always use it. + // TODO(baptist): Consider using other replicas at the same distance once + // this is integrated with admission control. + if replID == coordinator.ReplicaID { + return []roachpb.ReplicaDescriptor{coordinator}, nil + } + tiedReplicas = append(tiedReplicas, replID) + } + } + + // Use a psuedo random source that is consistent across runs of this method + // for the same coordinator. Shuffle the replicas to prevent always choosing + // them in the same order. + pRand := rand.New(rand.NewSource(int64(coordinator.ReplicaID))) + pRand.Shuffle(len(tiedReplicas), func(i, j int) { tiedReplicas[i], tiedReplicas[j] = tiedReplicas[j], tiedReplicas[i] }) + + // Only keep the top numFollowers replicas. + if len(tiedReplicas) > numFollowers { + tiedReplicas = tiedReplicas[:numFollowers] + } + + // Convert to replica descriptors before returning. The list of tiedReplicas + // is typically only one element. + replicaList := make([]roachpb.ReplicaDescriptor, len(tiedReplicas)+1) + for n, replicaId := range tiedReplicas { + found := false + replDesc, found := rangeDesc.Replicas().GetReplicaDescriptorByID(replicaId) + if !found { + return nil, errors.Errorf("unable to find replica for replicaId %d", replicaId) + } + replicaList[n] = replDesc + } + // Set the last replica to be the coordinator. + replicaList[len(replicaList)-1] = coordinator + return replicaList, nil } -// TODO(amy): update description when patch for follower snapshots are completed. -// sendSnapshot sends a snapshot of the replica state to the specified replica. -// Currently only invoked from replicateQueue and raftSnapshotQueue. Be careful -// about adding additional calls as generating a snapshot is moderately -// expensive. +// sendSnapshotUsingDelegate sends a snapshot of the replica state to the specified +// replica through a delegate. 
Currently, only invoked from replicateQueue and +// raftSnapshotQueue. Be careful about adding additional calls as generating a +// snapshot is moderately expensive. // // A snapshot is a bulk transfer of all data in a range. It consists of a // consistent view of all the state needed to run some replica of a range as of -// some applied index (not as of some mvcc-time). Snapshots are used by Raft -// when a follower is far enough behind the leader that it can no longer be -// caught up using incremental diffs (because the leader has already garbage -// collected the diffs, in this case because it truncated the Raft log past -// where the follower is). +// some applied index (not as of some mvcc-time). There are two primary cases +// when a Snapshot is used. // -// We also proactively send a snapshot when adding a new replica to bootstrap it +// The first case is use by Raft when a voter or non-voter follower is far +// enough behind the leader that it can no longer be caught up using incremental +// diffs. This occurs because the leader has already garbage collected diffs +// past where the follower is. The quota pool is responsible for keeping a +// leader from getting too far ahead of any of the followers, so normally +// followers don't need a snapshot, however there are a number of cases where +// this can happen (restarts, paused followers, ...). +// +// The second case is adding a new replica to a replica set, to bootstrap it // (this is called a "learner" snapshot and is a special case of a Raft // snapshot, we just speed the process along). It's called a learner snapshot -// because it's sent to what Raft terms a learner replica. As of 19.2, when we -// add a new replica, it's first added as a learner using a Raft ConfChange, -// which means it accepts Raft traffic but doesn't vote or affect quorum. Then +// because it's sent to what Raft terms a learner replica. When we +// add a new replica, it's first added as a learner using a Raft ConfChange. 
+// A learner accepts Raft traffic but doesn't vote or affect quorum. Then // we immediately send it a snapshot to catch it up. After the snapshot // successfully applies, we turn it into a normal voting replica using another // ConfChange. It then uses the normal mechanisms to catch up with whatever got @@ -2552,13 +2659,20 @@ func (r *Replica) getSenderReplica(ctx context.Context) (roachpb.ReplicaDescript // the voting replica directly, this avoids a period of fragility when the // replica would be a full member, but very far behind. // -// Snapshots are expensive and mostly unexpected (except learner snapshots -// during rebalancing). The quota pool is responsible for keeping a leader from -// getting too far ahead of any of the followers, so ideally they'd never be far -// enough behind to need a snapshot. +// The snapshot process itself is broken into 4 parts: delegating the request, +// generating the snapshot, transmitting it, and applying it. // -// The snapshot process itself is broken into 3 parts: generating the snapshot, -// transmitting it, and applying it. +// Delegating the request: Since a snapshot is expensive to transfer from a +// network, CPU and IO perspective, the coordinator attempts to delegate the +// request to a healthy delegate who is closer to the final destination. +// This is done by sending a DelegateSnapshotRequest to a replica. The replica +// can either reject the delegation request or process it. It will reject if it +// is too far behind, unhealthy, or has too long of a queue of snapshots to +// send. If the delegate accepts the delegation request, then the remaining +// three steps occur on that delegate. If the delegate decides not to +// process the request, it sends an error back to the coordinator and the +// coordinator either chooses a different delegate or itself as the "delegate of +// last resort".
// // Generating the snapshot: The data contained in a snapshot is a full copy of // the replicated data plus everything the replica needs to be a healthy member @@ -2566,7 +2680,7 @@ func (r *Replica) getSenderReplica(ctx context.Context) (roachpb.ReplicaDescript // instead of keeping it all in memory at once. The `(Replica).GetSnapshot` // method does the necessary locking and gathers the various Raft state needed // to run a replica. It also creates an iterator for the range's data as it -// looked under those locks (this is powered by a RocksDB snapshot, which is a +// looked under those locks (this is powered by a Pebble snapshot, which is a // different thing but a similar idea). Notably, GetSnapshot does not do the // data iteration. // @@ -2589,7 +2703,7 @@ func (r *Replica) getSenderReplica(ctx context.Context) (roachpb.ReplicaDescript // returns true, this is communicated back to the sender, which then proceeds to // call `kvBatchSnapshotStrategy.Send`. This uses the iterator captured earlier // to send the data in chunks, each chunk a streaming grpc message. The sender -// then sends a final message with an indicaton that it's done and blocks again, +// then sends a final message with an indication that it's done and blocks again, // waiting for a second and final response from the recipient which indicates if // the snapshot was a success. // @@ -2601,7 +2715,6 @@ func (r *Replica) getSenderReplica(ctx context.Context) (roachpb.ReplicaDescript // RocksDB. Each of the SSTs also has a range deletion tombstone to delete the // existing data in the range. // - // Applying the snapshot: After the recipient has received the message // indicating it has all the data, it hands it all to // `(Store).processRaftSnapshotRequest` to be applied. 
First, this re-checks @@ -2637,7 +2750,7 @@ func (r *Replica) getSenderReplica(ctx context.Context) (roachpb.ReplicaDescript // callers of `shouldAcceptSnapshotData` return an error so that we no longer // have to worry about racing with a second snapshot. See the comment on // ReplicaPlaceholder for details. -func (r *Replica) sendSnapshot( +func (r *Replica) sendSnapshotUsingDelegate( ctx context.Context, recipient roachpb.ReplicaDescriptor, snapType kvserverpb.SnapshotRequest_Type, @@ -2645,6 +2758,9 @@ func (r *Replica) sendSnapshot( senderQueueName kvserverpb.SnapshotRequest_QueueName, senderQueuePriority float64, ) (retErr error) { + + startTime := timeutil.Now() + defer func() { // Report the snapshot status to Raft, which expects us to do this once we // finish sending the snapshot. @@ -2671,12 +2787,12 @@ func (r *Replica) sendSnapshot( ) } - // Check follower snapshots cluster setting. - if followerSnapshotsEnabled.Get(&r.ClusterSettings().SV) { - sender, err = r.getSenderReplica(ctx) - if err != nil { - return err - } + status := r.RaftStatus() + if status == nil { + // This code path is sometimes hit during scatter for replicas that + // haven't woken up yet. + retErr = &benignError{errors.Wrap(errMarkSnapshotError, "raft status not initialized")} + return } // Don't send a queue name or priority if the receiver may not understand @@ -2688,52 +2804,209 @@ func (r *Replica) sendSnapshot( senderQueueName = 0 senderQueuePriority = 0 } + snapUUID := uuid.MakeV4() + appliedIndex, cleanup := r.addSnapshotLogTruncationConstraint(ctx, snapUUID, recipient.StoreID) + // The cleanup function needs to be called regardless of success or failure of + // sending to release the log truncation constraint. + defer cleanup() - log.VEventf( - ctx, 2, "delegating snapshot transmission for %v to %v", recipient, sender, - ) - status := r.RaftStatus() + // Create new delegate snapshot request without specifying the delegate sender. 
+ // NB: The FirstIndex is the minimum index that the delegate must send a + // snapshot at. The coordinator created a lock at its current applied index, + // and asks the delegate to send a snapshot from that index. Creating the lock + // further back would unnecessarily prevent Raft from GCing old data. + delegateRequest := &kvserverpb.DelegateSendSnapshotRequest{ + RangeID: r.RangeID, + CoordinatorReplica: sender, + RecipientReplica: recipient, + Priority: priority, + SenderQueueName: senderQueueName, + SenderQueuePriority: senderQueuePriority, + Type: snapType, + Term: status.Term, + DelegatedSender: sender, + FirstIndex: appliedIndex, + DescriptorGeneration: r.Desc().Generation, + QueueOnDelegateLen: MaxQueueOnDelegateLimit.Get(&r.ClusterSettings().SV), + SnapId: snapUUID, + } + + // Get the list of senders in order. + senders, err := r.getSenderReplicas(ctx, recipient) + if err != nil { + return err + } + + if len(senders) == 0 { + return errors.Errorf("no sender found to send a snapshot from for %v", r) + } + + for n, sender := range senders { + delegateRequest.DelegatedSender = sender + log.VEventf( + ctx, 2, "delegating snapshot transmission attempt %v for %v to %v", n+1, recipient, sender, + ) + + selfDelegate := n == len(senders)-1 + + // On the last attempt, always queue on the delegate to time out naturally. + if selfDelegate { + delegateRequest.QueueOnDelegateLen = -1 + } + + retErr = contextutil.RunWithTimeout( + ctx, "send-snapshot", sendSnapshotTimeout, func(ctx context.Context) error { + // Sending snapshot + return r.store.cfg.Transport.DelegateSnapshot(ctx, delegateRequest) + }, + ) + // Return once we have success. 
+ if retErr == nil { + r.store.Metrics().RangeSnapshotSendLatency.RecordValue(timeutil.Since(startTime).Nanoseconds()) + if !selfDelegate { + r.store.Metrics().DelegateSnapshotSuccesses.Inc(1) + } + return + } else { + if !selfDelegate { + r.store.Metrics().DelegateSnapshotFailures.Inc(1) + } + log.Warningf(ctx, "attempt %d: delegate snapshot %+v request failed %v", n+1, delegateRequest, retErr) + } + } + return +} + +// validateSnapshotDelegationRequest will validate that this replica can send +// the snapshot that the coordinator requested. The main reasons a request can't +// be delegated are if the Generation or Term of the replica is not equal to the +// Generation or Term of the coordinator's request or the applied index on this +// replica is behind the truncated index of the coordinator. Note that the request +// is validated twice, once before "queueing" and once after. This reduces the +// chance of false positives (snapshots which are sent but can't be used), +// however it is difficult to completely eliminate them. Between the time of sending the +// original request and the delegate processing it, the leaseholder could decide +// to truncate its index, change the leaseholder or split or merge the range. +func (r *Replica) validateSnapshotDelegationRequest( + ctx context.Context, req *kvserverpb.DelegateSendSnapshotRequest, +) error { + desc := r.Desc() + // If the generation has changed, this snapshot may be useless, so don't + // attempt to send it. + // NB: This is an overly strict check. If other delegates are added to this + // snapshot, we don't necessarily need to reject sending the snapshot, however + // if there are merges or splits, it is safer to reject. 
+ if desc.Generation != req.DescriptorGeneration { + log.VEventf(ctx, 2, + "%s: generation has changed since snapshot was generated %s != %s", + r, req.DescriptorGeneration, desc.Generation, + ) + return errors.Errorf( + "%s: generation has changed since snapshot was generated %s != %s", + r, req.DescriptorGeneration, desc.Generation, + ) + } + + // Check that the snapshot we generated has a descriptor that includes the + // recipient. If it doesn't, the recipient will reject it, so it's better to + // not send it in the first place. It's possible to hit this case if we're not + // the leaseholder, and we haven't yet applied the configuration change that's + // adding the recipient to the range, or we are the leaseholder but have + // removed the recipient between starting to send the snapshot and this point. + if _, ok := desc.GetReplicaDescriptorByID(req.RecipientReplica.ReplicaID); !ok { + // Recipient replica not found in the current range descriptor. + // The sender replica's descriptor may be lagging behind the coordinator's. + log.VEventf(ctx, 2, + "%s: couldn't find receiver replica %s in sender descriptor %s", + r, req.DescriptorGeneration, r.Desc(), + ) + return errors.Errorf( + "%s: couldn't find receiver replica %s in sender descriptor %s", + r, req.RecipientReplica, r.Desc(), + ) + } + + // Check the raft applied state index and term to determine if this replica + // is not too far behind the leaseholder. If the delegate is so far behind + // that it also needs a snapshot, then any snapshot it sends will be useless. + r.mu.RLock() + replIdx := r.mu.state.RaftAppliedIndex + 1 + + status := r.raftStatusRLocked() if status == nil { // This code path is sometimes hit during scatter for replicas that // haven't woken up yet. - return &benignError{errors.Wrap(errMarkSnapshotError, "raft status not initialized")} - } - - // Create new delegate snapshot request with only required metadata. 
- delegateRequest := &kvserverpb.DelegateSnapshotRequest{ - RangeID: r.RangeID, - CoordinatorReplica: sender, - RecipientReplica: recipient, - Priority: priority, - SenderQueueName: senderQueueName, - SenderQueuePriority: senderQueuePriority, - Type: snapType, - Term: status.Term, - DelegatedSender: sender, - } - err = contextutil.RunWithTimeout( - ctx, "delegate-snapshot", sendSnapshotTimeout, func(ctx context.Context) error { - return r.store.cfg.Transport.DelegateSnapshot( - ctx, - delegateRequest, - ) - }, - ) - // Only mark explicitly as snapshot error (which is retriable) if we timed out. - // Otherwise, it's up to the remote to add this mark where appropriate. - if errors.HasType(err, (*contextutil.TimeoutError)(nil)) { - err = errors.Mark(err, errMarkSnapshotError) + return errors.Errorf("raft status not initialized") } - return err + replTerm := status.Term + r.mu.RUnlock() + + // Delegate has a different term than the coordinator. This typically means + // the lease has been transferred, and we should not process this request. + // There is a potential race where the leaseholder sends a delegate request + // and then the term changes before this request is processed. In that + // case this code path will not be checked and the snapshot will still be + // sent. + if replTerm != req.Term { + log.Infof( + ctx, + "sender: %v is not fit to send snapshot for %v; sender term: %v coordinator term: %v", + req.DelegatedSender, req.CoordinatorReplica, replTerm, req.Term, + ) + return errors.Errorf( + "sender: %v is not fit to send snapshot for %v; sender term: %v, coordinator term: %v", + req.DelegatedSender, req.CoordinatorReplica, replTerm, req.Term, + ) + } + + // Sender replica's snapshot will be rejected if the sender replica's raft + // applied index is lower than or equal to the log truncated constraint on the + // leaseholder, as this replica's snapshot will be wasted. 
Note that it is + // possible that we can enforce strictly lesser than if etcd does not require + // previous raft log entries for appending. + if replIdx <= req.FirstIndex { + log.Infof( + ctx, "sender: %v is not fit to send snapshot;"+ + " sender first index: %v, "+ + "coordinator log truncation constraint: %v", req.DelegatedSender, replIdx, req.FirstIndex, + ) + return errors.Mark(errors.Errorf( + "sender: %v is not fit to send snapshot;"+ + " sender first index: %v, "+ + "coordinator log truncation constraint: %v", req.DelegatedSender, replIdx, req.FirstIndex, + ), errMarkSnapshotError) + } + return nil } -// followerSnapshotsEnabled is used to enable or disable follower snapshots. -var followerSnapshotsEnabled = func() *settings.BoolSetting { - s := settings.RegisterBoolSetting( +// NumDelegateLimit is used to control the number of delegate followers +// to use for snapshots. To disable follower snapshots, set this to 0. If +// enabled, the leaseholder / leader will attempt to find a closer delegate than +// itself to send the snapshot through. This can save on network bandwidth at a +// cost in some cases to snapshot send latency. +var NumDelegateLimit = func() *settings.IntSetting { + s := settings.RegisterIntSetting( settings.SystemOnly, - "kv.snapshot_delegation.enabled", - "set to true to allow snapshots from follower replicas", - false, + "kv.snapshot_delegation.num_follower", + "the number of delegates to try when sending snapshots, before falling back to sending from the leaseholder", + 1, + ) + s.SetVisibility(settings.Public) + return s +}() + +// MaxQueueOnDelegateLimit is used to control how long the outgoing snapshot +// queue can be before we reject delegation requests. Setting to -1 allows +// unlimited requests. The purpose of this setting is to prevent a long snapshot +// queue from delaying a delegated snapshot from being sent. 
Once the queue +// length is longer than the configured value, any additional delegation requests +// will be rejected with an error. +var MaxQueueOnDelegateLimit = func() *settings.IntSetting { + s := settings.RegisterIntSetting( + settings.SystemOnly, + "kv.snapshot_delegation.num_requests", + "how many queued requests are allowed on a delegate before the request is rejected", + 3, ) s.SetVisibility(settings.Public) return s @@ -2754,9 +3027,8 @@ var traceSnapshotThreshold = settings.RegisterDurationSetting( func (r *Replica) followerSendSnapshot( ctx context.Context, recipient roachpb.ReplicaDescriptor, - req *kvserverpb.DelegateSnapshotRequest, - stream DelegateSnapshotResponseStream, -) (retErr error) { + req *kvserverpb.DelegateSendSnapshotRequest, +) error { ctx = r.AnnotateCtx(ctx) sendThreshold := traceSnapshotThreshold.Get(&r.ClusterSettings().SV) if sendThreshold > 0 { @@ -2776,55 +3048,46 @@ func (r *Replica) followerSendSnapshot( }() } - // TODO(amy): when delegating to different senders, check raft applied state - // to determine if this follower replica is fit to send. - // Acknowledge that the request has been accepted. - if err := stream.Send( - &kvserverpb.DelegateSnapshotResponse{ - SnapResponse: &kvserverpb.SnapshotResponse{ - Status: kvserverpb.SnapshotResponse_ACCEPTED, - }, - }, - ); err != nil { + // Check the validity conditions twice, once before and once after we obtain + // the send semaphore. We check after to make sure the snapshot request still + // makes sense (e.g. the range hasn't split under us). The check is + // lightweight. Even if the second check succeeds, the snapshot we send might + // still not be usable due to the leaseholder making a change that we don't + // know about, but we try to minimize that possibility since snapshots are + // expensive to send. + err := r.validateSnapshotDelegationRequest(ctx, req) + if err != nil { return err } - // Throttle snapshot sending. + // Throttle snapshot sending. 
Obtain the send semaphore and determine the rate limit. rangeSize := r.GetMVCCStats().Total() cleanup, err := r.store.reserveSendSnapshot(ctx, req, rangeSize) if err != nil { - return err + return errors.Wrap(err, "Unable to reserve space for sending this snapshot") } defer cleanup() + // Check validity again, it is possible that the pending request should not be + // sent after we are done waiting. + err = r.validateSnapshotDelegationRequest(ctx, req) + if err != nil { + return err + } + snapType := req.Type - snap, err := r.GetSnapshot(ctx, snapType, recipient.StoreID) + snap, err := r.GetSnapshot(ctx, snapType, req.SnapId) if err != nil { - err = errors.Wrapf(err, "%s: failed to generate %s snapshot", r, snapType) - return errors.Mark(err, errMarkSnapshotError) + return errors.Wrapf(err, "%s: failed to generate %s snapshot", r, snapType) } defer snap.Close() log.Event(ctx, "generated snapshot") - // Check that the snapshot we generated has a descriptor that includes the - // recipient. If it doesn't, the recipient will reject it, so it's better to - // not send it in the first place. It's possible to hit this case if we're not - // the leaseholder and we haven't yet applied the configuration change that's - // adding the recipient to the range. - if _, ok := snap.State.Desc.GetReplicaDescriptor(recipient.StoreID); !ok { - return errors.Wrapf( - errMarkSnapshotError, - "attempting to send snapshot that does not contain the recipient as a replica; "+ - "snapshot type: %s, recipient: s%d, desc: %s", snapType, recipient, snap.State.Desc, - ) - } - - // We avoid shipping over the past Raft log in the snapshot by changing - // the truncated state (we're allowed to -- it's an unreplicated key and not - // subject to mapping across replicas). The actual sending happens here: - _ = (*kvBatchSnapshotStrategy)(nil).Send - // and results in no log entries being sent at all. Note that - // Metadata.Index is really the applied index of the replica. 
+ // We avoid shipping over the past Raft log in the snapshot by changing the + // truncated state (we're allowed to -- it's an unreplicated key and not + // subject to mapping across replicas). The actual sending happens in + // kvBatchSnapshotStrategy.Send and results in no log entries being sent at + // all. Note that Metadata.Index is really the applied index of the replica. snap.State.TruncatedState = &roachpb.RaftTruncatedState{ Index: snap.RaftSnap.Metadata.Index, Term: snap.RaftSnap.Metadata.Term, diff --git a/pkg/kv/kvserver/replica_learner_test.go b/pkg/kv/kvserver/replica_learner_test.go index c4844c389032..6b1507833e7a 100644 --- a/pkg/kv/kvserver/replica_learner_test.go +++ b/pkg/kv/kvserver/replica_learner_test.go @@ -30,6 +30,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/server/serverpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" @@ -40,8 +41,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb" + "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" @@ -271,6 +274,13 @@ func TestAddReplicaWithReceiverThrottling(t *testing.T) { ) defer tc.Stopper().Stop(ctx) + + // Disable delegating snapshots to different senders, which would otherwise + // fail this test as snapshots could queue on different stores. 
+ settings := cluster.MakeTestingClusterSettings() + sv := &settings.SV + kvserver.NumDelegateLimit.Override(ctx, sv, 0) + scratch := tc.ScratchRange(t) replicationChange := make(chan error, 2) g := ctxgroup.WithContext(ctx) @@ -338,6 +348,195 @@ func TestAddReplicaWithReceiverThrottling(t *testing.T) { require.NoError(t, g.Wait()) } +// TestDelegateSnapshot verifies that the correct delegate is chosen when +// sending snapshots to stores. +// TODO: It is currently disabled because with raft snapshots sometimes being +// required and not going through snapshot delegation, the results are +// unpredictable. +func TestDelegateSnapshot(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + skip.UnderStress(t, "Occasionally fails until 87553 is resolved") + + ctx := context.Background() + + // Synchronize on the moment before the snapshot gets sent to measure the + // state at that time. + requestChannel := make(chan *kvserverpb.DelegateSendSnapshotRequest, 10) + + setupFn := func(t *testing.T) ( + *testcluster.TestCluster, + roachpb.Key, + ) { + knobs, ltk := makeReplicationTestKnobs() + ltk.storeKnobs.DisableRaftSnapshotQueue = true + + ltk.storeKnobs.SendSnapshot = func(request *kvserverpb.DelegateSendSnapshotRequest) { + requestChannel <- request + } + + localityA := roachpb.Locality{Tiers: []roachpb.Tier{{Key: "region", Value: "a"}}} + localityB := roachpb.Locality{Tiers: []roachpb.Tier{{Key: "region", Value: "b"}}} + + localityServerArgs := make(map[int]base.TestServerArgs) + localityServerArgs[0] = base.TestServerArgs{Knobs: knobs, Locality: localityA} + localityServerArgs[1] = base.TestServerArgs{Knobs: knobs, Locality: localityA} + localityServerArgs[2] = base.TestServerArgs{Knobs: knobs, Locality: localityB} + localityServerArgs[3] = base.TestServerArgs{Knobs: knobs, Locality: localityB} + + tc := testcluster.StartTestCluster( + t, 4, base.TestClusterArgs{ + ServerArgsPerNode: localityServerArgs, + ReplicationMode: 
base.ReplicationManual, + }, + ) + scratchKey := tc.ScratchRange(t) + return tc, scratchKey + } + + tc, scratchKey := setupFn(t) + defer tc.Stopper().Stop(ctx) + + // Node 3 (loc B) can only get the data from node 1 as it's the only one that has it. + _ = tc.AddVotersOrFatal(t, scratchKey, tc.Targets(2)...) + request := <-requestChannel + require.Equal(t, request.DelegatedSender.StoreID, roachpb.StoreID(1)) + // Drain the channel. Unfortunately there are occasionally spurious raft snapshots sent. + for len(requestChannel) > 0 { + <-requestChannel + } + + // Node 4 (loc B) should get the snapshot from node 3 as it's the same locality. + _ = tc.AddVotersOrFatal(t, scratchKey, tc.Targets(3)...) + request = <-requestChannel + require.Equal(t, request.DelegatedSender.StoreID, roachpb.StoreID(3)) + for len(requestChannel) > 0 { + <-requestChannel + } + + // Node 2 (loc A) should get the snapshot from node 1 as it is the same locality. + _ = tc.AddVotersOrFatal(t, scratchKey, tc.Targets(1)...) + request = <-requestChannel + require.Equal(t, request.DelegatedSender.StoreID, roachpb.StoreID(1)) +} + +// TestDelegateSnapshotFails is a test that ensures we fail fast when the +// sender or receiver store crashes during delegated snapshot sending. 
+func TestDelegateSnapshotFails(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + var senders struct { + mu syncutil.Mutex + desc []roachpb.ReplicaDescriptor + } + + setupFn := func(t *testing.T) ( + *testcluster.TestCluster, + roachpb.Key, + ) { + senders.desc = nil + knobs, ltk := makeReplicationTestKnobs() + ltk.storeKnobs.ThrottleEmptySnapshots = true + + ltk.storeKnobs.SelectDelegateSnapshotSender = + func(descriptor *roachpb.RangeDescriptor) []roachpb.ReplicaDescriptor { + senders.mu.Lock() + defer senders.mu.Unlock() + return senders.desc + } + + tc := testcluster.StartTestCluster( + t, 4, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{Knobs: knobs}, + ReplicationMode: base.ReplicationManual, + }, + ) + + scratchKey := tc.ScratchRange(t) + return tc, scratchKey + } + + // Add a learner replica that will need a snapshot, kill the server + // the learner is on. Assert that the failure is detected and change replicas + // fails fast. + t.Run("receiver", func(t *testing.T) { + tc, scratchKey := setupFn(t) + defer tc.Stopper().Stop(ctx) + + desc, err := tc.LookupRange(scratchKey) + require.NoError(t, err, "Unable to lookup the range") + + _, err = setupPartitionedRange(tc, desc.RangeID, 0, 0, true, unreliableRaftHandlerFuncs{}) + require.NoError(t, err) + + _, err = tc.Servers[0].DB().AdminChangeReplicas( + ctx, scratchKey, desc, roachpb.MakeReplicationChanges(roachpb.ADD_VOTER, tc.Target(1)), + ) + + require.True(t, testutils.IsError(err, "partitioned"), `expected partitioned error got: %+v`, err) + }) + + // Add a follower replica to act as the snapshot sender, and kill the server + // the sender is on. Assert that the failure is detected and change replicas + // fails fast. 
+ t.Run("sender_no_fallback", func(t *testing.T) { + tc, scratchKey := setupFn(t) + defer tc.Stopper().Stop(ctx) + + // Add a replica that will be the delegated sender, and another so we have + // quorum with this node down + desc := tc.AddVotersOrFatal(t, scratchKey, tc.Targets(2, 3)...) + + replicaDesc, ok := desc.GetReplicaDescriptor(3) + require.True(t, ok) + // Always use node 3 (index 2) as the only delegate. + senders.mu.Lock() + senders.desc = append(senders.desc, replicaDesc) + senders.mu.Unlock() + + // Now stop accepting traffic to node 3 (index 2). + _, err := setupPartitionedRange(tc, desc.RangeID, 0, 2, true, unreliableRaftHandlerFuncs{}) + require.NoError(t, err) + + _, err = tc.Servers[0].DB().AdminChangeReplicas( + ctx, scratchKey, desc, roachpb.MakeReplicationChanges(roachpb.ADD_VOTER, tc.Target(1)), + ) + log.Infof(ctx, "Err=%v", err) + require.True(t, testutils.IsError(err, "partitioned"), `expected partitioned error got: %+v`, err) + }) + + // Identical setup as the previous test, but allow a fallback to the leaseholder. + t.Run("sender_with_fallback", func(t *testing.T) { + tc, scratchKey := setupFn(t) + defer tc.Stopper().Stop(ctx) + + // Add a replica that will be the delegated sender, and another so we have + // quorum with this node down + desc := tc.AddVotersOrFatal(t, scratchKey, tc.Targets(2, 3)...) + + replicaDesc, ok := desc.GetReplicaDescriptor(3) + require.True(t, ok) + leaseholderDesc, ok := desc.GetReplicaDescriptor(1) + require.True(t, ok) + // First try to use node 3 (index 2) as the delegate, but fall back to the leaseholder on failure. + senders.mu.Lock() + senders.desc = append(senders.desc, replicaDesc) + senders.desc = append(senders.desc, leaseholderDesc) + senders.mu.Unlock() + + // Now stop accepting traffic to node 3 (index 2). 
+ _, err := setupPartitionedRange(tc, desc.RangeID, 0, 2, true, unreliableRaftHandlerFuncs{}) + require.NoError(t, err) + + _, err = tc.Servers[0].DB().AdminChangeReplicas( + ctx, scratchKey, desc, roachpb.MakeReplicationChanges(roachpb.ADD_VOTER, tc.Target(1)), + ) + require.NoError(t, err) + }) +} + func TestLearnerRaftConfState(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -502,7 +701,7 @@ func testRaftSnapshotsToNonVoters(t *testing.T, drainReceivingNode bool) { // the state at that time & gather metrics. blockUntilSnapshotSendCh := make(chan struct{}) blockSnapshotSendCh := make(chan struct{}) - ltk.storeKnobs.SendSnapshot = func() { + ltk.storeKnobs.SendSnapshot = func(request *kvserverpb.DelegateSendSnapshotRequest) { close(blockUntilSnapshotSendCh) select { case <-blockSnapshotSendCh: @@ -592,7 +791,7 @@ func testRaftSnapshotsToNonVoters(t *testing.T, drainReceivingNode bool) { // snapshot being sent. <-blockUntilSnapshotSendCh store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey) - snapshotLength, err := getExpectedSnapshotSizeBytes(ctx, store, repl, kvserverpb.SnapshotRequest_VIA_SNAPSHOT_QUEUE, tc.Server(1).GetFirstStoreID()) + snapshotLength, err := getExpectedSnapshotSizeBytes(ctx, store, repl, kvserverpb.SnapshotRequest_VIA_SNAPSHOT_QUEUE) require.NoError(t, err) close(blockSnapshotSendCh) @@ -1766,9 +1965,8 @@ func getExpectedSnapshotSizeBytes( originStore *kvserver.Store, originRepl *kvserver.Replica, snapType kvserverpb.SnapshotRequest_Type, - recipientStoreID roachpb.StoreID, ) (int64, error) { - snap, err := originRepl.GetSnapshot(ctx, snapType, recipientStoreID) + snap, err := originRepl.GetSnapshot(ctx, snapType, uuid.MakeV4()) if err != nil { return 0, err } @@ -1831,7 +2029,7 @@ func TestRebalancingSnapshotMetrics(t *testing.T) { // the state at that time. 
blockUntilSnapshotSendCh := make(chan struct{}) blockSnapshotSendCh := make(chan struct{}) - ltk.storeKnobs.SendSnapshot = func() { + ltk.storeKnobs.SendSnapshot = func(request *kvserverpb.DelegateSendSnapshotRequest) { close(blockUntilSnapshotSendCh) select { case <-blockSnapshotSendCh: @@ -1865,7 +2063,7 @@ func TestRebalancingSnapshotMetrics(t *testing.T) { // snapshot being sent. <-blockUntilSnapshotSendCh store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey) - snapshotLength, err := getExpectedSnapshotSizeBytes(ctx, store, repl, kvserverpb.SnapshotRequest_INITIAL, tc.Server(1).GetFirstStoreID()) + snapshotLength, err := getExpectedSnapshotSizeBytes(ctx, store, repl, kvserverpb.SnapshotRequest_INITIAL) require.NoError(t, err) close(blockSnapshotSendCh) diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index 5b11de2dcd98..b82292812ec6 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -1617,17 +1617,26 @@ type snapTruncationInfo struct { recipientStore roachpb.StoreID } +// addSnapshotLogTruncation creates a log truncation record which will prevent +// the raft log from being truncated past this point until the cleanup function +// is called. This function will return the index that the truncation constraint +// is set at and a cleanup function to remove the constraint. We will fetch the +// applied index again in GetSnapshot and that is likely a different but higher +// index. The appliedIndex fetched here is narrowly used for adding a log +// truncation constraint to prevent log entries > appliedIndex from being +// removed. Note that the appliedIndex maintained in Replica actually lags the +// one in the engine, since replicaAppBatch.ApplyToStateMachine commits the +// engine batch and then acquires mu to update the RaftAppliedIndex. The use of +// a possibly stale value here is harmless since the values increases +// monotonically. 
The actual snapshot index, may preserve more from a log +// truncation perspective. func (r *Replica) addSnapshotLogTruncationConstraint( - ctx context.Context, snapUUID uuid.UUID, index uint64, recipientStore roachpb.StoreID, -) { + ctx context.Context, snapUUID uuid.UUID, recipientStore roachpb.StoreID, +) (uint64, func()) { r.mu.Lock() defer r.mu.Unlock() - r.addSnapshotLogTruncationConstraintLocked(ctx, snapUUID, index, recipientStore) -} - -func (r *Replica) addSnapshotLogTruncationConstraintLocked( - ctx context.Context, snapUUID uuid.UUID, index uint64, recipientStore roachpb.StoreID, -) { + appliedIndex := r.mu.state.RaftAppliedIndex + // Cleared when OutgoingSnapshot closes. if r.mu.snapshotLogTruncationConstraints == nil { r.mu.snapshotLogTruncationConstraints = make(map[uuid.UUID]snapTruncationInfo) } @@ -1637,32 +1646,30 @@ func (r *Replica) addSnapshotLogTruncationConstraintLocked( // fed into this method twice) or a UUID collision. We discard the update // (which is benign) but log it loudly. If the index is the same, it's // likely the former, otherwise the latter. - log.Warningf(ctx, "UUID collision at %s for %+v (index %d)", snapUUID, item, index) - return + log.Warningf(ctx, "UUID collision at %s for %+v (index %d)", snapUUID, item, appliedIndex) + return appliedIndex, func() {} } r.mu.snapshotLogTruncationConstraints[snapUUID] = snapTruncationInfo{ - index: index, + index: appliedIndex, recipientStore: recipientStore, } -} -// completeSnapshotLogTruncationConstraint marks the given snapshot as finished, -// releasing the lock on raft log truncation after a grace period. -func (r *Replica) completeSnapshotLogTruncationConstraint(snapUUID uuid.UUID) { - r.mu.Lock() - defer r.mu.Unlock() + return appliedIndex, func() { + r.mu.Lock() + defer r.mu.Unlock() - _, ok := r.mu.snapshotLogTruncationConstraints[snapUUID] - if !ok { - // UUID collision while adding the snapshot in originally. Nothing - // else to do. 
- return - } - delete(r.mu.snapshotLogTruncationConstraints, snapUUID) - if len(r.mu.snapshotLogTruncationConstraints) == 0 { - // Save a little bit of memory. - r.mu.snapshotLogTruncationConstraints = nil + _, ok := r.mu.snapshotLogTruncationConstraints[snapUUID] + if !ok { + // UUID collision while adding the snapshot in originally. Nothing + // else to do. + return + } + delete(r.mu.snapshotLogTruncationConstraints, snapUUID) + if len(r.mu.snapshotLogTruncationConstraints) == 0 { + // Save a little bit of memory. + r.mu.snapshotLogTruncationConstraints = nil + } } } diff --git a/pkg/kv/kvserver/replica_raftstorage.go b/pkg/kv/kvserver/replica_raftstorage.go index ae01168ee7a4..00cc06e19c5e 100644 --- a/pkg/kv/kvserver/replica_raftstorage.go +++ b/pkg/kv/kvserver/replica_raftstorage.go @@ -220,44 +220,17 @@ func (r *Replica) raftSnapshotLocked() (raftpb.Snapshot, error) { // replica. If this method returns without error, callers must eventually call // OutgoingSnapshot.Close. func (r *Replica) GetSnapshot( - ctx context.Context, snapType kvserverpb.SnapshotRequest_Type, recipientStore roachpb.StoreID, + ctx context.Context, snapType kvserverpb.SnapshotRequest_Type, snapUUID uuid.UUID, ) (_ *OutgoingSnapshot, err error) { - snapUUID := uuid.MakeV4() // Get a snapshot while holding raftMu to make sure we're not seeing "half // an AddSSTable" (i.e. a state in which an SSTable has been linked in, but // the corresponding Raft command not applied yet). r.raftMu.Lock() snap := r.store.engine.NewSnapshot() - { - r.mu.Lock() - // We will fetch the applied index later again, from snap. The - // appliedIndex fetched here is narrowly used for adding a log truncation - // constraint to prevent log entries > appliedIndex from being removed. 
- // Note that the appliedIndex maintained in Replica actually lags the one - // in the engine, since replicaAppBatch.ApplyToStateMachine commits the - // engine batch and then acquires Replica.mu to update - // Replica.mu.state.RaftAppliedIndex. The use of a possibly stale value - // here is harmless since using a lower index in this constraint, than the - // actual snapshot index, preserves more from a log truncation - // perspective. - // - // TODO(sumeer): despite the above justification, this is unnecessarily - // complicated. Consider loading the RaftAppliedIndex from the snap for - // this use case. - appliedIndex := r.mu.state.RaftAppliedIndex - // Cleared when OutgoingSnapshot closes. - r.addSnapshotLogTruncationConstraintLocked(ctx, snapUUID, appliedIndex, recipientStore) - r.mu.Unlock() - } r.raftMu.Unlock() - release := func() { - r.completeSnapshotLogTruncationConstraint(snapUUID) - } - defer func() { if err != nil { - release() snap.Close() } }() @@ -284,7 +257,6 @@ func (r *Replica) GetSnapshot( log.Errorf(ctx, "error generating snapshot: %+v", err) return nil, err } - snapData.onClose = release return &snapData, nil } diff --git a/pkg/kv/kvserver/storage_services.proto b/pkg/kv/kvserver/storage_services.proto index a876b768dbeb..63c7dc2dc4ff 100644 --- a/pkg/kv/kvserver/storage_services.proto +++ b/pkg/kv/kvserver/storage_services.proto @@ -27,15 +27,11 @@ service MultiRaft { // ERROR, including any collected traces from processing. rpc RaftSnapshot (stream cockroach.kv.kvserver.kvserverpb.SnapshotRequest) returns (stream cockroach.kv.kvserver.kvserverpb.SnapshotResponse) {} // DelegateRaftSnapshot asks the server to send a range snapshot to a target - // (so the client delegates the sending of the snapshot to the server). The - // server responds in two phases. - // - // TODO(nvanbenschoten): This RPC is bi-directional streaming (as opposed to - // only server-streaming) because of future aspirations; at the moment the - // request is unary. 
In the future, we wanted to pause all log truncation, - // then handshake with the delegated sender, then weaken log truncation - // protection to just below the index that the sender was sending the - // snapshot at. + // (so the client delegates the sending of the snapshot to the server). This + // is a "single-shot" stream today that sends a single request and returns a + // single response, however the expectation is that in the future the + // throttling/permit reservation will be separated out from the actual + // sending. rpc DelegateRaftSnapshot(stream cockroach.kv.kvserver.kvserverpb.DelegateSnapshotRequest) returns (stream cockroach.kv.kvserver.kvserverpb.DelegateSnapshotResponse) {} } diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go index 1615a2136ab5..cbd9e99fcce2 100644 --- a/pkg/kv/kvserver/store_raft.go +++ b/pkg/kv/kvserver/store_raft.go @@ -160,41 +160,41 @@ func (qs *raftReceiveQueues) Delete(rangeID roachpb.RangeID) { // HandleDelegatedSnapshot reads the incoming delegated snapshot message and // throttles sending snapshots before passing the request to the sender replica. func (s *Store) HandleDelegatedSnapshot( - ctx context.Context, - req *kvserverpb.DelegateSnapshotRequest, - stream DelegateSnapshotResponseStream, -) error { + ctx context.Context, req *kvserverpb.DelegateSendSnapshotRequest, +) *kvserverpb.DelegateSnapshotResponse { ctx = s.AnnotateCtx(ctx) if fn := s.cfg.TestingKnobs.SendSnapshot; fn != nil { - fn() + fn(req) } + sp := tracing.SpanFromContext(ctx) + + // This can happen if the delegate doesn't know about the range yet. Return an + // error immediately. 
sender, err := s.GetReplica(req.RangeID) if err != nil { - return err + return &kvserverpb.DelegateSnapshotResponse{ + Status: kvserverpb.DelegateSnapshotResponse_ERROR, + EncodedError: errors.EncodeError(context.Background(), err), + CollectedSpans: sp.GetConfiguredRecording(), + } } - sp := tracing.SpanFromContext(ctx) // Pass the request to the sender replica. - if err := sender.followerSendSnapshot(ctx, req.RecipientReplica, req, stream); err != nil { - return stream.Send( - &kvserverpb.DelegateSnapshotResponse{ - SnapResponse: snapRespErr(err), - CollectedSpans: sp.GetConfiguredRecording(), - }, - ) + if err := sender.followerSendSnapshot(ctx, req.RecipientReplica, req); err != nil { + // If an error occurred during snapshot sending, send an error response. + return &kvserverpb.DelegateSnapshotResponse{ + Status: kvserverpb.DelegateSnapshotResponse_ERROR, + EncodedError: errors.EncodeError(context.Background(), err), + CollectedSpans: sp.GetConfiguredRecording(), + } } - resp := &kvserverpb.DelegateSnapshotResponse{ - SnapResponse: &kvserverpb.SnapshotResponse{ - Status: kvserverpb.SnapshotResponse_APPLIED, - DeprecatedMessage: "Snapshot successfully applied by recipient", - }, + return &kvserverpb.DelegateSnapshotResponse{ + Status: kvserverpb.DelegateSnapshotResponse_APPLIED, CollectedSpans: sp.GetConfiguredRecording(), } - // Send a final response that snapshot sending is completed. - return stream.Send(resp) } // HandleSnapshot reads an incoming streaming snapshot and applies it if diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go index 830faf132846..6768b2efcded 100644 --- a/pkg/kv/kvserver/store_snapshot.go +++ b/pkg/kv/kvserver/store_snapshot.go @@ -91,13 +91,6 @@ type outgoingSnapshotStream interface { Recv() (*kvserverpb.SnapshotResponse, error) } -// outgoingSnapshotStream is the minimal interface on a GRPC stream required -// to send a snapshot over the network. 
-type outgoingDelegatedStream interface { - Send(*kvserverpb.DelegateSnapshotRequest) error - Recv() (*kvserverpb.DelegateSnapshotResponse, error) -} - // snapshotRecordMetrics is a wrapper function that increments a set of metrics // related to the number of snapshot bytes sent/received. The definer of the // function specifies which metrics are incremented. @@ -674,7 +667,11 @@ func (kvSS *kvBatchSnapshotStrategy) Close(ctx context.Context) { } } -// reserveReceiveSnapshot throttles incoming snapshots. +// reserveReceiveSnapshot reserves space for this snapshot which will attempt to +// prevent overload of system resources as this snapshot is being sent. +// Snapshots are often sent in bulk (due to operations like store decommission) +// so it is necessary to prevent snapshot transfers from overly impacting +// foreground traffic. func (s *Store) reserveReceiveSnapshot( ctx context.Context, header *kvserverpb.SnapshotRequest_Header, ) (_cleanup func(), _err error) { @@ -683,8 +680,9 @@ func (s *Store) reserveReceiveSnapshot( return s.throttleSnapshot(ctx, s.snapshotApplyQueue, int(header.SenderQueueName), header.SenderQueuePriority, + -1, header.RangeSize, - header.RaftMessageRequest.RangeID, header.RaftMessageRequest.ToReplica.ReplicaID, + header.RaftMessageRequest.RangeID, s.metrics.RangeSnapshotRecvQueueLength, s.metrics.RangeSnapshotRecvInProgress, s.metrics.RangeSnapshotRecvTotalInProgress, ) @@ -692,7 +690,7 @@ func (s *Store) reserveReceiveSnapshot( // reserveSendSnapshot throttles outgoing snapshots. 
func (s *Store) reserveSendSnapshot( - ctx context.Context, req *kvserverpb.DelegateSnapshotRequest, rangeSize int64, + ctx context.Context, req *kvserverpb.DelegateSendSnapshotRequest, rangeSize int64, ) (_cleanup func(), _err error) { ctx, sp := tracing.EnsureChildSpan(ctx, s.cfg.Tracer(), "reserveSendSnapshot") defer sp.Finish() @@ -701,9 +699,11 @@ func (s *Store) reserveSendSnapshot( } return s.throttleSnapshot(ctx, s.snapshotSendQueue, - int(req.SenderQueueName), req.SenderQueuePriority, + int(req.SenderQueueName), + req.SenderQueuePriority, + req.QueueOnDelegateLen, rangeSize, - req.RangeID, req.DelegatedSender.ReplicaID, + req.RangeID, s.metrics.RangeSnapshotSendQueueLength, s.metrics.RangeSnapshotSendInProgress, s.metrics.RangeSnapshotSendTotalInProgress, ) @@ -717,11 +717,12 @@ func (s *Store) throttleSnapshot( snapshotQueue *multiqueue.MultiQueue, requestSource int, requestPriority float64, + maxQueueLength int64, rangeSize int64, rangeID roachpb.RangeID, - replicaID roachpb.ReplicaID, waitingSnapshotMetric, inProgressSnapshotMetric, totalInProgressSnapshotMetric *metric.Gauge, -) (cleanup func(), err error) { +) (cleanup func(), funcErr error) { + tBegin := timeutil.Now() var permit *multiqueue.Permit // Empty snapshots are exempt from rate limits because they're so cheap to @@ -729,9 +730,14 @@ func (s *Store) throttleSnapshot( // RESTORE or manual SPLIT AT, since it prevents these empty snapshots from // getting stuck behind large snapshots managed by the replicate queue. if rangeSize != 0 || s.cfg.TestingKnobs.ThrottleEmptySnapshots { - task := snapshotQueue.Add(requestSource, requestPriority) + task, err := snapshotQueue.Add(requestSource, requestPriority, maxQueueLength) + if err != nil { + return nil, err + } + // After this point, the task is on the queue, so any future errors need to + // be handled by cancelling the task to release the permit. 
defer func() { - if err != nil { + if funcErr != nil { snapshotQueue.Cancel(task) } }() @@ -787,10 +793,9 @@ func (s *Store) throttleSnapshot( if elapsed > snapshotReservationWaitWarnThreshold && !buildutil.CrdbTestBuild { log.Infof( ctx, - "waited for %.1fs to acquire snapshot reservation to r%d/%d", + "waited for %.1fs to acquire snapshot reservation to r%d", elapsed.Seconds(), rangeID, - replicaID, ) } @@ -1127,7 +1132,7 @@ func maybeHandleDeprecatedSnapErr(deprecated bool, err error) error { return errors.Mark(err, errMarkSnapshotError) } -// SnapshotStorePool narrows StorePool to make sendSnapshot easier to test. +// SnapshotStorePool narrows StorePool to make sendSnapshotUsingDelegate easier to test. type SnapshotStorePool interface { Throttle(reason storepool.ThrottleReason, why string, toStoreID roachpb.StoreID) } @@ -1501,7 +1506,7 @@ func sendSnapshot( recordBytesSent snapshotRecordMetrics, ) error { if recordBytesSent == nil { - // NB: Some tests and an offline tool (ResetQuorum) call into `sendSnapshot` + // NB: Some tests and an offline tool (ResetQuorum) call into `sendSnapshotUsingDelegate` // with a nil metrics tracking function. We pass in a fake metrics tracking function here that isn't // hooked up to anything. recordBytesSent = func(inc int64) {} @@ -1626,76 +1631,3 @@ func sendSnapshot( ) } } - -// delegateSnapshot sends an outgoing delegated snapshot request via a -// pre-opened GRPC stream. It sends the delegated snapshot request to the -// sender and waits for confirmation that the snapshot has been applied. -func delegateSnapshot( - ctx context.Context, stream outgoingDelegatedStream, req *kvserverpb.DelegateSnapshotRequest, -) error { - - delegatedSender := req.DelegatedSender - if err := stream.Send(req); err != nil { - return err - } - // Wait for a response from the sender. 
- resp, err := stream.Recv() - if err != nil { - return err - } - switch resp.SnapResponse.Status { - case kvserverpb.SnapshotResponse_ERROR: - return errors.Wrapf( - maybeHandleDeprecatedSnapErr(resp.Error()), - "%s: sender couldn't accept %s", delegatedSender, req) - case kvserverpb.SnapshotResponse_ACCEPTED: - // The sender accepted the request, it will continue with sending. - log.VEventf( - ctx, 2, "sender %s accepted snapshot request %s", delegatedSender, - req, - ) - default: - err := errors.Errorf( - "%s: server sent an invalid status while negotiating %s: %s", - delegatedSender, req, resp.SnapResponse.Status, - ) - return err - } - - // Wait for response to see if the receiver successfully applied the snapshot. - resp, err = stream.Recv() - if err != nil { - return errors.Mark( - errors.Wrapf(err, "%s: remote failed to send snapshot", delegatedSender), errMarkSnapshotError, - ) - } - // Wait for EOF to ensure server side processing is complete. - if unexpectedResp, err := stream.Recv(); err != io.EOF { - if err != nil { - return errors.Mark(errors.Wrapf( - err, "%s: expected EOF, got resp=%v with error", - delegatedSender.StoreID, unexpectedResp), errMarkSnapshotError) - } - return errors.Mark(errors.Newf( - "%s: expected EOF, got resp=%v", delegatedSender.StoreID, - unexpectedResp), errMarkSnapshotError) - } - sp := tracing.SpanFromContext(ctx) - if sp != nil { - sp.ImportRemoteRecording(resp.CollectedSpans) - } - switch resp.SnapResponse.Status { - case kvserverpb.SnapshotResponse_ERROR: - return maybeHandleDeprecatedSnapErr(resp.Error()) - case kvserverpb.SnapshotResponse_APPLIED: - // This is the response we're expecting. Snapshot successfully applied. 
- log.VEventf(ctx, 2, "%s: delegated snapshot was successfully applied", delegatedSender) - return nil - default: - return errors.Errorf( - "%s: server sent an invalid status during finalization: %s", - delegatedSender, resp.SnapResponse.Status, - ) - } - -} diff --git a/pkg/kv/kvserver/store_test.go b/pkg/kv/kvserver/store_test.go index 31ce2b1b5071..887730df6f25 100644 --- a/pkg/kv/kvserver/store_test.go +++ b/pkg/kv/kvserver/store_test.go @@ -2994,7 +2994,7 @@ func (sp *fakeStorePool) Throttle( } // TestSendSnapshotThrottling tests the store pool throttling behavior of -// store.sendSnapshot, ensuring that it properly updates the StorePool on +// store.sendSnapshotUsingDelegate, ensuring that it properly updates the StorePool on // various exceptional conditions and new capacity estimates. func TestSendSnapshotThrottling(t *testing.T) { defer leaktest.AfterTest(t)() @@ -3065,60 +3065,82 @@ func TestSendSnapshotConcurrency(t *testing.T) { s := tc.store // Checking this now makes sure that if the defaults change this test will also. 
- require.Equal(t, 2, s.snapshotSendQueue.Len()) - cleanup1, err := s.reserveSendSnapshot(ctx, &kvserverpb.DelegateSnapshotRequest{ + require.Equal(t, 2, s.snapshotSendQueue.AvailableLen()) + require.Equal(t, 0, s.snapshotSendQueue.QueueLen()) + cleanup1, err := s.reserveSendSnapshot(ctx, &kvserverpb.DelegateSendSnapshotRequest{ SenderQueueName: kvserverpb.SnapshotRequest_REPLICATE_QUEUE, SenderQueuePriority: 1, }, 1) - require.Nil(t, err) - require.Equal(t, 1, s.snapshotSendQueue.Len()) - cleanup2, err := s.reserveSendSnapshot(ctx, &kvserverpb.DelegateSnapshotRequest{ + require.NoError(t, err) + cleanup2, err := s.reserveSendSnapshot(ctx, &kvserverpb.DelegateSendSnapshotRequest{ SenderQueueName: kvserverpb.SnapshotRequest_REPLICATE_QUEUE, SenderQueuePriority: 1, }, 1) - require.Nil(t, err) - require.Equal(t, 0, s.snapshotSendQueue.Len()) + require.NoError(t, err) + require.Equal(t, 0, s.snapshotSendQueue.AvailableLen()) + require.Equal(t, 1, s.snapshotSendQueue.QueueLen()) + // At this point both the first two tasks will be holding reservations and - // waiting for cleanup, a third task will block. + // waiting for cleanup, a third task will block or fail. First send one with + // the queue length set to 0 - this will fail since the first tasks are still + // running. + _, err = s.reserveSendSnapshot(ctx, &kvserverpb.DelegateSendSnapshotRequest{ + SenderQueueName: kvserverpb.SnapshotRequest_REPLICATE_QUEUE, + SenderQueuePriority: 1, + QueueOnDelegateLen: 0, + }, 1) + require.Error(t, err) + require.Equal(t, 0, s.snapshotSendQueue.AvailableLen()) + require.Equal(t, 1, s.snapshotSendQueue.QueueLen()) + + // Now add a task that will wait indefinitely for another task to finish.
var wg sync.WaitGroup wg.Add(2) go func() { before := timeutil.Now() - cleanup3, err := s.reserveSendSnapshot(ctx, &kvserverpb.DelegateSnapshotRequest{ + cleanup3, err := s.reserveSendSnapshot(ctx, &kvserverpb.DelegateSendSnapshotRequest{ SenderQueueName: kvserverpb.SnapshotRequest_REPLICATE_QUEUE, SenderQueuePriority: 1, + QueueOnDelegateLen: -1, }, 1) after := timeutil.Now() + require.NoError(t, err) + require.GreaterOrEqual(t, after.Sub(before), 10*time.Millisecond) cleanup3() wg.Done() - require.Nil(t, err) - require.GreaterOrEqual(t, after.Sub(before), 10*time.Millisecond) }() - // This task will not block for more than a few MS, but we want to wait for - // it to complete to make sure it frees the permit. + // Now add one that will queue up and wait for another task to finish. This + // task will not block for more than 20ms (its context deadline), but we want to wait for it to + // complete to make sure it frees the permit. go func() { - deadlineCtx, cancel := context.WithTimeout(ctx, 1*time.Millisecond) + deadlineCtx, cancel := context.WithTimeout(ctx, 20*time.Millisecond) defer cancel() // This will time out since the deadline is set artificially low. Make sure // the permit is released. - _, err := s.reserveSendSnapshot(deadlineCtx, &kvserverpb.DelegateSnapshotRequest{ + _, err := s.reserveSendSnapshot(deadlineCtx, &kvserverpb.DelegateSendSnapshotRequest{ SenderQueueName: kvserverpb.SnapshotRequest_REPLICATE_QUEUE, SenderQueuePriority: 1, + QueueOnDelegateLen: -1, }, 1) + require.Error(t, err) wg.Done() - require.NotNil(t, err) }() // Wait a little time before calling signaling the first two as complete. time.Sleep(100 * time.Millisecond) + require.Equal(t, 0, s.snapshotSendQueue.AvailableLen()) + // One remaining task is queued at this point. + require.Equal(t, 2, s.snapshotSendQueue.QueueLen()) + cleanup1() cleanup2() // Wait until all cleanup run before checking the number of permits.
wg.Wait() - require.Equal(t, 2, s.snapshotSendQueue.Len()) + require.Equal(t, 2, s.snapshotSendQueue.AvailableLen()) + require.Equal(t, 0, s.snapshotSendQueue.QueueLen()) } func TestReserveSnapshotThrottling(t *testing.T) { diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go index ab998562d9af..e5ebda32f678 100644 --- a/pkg/kv/kvserver/testing_knobs.go +++ b/pkg/kv/kvserver/testing_knobs.go @@ -280,7 +280,7 @@ type StoreTestingKnobs struct { // SendSnapshot is run after receiving a DelegateRaftSnapshot request but // before any throttling or sending logic. - SendSnapshot func() + SendSnapshot func(*kvserverpb.DelegateSendSnapshotRequest) // ReceiveSnapshot is run after receiving a snapshot header but before // acquiring snapshot quota or doing shouldAcceptSnapshotData checks. If an // error is returned from the hook, it's sent as an ERROR SnapshotResponse. @@ -423,6 +423,9 @@ type StoreTestingKnobs struct { // AfterSendSnapshotThrottle intercepts replicas after receiving a spot in the // send snapshot semaphore. AfterSendSnapshotThrottle func() + // SelectDelegateSnapshotSender returns an ordered list of replicas which will + // be used as delegates for sending a snapshot. + SelectDelegateSnapshotSender func(*roachpb.RangeDescriptor) []roachpb.ReplicaDescriptor // EnqueueReplicaInterceptor intercepts calls to `store.Enqueue()`.
EnqueueReplicaInterceptor func(queueName string, replica *Replica) diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go index 265ca532f0f9..dad9e9e3ecf0 100644 --- a/pkg/ts/catalog/chart_catalog.go +++ b/pkg/ts/catalog/chart_catalog.go @@ -616,6 +616,9 @@ var charts = []sectionDescription{ "range.snapshots.applied-voter", "range.snapshots.applied-initial", "range.snapshots.applied-non-voter", + "range.snapshot.delegate.successes", + "range.snapshot.delegate.failures", + "range.snapshot.send.latency", }, }, { diff --git a/pkg/ts/catalog/metrics.go b/pkg/ts/catalog/metrics.go index ce2f8f8bbc02..cd251180fb1e 100644 --- a/pkg/ts/catalog/metrics.go +++ b/pkg/ts/catalog/metrics.go @@ -102,6 +102,7 @@ var histogramMetricsNames = map[string]struct{}{ "replication.flush_hist_nanos": {}, "kv.replica_read_batch_evaluate.latency": {}, "kv.replica_write_batch_evaluate.latency": {}, + "range.snapshot.send.latency": {}, } func allInternalTSMetricsNames() []string {