diff --git a/pkg/kv/kvserver/allocator_impl_test.go b/pkg/kv/kvserver/allocator_impl_test.go index b13af5c72c19..04d174baa883 100644 --- a/pkg/kv/kvserver/allocator_impl_test.go +++ b/pkg/kv/kvserver/allocator_impl_test.go @@ -18,7 +18,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/replicastats" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/testutils/gossiputil" @@ -162,8 +161,7 @@ func TestAllocatorRebalanceTarget(t *testing.T) { repl.mu.state.Stats = &enginepb.MVCCStats{} repl.mu.Unlock() - repl.leaseholderStats = replicastats.NewReplicaStats(clock, nil) - repl.writeStats = replicastats.NewReplicaStats(clock, nil) + repl.loadStats = NewReplicaLoad(clock, nil) var rangeUsageInfo allocator.RangeUsageInfo diff --git a/pkg/kv/kvserver/client_split_test.go b/pkg/kv/kvserver/client_split_test.go index 581a772549ce..61fa6a0b8bb5 100644 --- a/pkg/kv/kvserver/client_split_test.go +++ b/pkg/kv/kvserver/client_split_test.go @@ -2774,9 +2774,14 @@ func TestStoreCapacityAfterSplit(t *testing.T) { if e, a := int32(1), cap.LeaseCount; e != a { t.Errorf("expected cap.LeaseCount=%d, got %d", e, a) } - if minExpected, a := 1/float64(replicastats.MinStatsDuration/time.Second), cap.WritesPerSecond; minExpected > a { + + // NB: The writes per second may be within some error bound below the + // minExpected due to timing and floating point calculation. An error of + // 0.01 (WPS) is added to avoid flaking the test. + if minExpected, a := 1/float64(replicastats.MinStatsDuration/time.Second), cap.WritesPerSecond; minExpected > a+0.01 { t.Errorf("expected cap.WritesPerSecond >= %f, got %f", minExpected, a) } + bpr2 := cap.BytesPerReplica if bpr2.P10 <= bpr1.P10 { t.Errorf("expected BytesPerReplica to have increased from %+v, but got %+v", bpr1, bpr2) diff --git a/pkg/kv/kvserver/deprecated_store_rebalancer.go b/pkg/kv/kvserver/deprecated_store_rebalancer.go index cf2a36bbf9ca..46fad1fb33bd 100644 --- a/pkg/kv/kvserver/deprecated_store_rebalancer.go +++ b/pkg/kv/kvserver/deprecated_store_rebalancer.go @@ -128,7 +128,7 @@ func (sr *StoreRebalancer) deprecatedChooseLeaseToTransfer( *localDesc, candidate.StoreID, candidates, - replWithStats.repl.leaseholderStats, + replWithStats.repl.loadStats.batchRequests, ) { log.VEventf(ctx, 3, "r%d is on s%d due to follow-the-workload; skipping", desc.RangeID, localDesc.StoreID) diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index c5b6182d2a6c..2862b5ace9c8 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -30,7 +30,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/replicastats" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/split" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/tenantrate" @@ -228,24 +227,10 @@ type Replica struct { store *Store abortSpan *abortspan.AbortSpan // Avoids anomalous reads after abort - // leaseholderStats tracks all incoming BatchRequests to the replica and which - // localities they come from in order to aid in lease rebalancing decisions. - leaseholderStats *replicastats.ReplicaStats - // writeStats tracks the number of mutations (as counted by the pebble batch - // to be applied to the state machine), and additionally, the number of keys - // added to MVCCStats, which notably may be approximate in the case of an - // AddSSTable. In other words, writeStats should loosely track the write - // activity on the replica on a per-key basis, though in an inconsistent way - // that in particular may overcount by a factor of roughly two. - // - // Note that while writeStats were originally introduced to aid in rebalancing - // decisions in [1], at the time of writing they are not used for that - // purpose. - // - // [1]: https://github.com/cockroachdb/cockroach/pull/16664 - writeStats *replicastats.ReplicaStats - // loadStats tracks a sliding window of throughput on this replica. + // Multiple types of throughput are accounted for. Where the localities of + // requests are tracked in order in addition to the aggregate, in order to + // inform load based lease and replica rebalancing decisions. loadStats *ReplicaLoad // creatingReplica is set when a replica is created as uninitialized diff --git a/pkg/kv/kvserver/replica_application_state_machine.go b/pkg/kv/kvserver/replica_application_state_machine.go index 063b81d44027..e0e2662bd560 100644 --- a/pkg/kv/kvserver/replica_application_state_machine.go +++ b/pkg/kv/kvserver/replica_application_state_machine.go @@ -662,7 +662,6 @@ func (b *replicaAppBatch) runPreApplyTriggersAfterStagingWriteBatch( b.r.store.metrics.AddSSTableApplicationCopies.Inc(1) } if added := res.Delta.KeyCount; added > 0 { - b.r.writeStats.RecordCount(float64(added), 0) b.r.loadStats.writeKeys.RecordCount(float64(added), 0) } if res.AddSSTable.AtWriteTimestamp { @@ -1012,10 +1011,7 @@ func (b *replicaAppBatch) ApplyToStateMachine(ctx context.Context) error { // Record the write activity, passing a 0 nodeID because replica.writeStats // intentionally doesn't track the origin of the writes. - b.r.writeStats.RecordCount(float64(b.mutations), 0 /* nodeID */) - if b.r.loadStats != nil { - b.r.loadStats.writeKeys.RecordCount(float64(b.mutations), 0) - } + b.r.loadStats.writeKeys.RecordCount(float64(b.mutations), 0) now := timeutil.Now() if needsSplitBySize && r.splitQueueThrottle.ShouldProcess(now) { diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go index 08cc037374a5..18555e0761b1 100644 --- a/pkg/kv/kvserver/replica_init.go +++ b/pkg/kv/kvserver/replica_init.go @@ -21,7 +21,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/tracker" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/replicastats" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/split" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -112,12 +111,8 @@ func newUnloadedReplica( r.leaseHistory = newLeaseHistory() } if store.cfg.StorePool != nil { - r.leaseholderStats = replicastats.NewReplicaStats(store.Clock(), store.cfg.StorePool.GetNodeLocalityString) - r.loadStats = newReplicaLoad(store.Clock(), store.cfg.StorePool.GetNodeLocalityString) + r.loadStats = NewReplicaLoad(store.Clock(), store.cfg.StorePool.GetNodeLocalityString) } - // Pass nil for the localityOracle because we intentionally don't track the - // origin locality of write load. - r.writeStats = replicastats.NewReplicaStats(store.Clock(), nil) // Init rangeStr with the range ID. r.rangeStr.store(replicaID, &roachpb.RangeDescriptor{RangeID: desc.RangeID}) diff --git a/pkg/kv/kvserver/replica_load.go b/pkg/kv/kvserver/replica_load.go index 902def761e19..45e979d24c63 100644 --- a/pkg/kv/kvserver/replica_load.go +++ b/pkg/kv/kvserver/replica_load.go @@ -26,7 +26,9 @@ type ReplicaLoad struct { readBytes *replicastats.ReplicaStats } -func newReplicaLoad(clock *hlc.Clock, getNodeLocality replicastats.LocalityOracle) *ReplicaLoad { +// NewReplicaLoad returns a new ReplicaLoad, which may be used to track the +// request throughput of a replica. +func NewReplicaLoad(clock *hlc.Clock, getNodeLocality replicastats.LocalityOracle) *ReplicaLoad { return &ReplicaLoad{ batchRequests: replicastats.NewReplicaStats(clock, getNodeLocality), requests: replicastats.NewReplicaStats(clock, getNodeLocality), diff --git a/pkg/kv/kvserver/replica_metrics.go b/pkg/kv/kvserver/replica_metrics.go index 64d1e724ebb5..c5a85062f95e 100644 --- a/pkg/kv/kvserver/replica_metrics.go +++ b/pkg/kv/kvserver/replica_metrics.go @@ -282,7 +282,7 @@ func calcBehindCount( // practically none). See Replica.getBatchRequestQPS() for how this is // accounted for. func (r *Replica) QueriesPerSecond() (float64, time.Duration) { - return r.leaseholderStats.AverageRatePerSecond() + return r.loadStats.batchRequests.AverageRatePerSecond() } // RequestsPerSecond returns the range's average requests received per second. @@ -299,7 +299,7 @@ func (r *Replica) RequestsPerSecond() float64 { // writes (12 for the metadata, 12 for the versions). A DeleteRange that // ultimately only removes one key counts as one (or two if it's transactional). func (r *Replica) WritesPerSecond() float64 { - wps, _ := r.writeStats.AverageRatePerSecond() + wps, _ := r.loadStats.writeKeys.AverageRatePerSecond() return wps } diff --git a/pkg/kv/kvserver/replica_proposal.go b/pkg/kv/kvserver/replica_proposal.go index 4c3dd0a3ca28..1dbf38ebbb89 100644 --- a/pkg/kv/kvserver/replica_proposal.go +++ b/pkg/kv/kvserver/replica_proposal.go @@ -322,9 +322,6 @@ func (r *Replica) leasePostApplyLocked( // Reset the request counts used to make lease placement decisions and // load-based splitting/merging decisions whenever starting a new lease. - if r.leaseholderStats != nil { - r.leaseholderStats.ResetRequestCounts() - } if r.loadStats != nil { r.loadStats.reset() } @@ -384,9 +381,6 @@ func (r *Replica) leasePostApplyLocked( } else if prevOwner { r.store.maybeGossipOnCapacityChange(ctx, leaseRemoveEvent) } - if r.leaseholderStats != nil { - r.leaseholderStats.ResetRequestCounts() - } if r.loadStats != nil { r.loadStats.reset() } diff --git a/pkg/kv/kvserver/replica_rankings_test.go b/pkg/kv/kvserver/replica_rankings_test.go index acae5f42fc0e..93b827be088e 100644 --- a/pkg/kv/kvserver/replica_rankings_test.go +++ b/pkg/kv/kvserver/replica_rankings_test.go @@ -162,14 +162,14 @@ func TestAddSSTQPSStat(t *testing.T) { sqlDB.Exec(t, fmt.Sprintf(`SET CLUSTER setting kv.replica_stats.addsst_request_size_factor = %d`, testCase.addsstRequestFactor)) // Reset the request counts to 0 before sending to clear previous requests. - repl.leaseholderStats.ResetRequestCounts() + repl.loadStats.reset() _, pErr = db.NonTransactionalSender().Send(ctx, testCase.ba) require.Nil(t, pErr) - repl.leaseholderStats.Mu.Lock() - queriesAfter, _ := repl.leaseholderStats.SumLocked() - repl.leaseholderStats.Mu.Unlock() + repl.loadStats.batchRequests.Mu.Lock() + queriesAfter, _ := repl.loadStats.batchRequests.SumLocked() + repl.loadStats.batchRequests.Mu.Unlock() // If queries are correctly recorded, we should see increase in query // count by the expected QPS. However, it is possible to to get a diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go index defa9408359b..c86d6846a4f2 100644 --- a/pkg/kv/kvserver/replica_send.go +++ b/pkg/kv/kvserver/replica_send.go @@ -137,17 +137,13 @@ func (r *Replica) sendWithoutRangeID( ) (_ *roachpb.BatchResponse, _ *StoreWriteBytes, rErr *roachpb.Error) { var br *roachpb.BatchResponse - if r.leaseholderStats != nil && ba.Header.GatewayNodeID != 0 { - r.leaseholderStats.RecordCount(r.getBatchRequestQPS(ctx, ba), ba.Header.GatewayNodeID) - } - - if r.loadStats != nil { - r.loadStats.requests.RecordCount(float64(len(ba.Requests)), 0) - r.loadStats.writeBytes.RecordCount(getBatchRequestWriteBytes(ba), 0) - } // Add the range log tag. ctx = r.AnnotateCtx(ctx) + // Record summary throughput information about the batch request for + // accounting. + r.recordBatchRequestLoad(ctx, ba) + // If the internal Raft group is not initialized, create it and wake the leader. r.maybeInitializeRaftGroup(ctx) @@ -1003,6 +999,29 @@ func (r *Replica) executeAdminBatch( return br, nil } +// recordBatchRequestLoad records the load information about a batch request issued +// against this replica. +func (r *Replica) recordBatchRequestLoad(ctx context.Context, ba *roachpb.BatchRequest) { + if r.loadStats == nil { + log.VEventf( + ctx, + 3, + "Unable to record load of batch request for r%d, load stats is not initialized", + ba.Header.RangeID, + ) + return + } + + // adjustedQPS is the adjusted number of queries per second, that is a cost + // estimate of a BatchRequest. See getBatchRequestQPS() for the + // calculation. + adjustedQPS := r.getBatchRequestQPS(ctx, ba) + + r.loadStats.batchRequests.RecordCount(adjustedQPS, ba.Header.GatewayNodeID) + r.loadStats.requests.RecordCount(float64(len(ba.Requests)), ba.Header.GatewayNodeID) + r.loadStats.writeBytes.RecordCount(getBatchRequestWriteBytes(ba), ba.Header.GatewayNodeID) +} + // getBatchRequestQPS calculates the cost estimation of a BatchRequest. The // estimate returns Queries Per Second (QPS), representing the abstract // resource cost associated with this request. BatchRequests are calculated as diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go index 97f6372dd883..f04d11c76570 100644 --- a/pkg/kv/kvserver/replicate_queue.go +++ b/pkg/kv/kvserver/replicate_queue.go @@ -481,7 +481,7 @@ func (rq *replicateQueue) shouldQueue( status := repl.LeaseStatusAt(ctx, now) if status.IsValid() && rq.canTransferLeaseFrom(ctx, repl) && - rq.allocator.ShouldTransferLease(ctx, conf, voterReplicas, repl, repl.leaseholderStats) { + rq.allocator.ShouldTransferLease(ctx, conf, voterReplicas, repl, repl.loadStats.batchRequests) { log.VEventf(ctx, 2, "lease transfer needed, enqueuing") return true, 0 @@ -1550,7 +1550,7 @@ func (rq *replicateQueue) shedLease( conf, desc.Replicas().VoterDescriptors(), repl, - repl.leaseholderStats, + repl.loadStats.batchRequests, false, /* forceDecisionWithoutStats */ opts, ) @@ -1563,7 +1563,7 @@ func (rq *replicateQueue) shedLease( return allocator.NoTransferDryRun, nil } - avgQPS, qpsMeasurementDur := repl.leaseholderStats.AverageRatePerSecond() + avgQPS, qpsMeasurementDur := repl.loadStats.batchRequests.AverageRatePerSecond() if qpsMeasurementDur < replicastats.MinStatsDuration { avgQPS = 0 } @@ -1724,10 +1724,10 @@ func rangeUsageInfoForRepl(repl *Replica) allocator.RangeUsageInfo { info := allocator.RangeUsageInfo{ LogicalBytes: repl.GetMVCCStats().Total(), } - if queriesPerSecond, dur := repl.leaseholderStats.AverageRatePerSecond(); dur >= replicastats.MinStatsDuration { + if queriesPerSecond, dur := repl.loadStats.batchRequests.AverageRatePerSecond(); dur >= replicastats.MinStatsDuration { info.QueriesPerSecond = queriesPerSecond } - if writesPerSecond, dur := repl.writeStats.AverageRatePerSecond(); dur >= replicastats.MinStatsDuration { + if writesPerSecond, dur := repl.loadStats.writeKeys.AverageRatePerSecond(); dur >= replicastats.MinStatsDuration { info.WritesPerSecond = writesPerSecond } return info diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index b90f08658c6e..d979070a1b66 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -2994,12 +2994,12 @@ func (s *Store) Capacity(ctx context.Context, useCached bool) (roachpb.StoreCapa // starts? We can't easily have a countdown as its value changes like for // leases/replicas. var qps float64 - if avgQPS, dur := r.leaseholderStats.AverageRatePerSecond(); dur >= replicastats.MinStatsDuration { + if avgQPS, dur := r.loadStats.batchRequests.AverageRatePerSecond(); dur >= replicastats.MinStatsDuration { qps = avgQPS totalQueriesPerSecond += avgQPS // TODO(a-robinson): Calculate percentiles for qps? Get rid of other percentiles? } - if wps, dur := r.writeStats.AverageRatePerSecond(); dur >= replicastats.MinStatsDuration { + if wps, dur := r.loadStats.writeKeys.AverageRatePerSecond(); dur >= replicastats.MinStatsDuration { totalWritesPerSecond += wps writesPerReplica = append(writesPerReplica, wps) } @@ -3196,13 +3196,13 @@ func (s *Store) updateReplicationGauges(ctx context.Context) error { } pausedFollowerCount += metrics.PausedFollowerCount behindCount += metrics.BehindCount - if qps, dur := rep.leaseholderStats.AverageRatePerSecond(); dur >= replicastats.MinStatsDuration { + if qps, dur := rep.loadStats.batchRequests.AverageRatePerSecond(); dur >= replicastats.MinStatsDuration { averageQueriesPerSecond += qps } if rqps, dur := rep.loadStats.requests.AverageRatePerSecond(); dur >= replicastats.MinStatsDuration { averageRequestsPerSecond += rqps } - if wps, dur := rep.writeStats.AverageRatePerSecond(); dur >= replicastats.MinStatsDuration { + if wps, dur := rep.loadStats.writeKeys.AverageRatePerSecond(); dur >= replicastats.MinStatsDuration { averageWritesPerSecond += wps } if rps, dur := rep.loadStats.readKeys.AverageRatePerSecond(); dur >= replicastats.MinStatsDuration { diff --git a/pkg/kv/kvserver/store_merge.go b/pkg/kv/kvserver/store_merge.go index 7d0dd5ad805e..9ab21b51cbf5 100644 --- a/pkg/kv/kvserver/store_merge.go +++ b/pkg/kv/kvserver/store_merge.go @@ -145,16 +145,6 @@ func (s *Store) MergeRange( return err } - if leftRepl.leaseholderStats != nil { - leftRepl.leaseholderStats.ResetRequestCounts() - } - if leftRepl.writeStats != nil { - // Note: this could be drastically improved by adding a replicaStats method - // that merges stats. Resetting stats is typically bad for the rebalancing - // logic that depends on them. - leftRepl.writeStats.ResetRequestCounts() - } - leftRepl.loadStats.merge(rightRepl.loadStats) // Clear the concurrency manager's lock and txn wait-queues to redirect the diff --git a/pkg/kv/kvserver/store_pool_test.go b/pkg/kv/kvserver/store_pool_test.go index c727e3f1c019..4d84d3de22b9 100644 --- a/pkg/kv/kvserver/store_pool_test.go +++ b/pkg/kv/kvserver/store_pool_test.go @@ -81,17 +81,16 @@ func TestStorePoolUpdateLocalStore(t *testing.T) { ValBytes: 4, } replica.mu.Unlock() - rs := replicastats.NewReplicaStats(clock, nil) + replica.loadStats = NewReplicaLoad(clock, nil) for _, store := range stores { - rs.RecordCount(1, store.Node.NodeID) + replica.loadStats.batchRequests.RecordCount(1, store.Node.NodeID) + replica.loadStats.writeKeys.RecordCount(1, store.Node.NodeID) } manual.Advance(replicastats.MinStatsDuration + time.Second) - replica.leaseholderStats = rs - replica.writeStats = rs rangeUsageInfo := rangeUsageInfoForRepl(&replica) - QPS, _ := replica.leaseholderStats.AverageRatePerSecond() - WPS, _ := replica.writeStats.AverageRatePerSecond() + QPS, _ := replica.loadStats.batchRequests.AverageRatePerSecond() + WPS, _ := replica.loadStats.writeKeys.AverageRatePerSecond() sp.UpdateLocalStoreAfterRebalance(roachpb.StoreID(1), rangeUsageInfo, roachpb.ADD_VOTER) desc, ok := sp.GetStoreDescriptor(roachpb.StoreID(1)) @@ -198,7 +197,7 @@ func TestStorePoolUpdateLocalStoreBeforeGossip(t *testing.T) { if err != nil { t.Fatalf("make replica error : %+v", err) } - replica.leaseholderStats = replicastats.NewReplicaStats(store.Clock(), nil) + replica.loadStats = NewReplicaLoad(store.Clock(), nil) rangeUsageInfo := rangeUsageInfoForRepl(replica) diff --git a/pkg/kv/kvserver/store_rebalancer.go b/pkg/kv/kvserver/store_rebalancer.go index 18b819a2c266..19e53399f5bc 100644 --- a/pkg/kv/kvserver/store_rebalancer.go +++ b/pkg/kv/kvserver/store_rebalancer.go @@ -446,7 +446,7 @@ func (sr *StoreRebalancer) chooseLeaseToTransfer( conf, candidates, replWithStats.repl, - replWithStats.repl.leaseholderStats, + replWithStats.repl.loadStats.batchRequests, true, /* forceDecisionWithoutStats */ allocator.TransferLeaseOptions{ Goal: allocator.QPSConvergence, @@ -472,7 +472,7 @@ func (sr *StoreRebalancer) chooseLeaseToTransfer( *localDesc, candidate.StoreID, candidates, - replWithStats.repl.leaseholderStats, + replWithStats.repl.loadStats.batchRequests, ) { log.VEventf( ctx, 3, "r%d is on s%d due to follow-the-workload; considering replica rebalance instead", diff --git a/pkg/kv/kvserver/store_rebalancer_test.go b/pkg/kv/kvserver/store_rebalancer_test.go index 03028e5e304b..faa8b76e25b5 100644 --- a/pkg/kv/kvserver/store_rebalancer_test.go +++ b/pkg/kv/kvserver/store_rebalancer_test.go @@ -21,7 +21,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/replicastats" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/testutils/gossiputil" @@ -468,10 +467,9 @@ func loadRanges(rr *replicaRankings, s *Store, ranges []testRange) { // rangeInfo code is ripped out of the allocator. repl.mu.state.Stats = &enginepb.MVCCStats{} - repl.leaseholderStats = replicastats.NewReplicaStats(s.Clock(), nil) - repl.leaseholderStats.SetMeanRateForTesting(r.qps) + repl.loadStats = NewReplicaLoad(s.Clock(), nil) + repl.loadStats.batchRequests.SetMeanRateForTesting(r.qps) - repl.writeStats = replicastats.NewReplicaStats(s.Clock(), nil) acc.addReplica(replicaWithStats{ repl: repl, qps: r.qps, diff --git a/pkg/kv/kvserver/store_split.go b/pkg/kv/kvserver/store_split.go index 6ee2cefcfaa5..bb8f57672e38 100644 --- a/pkg/kv/kvserver/store_split.go +++ b/pkg/kv/kvserver/store_split.go @@ -14,7 +14,6 @@ import ( "bytes" "context" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/replicastats" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" @@ -324,16 +323,17 @@ func (s *Store) SplitRange( // clear them. leftRepl.concMgr.OnRangeSplit() - // Clear the original range's request stats, since they include requests for - // spans that are now owned by the new range. - leftRepl.leaseholderStats.ResetRequestCounts() - if rightReplOrNil == nil { - throwawayRightWriteStats := new(replicastats.ReplicaStats) - leftRepl.writeStats.SplitRequestCounts(throwawayRightWriteStats) + // There is no rhs replica, so instead halve the load of the lhs + // replica. + throwawayRightStats := NewReplicaLoad(s.Clock(), nil) + leftRepl.loadStats.split(throwawayRightStats) } else { rightRepl := rightReplOrNil - leftRepl.writeStats.SplitRequestCounts(rightRepl.writeStats) + // Split the replica load of the lhs evenly (50:50) with the rhs. NB: + // that this ignores the split point and makes as simplifying + // assumption that distribution across all tracked load stats is + // identical. leftRepl.loadStats.split(rightRepl.loadStats) if err := s.addReplicaInternalLocked(rightRepl); err != nil { return errors.Wrapf(err, "unable to add replica %v", rightRepl)