kvserver: record batch requests with no gateway
Previously, batch requests with no `GatewayNodeID` were not accounted
for in the QPS of a replica. By extension, the store QPS, which
aggregates over the replicas it holds, was also missing this load. This
patch introduces tracking for all requests, regardless of the
`GatewayNodeID`.

Requests without a gateway were originally excluded because
follow-the-workload lease transfers consider the per-locality request
counts, so untagged localities were not useful. That logic has since
been updated to filter out such localities directly, so it is no longer
necessary to exclude these requests here.

`leaseholderStats`, which previously tracked the QPS, and `writeStats`,
which tracked the MVCC keys written, have also been removed. They are
duplicated by `batchRequests` and `writeKeys` respectively, within the
`loadStats` of a replica.

resolves cockroachdb#85157

Release note: None
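
For orientation, a minimal sketch of the consolidated load-tracking struct, with the field set inferred from the diff below (the authoritative definition lives in `pkg/kv/kvserver/replica_load.go`):

```go
package kvserver

import "github.com/cockroachdb/cockroach/pkg/kv/kvserver/replicastats"

// ReplicaLoad groups the per-replica load tracking that previously lived in
// separate Replica fields. Sketch only: the field set is inferred from this
// commit's diff.
type ReplicaLoad struct {
	batchRequests *replicastats.ReplicaStats // replaces Replica.leaseholderStats (QPS)
	requests      *replicastats.ReplicaStats // individual requests within each batch
	writeKeys     *replicastats.ReplicaStats // replaces Replica.writeStats (MVCC keys written)
	readKeys      *replicastats.ReplicaStats
	writeBytes    *replicastats.ReplicaStats
	readBytes     *replicastats.ReplicaStats
}
```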
kvoli committed Jul 28, 2022
1 parent 9b5a2b7 commit 04b43de
Showing 18 changed files with 81 additions and 95 deletions.
4 changes: 1 addition & 3 deletions pkg/kv/kvserver/allocator_impl_test.go
@@ -18,7 +18,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/replicastats"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/testutils/gossiputil"
@@ -162,8 +161,7 @@ func TestAllocatorRebalanceTarget(t *testing.T) {
repl.mu.state.Stats = &enginepb.MVCCStats{}
repl.mu.Unlock()

- repl.leaseholderStats = replicastats.NewReplicaStats(clock, nil)
- repl.writeStats = replicastats.NewReplicaStats(clock, nil)
+ repl.loadStats = NewReplicaLoad(clock, nil)

var rangeUsageInfo allocator.RangeUsageInfo

7 changes: 6 additions & 1 deletion pkg/kv/kvserver/client_split_test.go
@@ -2774,9 +2774,14 @@ func TestStoreCapacityAfterSplit(t *testing.T) {
if e, a := int32(1), cap.LeaseCount; e != a {
t.Errorf("expected cap.LeaseCount=%d, got %d", e, a)
}
- if minExpected, a := 1/float64(replicastats.MinStatsDuration/time.Second), cap.WritesPerSecond; minExpected > a {

+ // NB: The writes per second may fall slightly below minExpected due to
+ // timing and floating-point error. A tolerance of 0.01 (WPS) is added to
+ // avoid flaking the test.
+ if minExpected, a := 1/float64(replicastats.MinStatsDuration/time.Second), cap.WritesPerSecond; minExpected > a+0.01 {
t.Errorf("expected cap.WritesPerSecond >= %f, got %f", minExpected, a)
}

bpr2 := cap.BytesPerReplica
if bpr2.P10 <= bpr1.P10 {
t.Errorf("expected BytesPerReplica to have increased from %+v, but got %+v", bpr1, bpr2)
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/deprecated_store_rebalancer.go
@@ -128,7 +128,7 @@ func (sr *StoreRebalancer) deprecatedChooseLeaseToTransfer(
*localDesc,
candidate.StoreID,
candidates,
- replWithStats.repl.leaseholderStats,
+ replWithStats.repl.loadStats.batchRequests,
) {
log.VEventf(ctx, 3, "r%d is on s%d due to follow-the-workload; skipping",
desc.RangeID, localDesc.StoreID)
21 changes: 3 additions & 18 deletions pkg/kv/kvserver/replica.go
@@ -30,7 +30,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/replicastats"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/split"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/tenantrate"
@@ -228,24 +227,10 @@ type Replica struct {
store *Store
abortSpan *abortspan.AbortSpan // Avoids anomalous reads after abort

- // leaseholderStats tracks all incoming BatchRequests to the replica and which
- // localities they come from in order to aid in lease rebalancing decisions.
- leaseholderStats *replicastats.ReplicaStats
- // writeStats tracks the number of mutations (as counted by the pebble batch
- // to be applied to the state machine), and additionally, the number of keys
- // added to MVCCStats, which notably may be approximate in the case of an
- // AddSSTable. In other words, writeStats should loosely track the write
- // activity on the replica on a per-key basis, though in an inconsistent way
- // that in particular may overcount by a factor of roughly two.
- //
- // Note that while writeStats were originally introduced to aid in rebalancing
- // decisions in [1], at the time of writing they are not used for that
- // purpose.
- //
- // [1]: https://github.com/cockroachdb/cockroach/pull/16664
- writeStats *replicastats.ReplicaStats

// loadStats tracks a sliding window of throughput on this replica.
+ // Multiple types of throughput are accounted for. Where the localities of
+ // requests are tracked, per-locality counts are maintained in addition to
+ // the aggregate, in order to inform load-based lease and replica
+ // rebalancing decisions.
loadStats *ReplicaLoad

// creatingReplica is set when a replica is created as uninitialized
6 changes: 1 addition & 5 deletions pkg/kv/kvserver/replica_application_state_machine.go
@@ -662,7 +662,6 @@ func (b *replicaAppBatch) runPreApplyTriggersAfterStagingWriteBatch(
b.r.store.metrics.AddSSTableApplicationCopies.Inc(1)
}
if added := res.Delta.KeyCount; added > 0 {
- b.r.writeStats.RecordCount(float64(added), 0)
b.r.loadStats.writeKeys.RecordCount(float64(added), 0)
}
if res.AddSSTable.AtWriteTimestamp {
@@ -1012,10 +1011,7 @@ func (b *replicaAppBatch) ApplyToStateMachine(ctx context.Context) error {

// Record the write activity, passing a 0 nodeID because
// replica.loadStats.writeKeys intentionally doesn't track the origin of writes.
- b.r.writeStats.RecordCount(float64(b.mutations), 0 /* nodeID */)
- if b.r.loadStats != nil {
- b.r.loadStats.writeKeys.RecordCount(float64(b.mutations), 0)
- }
+ b.r.loadStats.writeKeys.RecordCount(float64(b.mutations), 0)

now := timeutil.Now()
if needsSplitBySize && r.splitQueueThrottle.ShouldProcess(now) {
7 changes: 1 addition & 6 deletions pkg/kv/kvserver/replica_init.go
@@ -21,7 +21,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/tracker"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/replicastats"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/split"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -112,12 +111,8 @@ func newUnloadedReplica(
r.leaseHistory = newLeaseHistory()
}
if store.cfg.StorePool != nil {
- r.leaseholderStats = replicastats.NewReplicaStats(store.Clock(), store.cfg.StorePool.GetNodeLocalityString)
- r.loadStats = newReplicaLoad(store.Clock(), store.cfg.StorePool.GetNodeLocalityString)
+ r.loadStats = NewReplicaLoad(store.Clock(), store.cfg.StorePool.GetNodeLocalityString)
}
- // Pass nil for the localityOracle because we intentionally don't track the
- // origin locality of write load.
- r.writeStats = replicastats.NewReplicaStats(store.Clock(), nil)

// Init rangeStr with the range ID.
r.rangeStr.store(replicaID, &roachpb.RangeDescriptor{RangeID: desc.RangeID})
4 changes: 3 additions & 1 deletion pkg/kv/kvserver/replica_load.go
@@ -26,7 +26,9 @@ type ReplicaLoad struct {
readBytes *replicastats.ReplicaStats
}

- func newReplicaLoad(clock *hlc.Clock, getNodeLocality replicastats.LocalityOracle) *ReplicaLoad {
+ // NewReplicaLoad returns a new ReplicaLoad, which may be used to track the
+ // request throughput of a replica.
+ func NewReplicaLoad(clock *hlc.Clock, getNodeLocality replicastats.LocalityOracle) *ReplicaLoad {
return &ReplicaLoad{
batchRequests: replicastats.NewReplicaStats(clock, getNodeLocality),
requests: replicastats.NewReplicaStats(clock, getNodeLocality),
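The hunk above shows only two of the constructor's fields being built. A hedged sketch of the full body, assuming the remaining stats follow the same pattern with the same locality oracle:

```go
// Sketch, not the committed code: batchRequests and requests are constructed
// exactly as in the diff above; the other four stats are assumed to be built
// the same way.
func NewReplicaLoad(clock *hlc.Clock, getNodeLocality replicastats.LocalityOracle) *ReplicaLoad {
	return &ReplicaLoad{
		batchRequests: replicastats.NewReplicaStats(clock, getNodeLocality),
		requests:      replicastats.NewReplicaStats(clock, getNodeLocality),
		writeKeys:     replicastats.NewReplicaStats(clock, getNodeLocality),
		readKeys:      replicastats.NewReplicaStats(clock, getNodeLocality),
		writeBytes:    replicastats.NewReplicaStats(clock, getNodeLocality),
		readBytes:     replicastats.NewReplicaStats(clock, getNodeLocality),
	}
}
```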
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_metrics.go
@@ -282,7 +282,7 @@ func calcBehindCount(
// practically none). See Replica.getBatchRequestQPS() for how this is
// accounted for.
func (r *Replica) QueriesPerSecond() (float64, time.Duration) {
- return r.leaseholderStats.AverageRatePerSecond()
+ return r.loadStats.batchRequests.AverageRatePerSecond()
}

// RequestsPerSecond returns the range's average requests received per second.
@@ -299,7 +299,7 @@ func (r *Replica) RequestsPerSecond() float64 {
// writes (12 for the metadata, 12 for the versions). A DeleteRange that
// ultimately only removes one key counts as one (or two if it's transactional).
func (r *Replica) WritesPerSecond() float64 {
- wps, _ := r.writeStats.AverageRatePerSecond()
+ wps, _ := r.loadStats.writeKeys.AverageRatePerSecond()
return wps
}

6 changes: 0 additions & 6 deletions pkg/kv/kvserver/replica_proposal.go
@@ -322,9 +322,6 @@ func (r *Replica) leasePostApplyLocked(

// Reset the request counts used to make lease placement decisions and
// load-based splitting/merging decisions whenever starting a new lease.
- if r.leaseholderStats != nil {
- r.leaseholderStats.ResetRequestCounts()
- }
if r.loadStats != nil {
r.loadStats.reset()
}
@@ -384,9 +381,6 @@
} else if prevOwner {
r.store.maybeGossipOnCapacityChange(ctx, leaseRemoveEvent)
}
- if r.leaseholderStats != nil {
- r.leaseholderStats.ResetRequestCounts()
- }
if r.loadStats != nil {
r.loadStats.reset()
}
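Both call sites above replace the removed `leaseholderStats.ResetRequestCounts()` with a single `loadStats.reset()`. A sketch of what that plausibly does, reusing the `ResetRequestCounts` method seen elsewhere in this diff (applying it to every field is an assumption):

```go
// reset zeroes each tracked stat so a new leaseholder starts from fresh
// counts rather than inheriting the previous holder's window.
func (rl *ReplicaLoad) reset() {
	rl.batchRequests.ResetRequestCounts()
	rl.requests.ResetRequestCounts()
	rl.writeKeys.ResetRequestCounts()
	rl.readKeys.ResetRequestCounts()
	rl.writeBytes.ResetRequestCounts()
	rl.readBytes.ResetRequestCounts()
}
```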
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/replica_rankings_test.go
@@ -162,14 +162,14 @@ func TestAddSSTQPSStat(t *testing.T) {
sqlDB.Exec(t, fmt.Sprintf(`SET CLUSTER setting kv.replica_stats.addsst_request_size_factor = %d`, testCase.addsstRequestFactor))

// Reset the request counts to 0 before sending to clear previous requests.
- repl.leaseholderStats.ResetRequestCounts()
+ repl.loadStats.reset()

_, pErr = db.NonTransactionalSender().Send(ctx, testCase.ba)
require.Nil(t, pErr)

- repl.leaseholderStats.Mu.Lock()
- queriesAfter, _ := repl.leaseholderStats.SumLocked()
- repl.leaseholderStats.Mu.Unlock()
+ repl.loadStats.batchRequests.Mu.Lock()
+ queriesAfter, _ := repl.loadStats.batchRequests.SumLocked()
+ repl.loadStats.batchRequests.Mu.Unlock()

// If queries are correctly recorded, we should see increase in query
count by the expected QPS. However, it is possible to get a
40 changes: 32 additions & 8 deletions pkg/kv/kvserver/replica_send.go
@@ -137,17 +137,13 @@ func (r *Replica) sendWithoutRangeID(
) (_ *roachpb.BatchResponse, _ *StoreWriteBytes, rErr *roachpb.Error) {
var br *roachpb.BatchResponse

- if r.leaseholderStats != nil && ba.Header.GatewayNodeID != 0 {
- r.leaseholderStats.RecordCount(r.getBatchRequestQPS(ctx, ba), ba.Header.GatewayNodeID)
- }

- if r.loadStats != nil {
- r.loadStats.requests.RecordCount(float64(len(ba.Requests)), 0)
- r.loadStats.writeBytes.RecordCount(getBatchRequestWriteBytes(ba), 0)
- }
// Add the range log tag.
ctx = r.AnnotateCtx(ctx)

+ // Record summary throughput information about the batch request for
+ // accounting.
+ r.recordBatchRequestLoad(ctx, ba)

// If the internal Raft group is not initialized, create it and wake the leader.
r.maybeInitializeRaftGroup(ctx)

@@ -1003,6 +999,34 @@ func (r *Replica) executeAdminBatch(
return br, nil
}

+ // recordBatchRequestLoad records the load information about a batch request
+ // issued against this replica.
+ func (r *Replica) recordBatchRequestLoad(ctx context.Context, ba *roachpb.BatchRequest) {
+ if r.loadStats == nil {
+ log.Eventf(
+ ctx,
+ "Unable to record load of batch request for r%d, load stats is not initialized",
+ ba.Header.RangeID,
+ )
+ return
+ }
+
+ // scaledQPS is the adjusted number of queries per second; that is, a cost
+ // estimate of a BatchRequest. See getBatchRequestQPS() for the calculation.
+ scaledQPS := r.getBatchRequestQPS(ctx, ba)
+
+ // NB: Previously, we would ignore batch requests which had no gateway node
+ // ID. This was in order to retrieve accurate per-locality counts of
+ // requests, for follow-the-workload lease transfers. We now record these
+ // regardless of the gateway node ID, as the only user of this information
+ // does not consider untagged localities. See
+ // allocator.shouldTransferLeaseForAccessLocality().
+ r.loadStats.batchRequests.RecordCount(scaledQPS, ba.Header.GatewayNodeID)
+ r.loadStats.requests.RecordCount(float64(len(ba.Requests)), ba.Header.GatewayNodeID)
+ r.loadStats.writeBytes.RecordCount(getBatchRequestWriteBytes(ba), ba.Header.GatewayNodeID)
+ }

// getBatchRequestQPS calculates the cost estimation of a BatchRequest. The
// estimate returns Queries Per Second (QPS), representing the abstract
// resource cost associated with this request. BatchRequests are calculated as
Expand Down
10 changes: 5 additions & 5 deletions pkg/kv/kvserver/replicate_queue.go
@@ -481,7 +481,7 @@ func (rq *replicateQueue) shouldQueue(
status := repl.LeaseStatusAt(ctx, now)
if status.IsValid() &&
rq.canTransferLeaseFrom(ctx, repl) &&
- rq.allocator.ShouldTransferLease(ctx, conf, voterReplicas, repl, repl.leaseholderStats) {
+ rq.allocator.ShouldTransferLease(ctx, conf, voterReplicas, repl, repl.loadStats.batchRequests) {

log.VEventf(ctx, 2, "lease transfer needed, enqueuing")
return true, 0
@@ -1550,7 +1550,7 @@ func (rq *replicateQueue) shedLease(
conf,
desc.Replicas().VoterDescriptors(),
repl,
- repl.leaseholderStats,
+ repl.loadStats.batchRequests,
false, /* forceDecisionWithoutStats */
opts,
)
@@ -1563,7 +1563,7 @@
return allocator.NoTransferDryRun, nil
}

- avgQPS, qpsMeasurementDur := repl.leaseholderStats.AverageRatePerSecond()
+ avgQPS, qpsMeasurementDur := repl.loadStats.batchRequests.AverageRatePerSecond()
if qpsMeasurementDur < replicastats.MinStatsDuration {
avgQPS = 0
}
@@ -1724,10 +1724,10 @@ func rangeUsageInfoForRepl(repl *Replica) allocator.RangeUsageInfo {
info := allocator.RangeUsageInfo{
LogicalBytes: repl.GetMVCCStats().Total(),
}
- if queriesPerSecond, dur := repl.leaseholderStats.AverageRatePerSecond(); dur >= replicastats.MinStatsDuration {
+ if queriesPerSecond, dur := repl.loadStats.batchRequests.AverageRatePerSecond(); dur >= replicastats.MinStatsDuration {
info.QueriesPerSecond = queriesPerSecond
}
- if writesPerSecond, dur := repl.writeStats.AverageRatePerSecond(); dur >= replicastats.MinStatsDuration {
+ if writesPerSecond, dur := repl.loadStats.writeKeys.AverageRatePerSecond(); dur >= replicastats.MinStatsDuration {
info.WritesPerSecond = writesPerSecond
}
return info
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/store.go
@@ -2994,12 +2994,12 @@ func (s *Store) Capacity(ctx context.Context, useCached bool) (roachpb.StoreCapa
// starts? We can't easily have a countdown as its value changes like for
// leases/replicas.
var qps float64
- if avgQPS, dur := r.leaseholderStats.AverageRatePerSecond(); dur >= replicastats.MinStatsDuration {
+ if avgQPS, dur := r.loadStats.batchRequests.AverageRatePerSecond(); dur >= replicastats.MinStatsDuration {
qps = avgQPS
totalQueriesPerSecond += avgQPS
// TODO(a-robinson): Calculate percentiles for qps? Get rid of other percentiles?
}
- if wps, dur := r.writeStats.AverageRatePerSecond(); dur >= replicastats.MinStatsDuration {
+ if wps, dur := r.loadStats.writeKeys.AverageRatePerSecond(); dur >= replicastats.MinStatsDuration {
totalWritesPerSecond += wps
writesPerReplica = append(writesPerReplica, wps)
}
@@ -3196,13 +3196,13 @@ func (s *Store) updateReplicationGauges(ctx context.Context) error {
}
pausedFollowerCount += metrics.PausedFollowerCount
behindCount += metrics.BehindCount
- if qps, dur := rep.leaseholderStats.AverageRatePerSecond(); dur >= replicastats.MinStatsDuration {
+ if qps, dur := rep.loadStats.batchRequests.AverageRatePerSecond(); dur >= replicastats.MinStatsDuration {
averageQueriesPerSecond += qps
}
if rqps, dur := rep.loadStats.requests.AverageRatePerSecond(); dur >= replicastats.MinStatsDuration {
averageRequestsPerSecond += rqps
}
- if wps, dur := rep.writeStats.AverageRatePerSecond(); dur >= replicastats.MinStatsDuration {
+ if wps, dur := rep.loadStats.writeKeys.AverageRatePerSecond(); dur >= replicastats.MinStatsDuration {
averageWritesPerSecond += wps
}
if rps, dur := rep.loadStats.readKeys.AverageRatePerSecond(); dur >= replicastats.MinStatsDuration {
10 changes: 0 additions & 10 deletions pkg/kv/kvserver/store_merge.go
@@ -145,16 +145,6 @@ func (s *Store) MergeRange(
return err
}

- if leftRepl.leaseholderStats != nil {
- leftRepl.leaseholderStats.ResetRequestCounts()
- }
- if leftRepl.writeStats != nil {
- // Note: this could be drastically improved by adding a replicaStats method
- // that merges stats. Resetting stats is typically bad for the rebalancing
- // logic that depends on them.
- leftRepl.writeStats.ResetRequestCounts()
- }

leftRepl.loadStats.merge(rightRepl.loadStats)

// Clear the concurrency manager's lock and txn wait-queues to redirect the
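The deleted comment above noted that resetting stats on merge is bad for the rebalancing logic; the new `loadStats.merge` call folds the right-hand replica's counts into the surviving replica instead. A sketch of what it plausibly does (the per-stat method name `MergeRequestCounts` is an assumption, not taken from this diff):

```go
// merge folds the subsumed replica's counts into the receiver, stat by stat,
// preserving load history across the range merge instead of discarding it.
func (rl *ReplicaLoad) merge(other *ReplicaLoad) {
	rl.batchRequests.MergeRequestCounts(other.batchRequests)
	rl.requests.MergeRequestCounts(other.requests)
	rl.writeKeys.MergeRequestCounts(other.writeKeys)
	rl.readKeys.MergeRequestCounts(other.readKeys)
	rl.writeBytes.MergeRequestCounts(other.writeBytes)
	rl.readBytes.MergeRequestCounts(other.readBytes)
}
```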
13 changes: 6 additions & 7 deletions pkg/kv/kvserver/store_pool_test.go
@@ -81,17 +81,16 @@ func TestStorePoolUpdateLocalStore(t *testing.T) {
ValBytes: 4,
}
replica.mu.Unlock()
- rs := replicastats.NewReplicaStats(clock, nil)
+ replica.loadStats = NewReplicaLoad(clock, nil)
for _, store := range stores {
- rs.RecordCount(1, store.Node.NodeID)
+ replica.loadStats.batchRequests.RecordCount(1, store.Node.NodeID)
+ replica.loadStats.writeKeys.RecordCount(1, store.Node.NodeID)
}
manual.Advance(replicastats.MinStatsDuration + time.Second)
- replica.leaseholderStats = rs
- replica.writeStats = rs

rangeUsageInfo := rangeUsageInfoForRepl(&replica)
- QPS, _ := replica.leaseholderStats.AverageRatePerSecond()
- WPS, _ := replica.writeStats.AverageRatePerSecond()
+ QPS, _ := replica.loadStats.batchRequests.AverageRatePerSecond()
+ WPS, _ := replica.loadStats.writeKeys.AverageRatePerSecond()

sp.UpdateLocalStoreAfterRebalance(roachpb.StoreID(1), rangeUsageInfo, roachpb.ADD_VOTER)
desc, ok := sp.GetStoreDescriptor(roachpb.StoreID(1))
@@ -198,7 +197,7 @@ func TestStorePoolUpdateLocalStoreBeforeGossip(t *testing.T) {
if err != nil {
t.Fatalf("make replica error : %+v", err)
}
- replica.leaseholderStats = replicastats.NewReplicaStats(store.Clock(), nil)
+ replica.loadStats = NewReplicaLoad(store.Clock(), nil)

rangeUsageInfo := rangeUsageInfoForRepl(replica)

4 changes: 2 additions & 2 deletions pkg/kv/kvserver/store_rebalancer.go
@@ -447,7 +447,7 @@ func (sr *StoreRebalancer) chooseLeaseToTransfer(
conf,
candidates,
replWithStats.repl,
- replWithStats.repl.leaseholderStats,
+ replWithStats.repl.loadStats.batchRequests,
true, /* forceDecisionWithoutStats */
allocator.TransferLeaseOptions{
Goal: allocator.QPSConvergence,
@@ -473,7 +473,7 @@
*localDesc,
candidate.StoreID,
candidates,
- replWithStats.repl.leaseholderStats,
+ replWithStats.repl.loadStats.batchRequests,
) {
log.VEventf(
ctx, 3, "r%d is on s%d due to follow-the-workload; considering replica rebalance instead",