kvserver: record batch requests with no gateway #85178

Merged · 1 commit · Aug 1, 2022
4 changes: 1 addition & 3 deletions pkg/kv/kvserver/allocator_impl_test.go
@@ -18,7 +18,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/replicastats"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/testutils/gossiputil"
@@ -162,8 +161,7 @@ func TestAllocatorRebalanceTarget(t *testing.T) {
repl.mu.state.Stats = &enginepb.MVCCStats{}
repl.mu.Unlock()

repl.leaseholderStats = replicastats.NewReplicaStats(clock, nil)
repl.writeStats = replicastats.NewReplicaStats(clock, nil)
repl.loadStats = NewReplicaLoad(clock, nil)

var rangeUsageInfo allocator.RangeUsageInfo

7 changes: 6 additions & 1 deletion pkg/kv/kvserver/client_split_test.go
@@ -2774,9 +2774,14 @@ func TestStoreCapacityAfterSplit(t *testing.T) {
if e, a := int32(1), cap.LeaseCount; e != a {
t.Errorf("expected cap.LeaseCount=%d, got %d", e, a)
}
if minExpected, a := 1/float64(replicastats.MinStatsDuration/time.Second), cap.WritesPerSecond; minExpected > a {

// NB: The measured writes per second may fall slightly below minExpected
// due to timing and floating-point error. A tolerance of 0.01 WPS is
// added to avoid flaking the test.
if minExpected, a := 1/float64(replicastats.MinStatsDuration/time.Second), cap.WritesPerSecond; minExpected > a+0.01 {
t.Errorf("expected cap.WritesPerSecond >= %f, got %f", minExpected, a)
}

bpr2 := cap.BytesPerReplica
if bpr2.P10 <= bpr1.P10 {
t.Errorf("expected BytesPerReplica to have increased from %+v, but got %+v", bpr1, bpr2)
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/deprecated_store_rebalancer.go
@@ -128,7 +128,7 @@ func (sr *StoreRebalancer) deprecatedChooseLeaseToTransfer(
*localDesc,
candidate.StoreID,
candidates,
replWithStats.repl.leaseholderStats,
replWithStats.repl.loadStats.batchRequests,
) {
log.VEventf(ctx, 3, "r%d is on s%d due to follow-the-workload; skipping",
desc.RangeID, localDesc.StoreID)
21 changes: 3 additions & 18 deletions pkg/kv/kvserver/replica.go
@@ -30,7 +30,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/replicastats"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/split"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/tenantrate"
@@ -228,24 +227,10 @@ type Replica struct {
store *Store
abortSpan *abortspan.AbortSpan // Avoids anomalous reads after abort

// leaseholderStats tracks all incoming BatchRequests to the replica and which
// localities they come from in order to aid in lease rebalancing decisions.
leaseholderStats *replicastats.ReplicaStats
// writeStats tracks the number of mutations (as counted by the pebble batch
// to be applied to the state machine), and additionally, the number of keys
// added to MVCCStats, which notably may be approximate in the case of an
// AddSSTable. In other words, writeStats should loosely track the write
// activity on the replica on a per-key basis, though in an inconsistent way
// that in particular may overcount by a factor of roughly two.
//
// Note that while writeStats were originally introduced to aid in rebalancing
// decisions in [1], at the time of writing they are not used for that
// purpose.
//
// [1]: https://github.com/cockroachdb/cockroach/pull/16664
writeStats *replicastats.ReplicaStats

// loadStats tracks a sliding window of throughput on this replica.
// Multiple types of throughput are accounted for. The localities of
// incoming requests are tracked in addition to the aggregate, in order to
// inform load-based lease and replica rebalancing decisions.
loadStats *ReplicaLoad

// creatingReplica is set when a replica is created as uninitialized
6 changes: 1 addition & 5 deletions pkg/kv/kvserver/replica_application_state_machine.go
@@ -662,7 +662,6 @@ func (b *replicaAppBatch) runPreApplyTriggersAfterStagingWriteBatch(
b.r.store.metrics.AddSSTableApplicationCopies.Inc(1)
}
if added := res.Delta.KeyCount; added > 0 {
b.r.writeStats.RecordCount(float64(added), 0)
b.r.loadStats.writeKeys.RecordCount(float64(added), 0)
}
if res.AddSSTable.AtWriteTimestamp {
@@ -1012,10 +1011,7 @@ func (b *replicaAppBatch) ApplyToStateMachine(ctx context.Context) error {

// Record the write activity, passing a 0 nodeID because the write-key
// load stats intentionally don't track the origin of the writes.
b.r.writeStats.RecordCount(float64(b.mutations), 0 /* nodeID */)
if b.r.loadStats != nil {
b.r.loadStats.writeKeys.RecordCount(float64(b.mutations), 0)
}
b.r.loadStats.writeKeys.RecordCount(float64(b.mutations), 0)

now := timeutil.Now()
if needsSplitBySize && r.splitQueueThrottle.ShouldProcess(now) {
7 changes: 1 addition & 6 deletions pkg/kv/kvserver/replica_init.go
@@ -21,7 +21,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/tracker"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/replicastats"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/split"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -112,12 +111,8 @@ func newUnloadedReplica(
r.leaseHistory = newLeaseHistory()
}
if store.cfg.StorePool != nil {
r.leaseholderStats = replicastats.NewReplicaStats(store.Clock(), store.cfg.StorePool.GetNodeLocalityString)
r.loadStats = newReplicaLoad(store.Clock(), store.cfg.StorePool.GetNodeLocalityString)
r.loadStats = NewReplicaLoad(store.Clock(), store.cfg.StorePool.GetNodeLocalityString)
}
// Pass nil for the localityOracle because we intentionally don't track the
// origin locality of write load.
r.writeStats = replicastats.NewReplicaStats(store.Clock(), nil)

// Init rangeStr with the range ID.
r.rangeStr.store(replicaID, &roachpb.RangeDescriptor{RangeID: desc.RangeID})
4 changes: 3 additions & 1 deletion pkg/kv/kvserver/replica_load.go
@@ -26,7 +26,9 @@ type ReplicaLoad struct {
readBytes *replicastats.ReplicaStats
}

func newReplicaLoad(clock *hlc.Clock, getNodeLocality replicastats.LocalityOracle) *ReplicaLoad {
// NewReplicaLoad returns a new ReplicaLoad, which may be used to track the
// request throughput of a replica.
func NewReplicaLoad(clock *hlc.Clock, getNodeLocality replicastats.LocalityOracle) *ReplicaLoad {
return &ReplicaLoad{
batchRequests: replicastats.NewReplicaStats(clock, getNodeLocality),
requests: replicastats.NewReplicaStats(clock, getNodeLocality),
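Since NewReplicaLoad is now exported, here is a brief sketch of how a caller might wire it up, mirroring the recording done in recordBatchRequestLoad later in this diff. The helper function and its clock parameter are illustrative; only NewReplicaLoad, RecordCount, and AverageRatePerSecond come from the diff itself:

```go
package kvserver

import (
	"time"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// exampleReplicaLoad (hypothetical) records one batch request and reads back
// the measured rate, as recordBatchRequestLoad does per request.
func exampleReplicaLoad(clock *hlc.Clock) (qps float64, dur time.Duration) {
	// A nil locality oracle disables per-locality tracking, as in the
	// writeStats setup this PR removes.
	load := NewReplicaLoad(clock, nil /* getNodeLocality */)

	// One batch arriving via gateway node 1, carrying three requests and
	// 128 bytes of writes.
	load.batchRequests.RecordCount(1, roachpb.NodeID(1))
	load.requests.RecordCount(3, roachpb.NodeID(1))
	load.writeBytes.RecordCount(128, roachpb.NodeID(1))

	// Callers such as rangeUsageInfoForRepl only trust the rate once the
	// returned duration reaches replicastats.MinStatsDuration.
	return load.batchRequests.AverageRatePerSecond()
}
```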
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_metrics.go
@@ -282,7 +282,7 @@ func calcBehindCount(
// practically none). See Replica.getBatchRequestQPS() for how this is
// accounted for.
func (r *Replica) QueriesPerSecond() (float64, time.Duration) {
return r.leaseholderStats.AverageRatePerSecond()
return r.loadStats.batchRequests.AverageRatePerSecond()
}

// RequestsPerSecond returns the range's average requests received per second.
@@ -299,7 +299,7 @@ func (r *Replica) RequestsPerSecond() float64 {
// writes (12 for the metadata, 12 for the versions). A DeleteRange that
// ultimately only removes one key counts as one (or two if it's transactional).
func (r *Replica) WritesPerSecond() float64 {
wps, _ := r.writeStats.AverageRatePerSecond()
wps, _ := r.loadStats.writeKeys.AverageRatePerSecond()
return wps
}

6 changes: 0 additions & 6 deletions pkg/kv/kvserver/replica_proposal.go
@@ -322,9 +322,6 @@ func (r *Replica) leasePostApplyLocked(

// Reset the request counts used to make lease placement decisions and
// load-based splitting/merging decisions whenever starting a new lease.
if r.leaseholderStats != nil {
r.leaseholderStats.ResetRequestCounts()
}
if r.loadStats != nil {
r.loadStats.reset()
}
@@ -384,9 +381,6 @@ func (r *Replica) leasePostApplyLocked(
} else if prevOwner {
r.store.maybeGossipOnCapacityChange(ctx, leaseRemoveEvent)
}
if r.leaseholderStats != nil {
r.leaseholderStats.ResetRequestCounts()
}
if r.loadStats != nil {
r.loadStats.reset()
}
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/replica_rankings_test.go
@@ -162,14 +162,14 @@ func TestAddSSTQPSStat(t *testing.T) {
sqlDB.Exec(t, fmt.Sprintf(`SET CLUSTER setting kv.replica_stats.addsst_request_size_factor = %d`, testCase.addsstRequestFactor))

// Reset the request counts to 0 before sending to clear previous requests.
repl.leaseholderStats.ResetRequestCounts()
repl.loadStats.reset()

_, pErr = db.NonTransactionalSender().Send(ctx, testCase.ba)
require.Nil(t, pErr)

repl.leaseholderStats.Mu.Lock()
queriesAfter, _ := repl.leaseholderStats.SumLocked()
repl.leaseholderStats.Mu.Unlock()
repl.loadStats.batchRequests.Mu.Lock()
queriesAfter, _ := repl.loadStats.batchRequests.SumLocked()
repl.loadStats.batchRequests.Mu.Unlock()

// If queries are correctly recorded, we should see an increase in query
// count by the expected QPS. However, it is possible to get a
35 changes: 27 additions & 8 deletions pkg/kv/kvserver/replica_send.go
@@ -137,17 +137,13 @@ func (r *Replica) sendWithoutRangeID(
) (_ *roachpb.BatchResponse, _ *StoreWriteBytes, rErr *roachpb.Error) {
var br *roachpb.BatchResponse

if r.leaseholderStats != nil && ba.Header.GatewayNodeID != 0 {
r.leaseholderStats.RecordCount(r.getBatchRequestQPS(ctx, ba), ba.Header.GatewayNodeID)
}

if r.loadStats != nil {
r.loadStats.requests.RecordCount(float64(len(ba.Requests)), 0)
r.loadStats.writeBytes.RecordCount(getBatchRequestWriteBytes(ba), 0)
}
// Add the range log tag.
ctx = r.AnnotateCtx(ctx)

// Record summary throughput information about the batch request for
// accounting.
r.recordBatchRequestLoad(ctx, ba)

// If the internal Raft group is not initialized, create it and wake the leader.
r.maybeInitializeRaftGroup(ctx)

@@ -1003,6 +999,29 @@ func (r *Replica) executeAdminBatch(
return br, nil
}

// recordBatchRequestLoad records the load information about a batch request issued
// against this replica.
func (r *Replica) recordBatchRequestLoad(ctx context.Context, ba *roachpb.BatchRequest) {
if r.loadStats == nil {
log.VEventf(
ctx,
3,
"Unable to record load of batch request for r%d, load stats is not initialized",
ba.Header.RangeID,
)
return
}

// adjustedQPS is the adjusted number of queries per second, i.e., a cost
// estimate of a BatchRequest. See getBatchRequestQPS() for the
// calculation.
adjustedQPS := r.getBatchRequestQPS(ctx, ba)

r.loadStats.batchRequests.RecordCount(adjustedQPS, ba.Header.GatewayNodeID)
r.loadStats.requests.RecordCount(float64(len(ba.Requests)), ba.Header.GatewayNodeID)
r.loadStats.writeBytes.RecordCount(getBatchRequestWriteBytes(ba), ba.Header.GatewayNodeID)
}

// getBatchRequestQPS calculates the cost estimation of a BatchRequest. The
// estimate returns Queries Per Second (QPS), representing the abstract
// resource cost associated with this request. BatchRequests are calculated as
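The getBatchRequestQPS doc comment is truncated by the diff view. As a hedged, self-contained sketch of the cost model it alludes to (a base cost of one per batch, with AddSSTable payload bytes scaled down by a size factor such as the kv.replica_stats.addsst_request_size_factor setting exercised in the test above; the 50000 factor below is only an assumed example, not the verified default):

```go
package main

import "fmt"

// batchQPSCost sketches the QPS cost model: every batch costs 1, and
// AddSSTable payload bytes add size/factor on top. This is an illustration,
// not the exact getBatchRequestQPS implementation.
func batchQPSCost(addSSTBytes []int, sizeFactor int) float64 {
	cost := 1.0
	if sizeFactor <= 0 {
		// A non-positive factor disables size-based accounting.
		return cost
	}
	for _, n := range addSSTBytes {
		cost += float64(n) / float64(sizeFactor)
	}
	return cost
}

func main() {
	// A plain point write costs 1 QPS; a batch carrying a 50 KiB SST at
	// factor 50000 costs roughly 2.
	fmt.Println(batchQPSCost(nil, 50000))          // 1
	fmt.Println(batchQPSCost([]int{51200}, 50000)) // ~2.02
}
```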
10 changes: 5 additions & 5 deletions pkg/kv/kvserver/replicate_queue.go
@@ -481,7 +481,7 @@ func (rq *replicateQueue) shouldQueue(
status := repl.LeaseStatusAt(ctx, now)
if status.IsValid() &&
rq.canTransferLeaseFrom(ctx, repl) &&
rq.allocator.ShouldTransferLease(ctx, conf, voterReplicas, repl, repl.leaseholderStats) {
rq.allocator.ShouldTransferLease(ctx, conf, voterReplicas, repl, repl.loadStats.batchRequests) {

log.VEventf(ctx, 2, "lease transfer needed, enqueuing")
return true, 0
@@ -1550,7 +1550,7 @@ func (rq *replicateQueue) shedLease(
conf,
desc.Replicas().VoterDescriptors(),
repl,
repl.leaseholderStats,
repl.loadStats.batchRequests,
false, /* forceDecisionWithoutStats */
opts,
)
@@ -1563,7 +1563,7 @@
return allocator.NoTransferDryRun, nil
}

avgQPS, qpsMeasurementDur := repl.leaseholderStats.AverageRatePerSecond()
avgQPS, qpsMeasurementDur := repl.loadStats.batchRequests.AverageRatePerSecond()
if qpsMeasurementDur < replicastats.MinStatsDuration {
avgQPS = 0
}
@@ -1724,10 +1724,10 @@ func rangeUsageInfoForRepl(repl *Replica) allocator.RangeUsageInfo {
info := allocator.RangeUsageInfo{
LogicalBytes: repl.GetMVCCStats().Total(),
}
if queriesPerSecond, dur := repl.leaseholderStats.AverageRatePerSecond(); dur >= replicastats.MinStatsDuration {
if queriesPerSecond, dur := repl.loadStats.batchRequests.AverageRatePerSecond(); dur >= replicastats.MinStatsDuration {
info.QueriesPerSecond = queriesPerSecond
}
if writesPerSecond, dur := repl.writeStats.AverageRatePerSecond(); dur >= replicastats.MinStatsDuration {
if writesPerSecond, dur := repl.loadStats.writeKeys.AverageRatePerSecond(); dur >= replicastats.MinStatsDuration {
info.WritesPerSecond = writesPerSecond
}
return info
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/store.go
@@ -2994,12 +2994,12 @@ func (s *Store) Capacity(ctx context.Context, useCached bool) (roachpb.StoreCapa
// starts? We can't easily have a countdown as its value changes like for
// leases/replicas.
var qps float64
if avgQPS, dur := r.leaseholderStats.AverageRatePerSecond(); dur >= replicastats.MinStatsDuration {
if avgQPS, dur := r.loadStats.batchRequests.AverageRatePerSecond(); dur >= replicastats.MinStatsDuration {
qps = avgQPS
totalQueriesPerSecond += avgQPS
// TODO(a-robinson): Calculate percentiles for qps? Get rid of other percentiles?
}
if wps, dur := r.writeStats.AverageRatePerSecond(); dur >= replicastats.MinStatsDuration {
if wps, dur := r.loadStats.writeKeys.AverageRatePerSecond(); dur >= replicastats.MinStatsDuration {
totalWritesPerSecond += wps
writesPerReplica = append(writesPerReplica, wps)
}
@@ -3196,13 +3196,13 @@ func (s *Store) updateReplicationGauges(ctx context.Context) error {
}
pausedFollowerCount += metrics.PausedFollowerCount
behindCount += metrics.BehindCount
if qps, dur := rep.leaseholderStats.AverageRatePerSecond(); dur >= replicastats.MinStatsDuration {
if qps, dur := rep.loadStats.batchRequests.AverageRatePerSecond(); dur >= replicastats.MinStatsDuration {
averageQueriesPerSecond += qps
}
if rqps, dur := rep.loadStats.requests.AverageRatePerSecond(); dur >= replicastats.MinStatsDuration {
averageRequestsPerSecond += rqps
}
if wps, dur := rep.writeStats.AverageRatePerSecond(); dur >= replicastats.MinStatsDuration {
if wps, dur := rep.loadStats.writeKeys.AverageRatePerSecond(); dur >= replicastats.MinStatsDuration {
averageWritesPerSecond += wps
}
if rps, dur := rep.loadStats.readKeys.AverageRatePerSecond(); dur >= replicastats.MinStatsDuration {
10 changes: 0 additions & 10 deletions pkg/kv/kvserver/store_merge.go
@@ -145,16 +145,6 @@ func (s *Store) MergeRange(
return err
}

if leftRepl.leaseholderStats != nil {
leftRepl.leaseholderStats.ResetRequestCounts()
}
if leftRepl.writeStats != nil {
// Note: this could be drastically improved by adding a replicaStats method
// that merges stats. Resetting stats is typically bad for the rebalancing
// logic that depends on them.
leftRepl.writeStats.ResetRequestCounts()
}

leftRepl.loadStats.merge(rightRepl.loadStats)

// Clear the concurrency manager's lock and txn wait-queues to redirect the
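The removed comment above asked for exactly the stats merge that leftRepl.loadStats.merge now performs. Here is a toy sketch of merging two sliding-window trackers instead of resetting them; the type and field names are invented, since ReplicaStats internals are not shown in this diff:

```go
package main

import "fmt"

// windowedStats is a toy stand-in for a sliding-window rate tracker; the
// real replicastats.ReplicaStats keeps rotating windows plus per-locality
// counts.
type windowedStats struct {
	windows []float64 // per-window counts, oldest first
}

// merge folds other's window counts into s instead of discarding them,
// preserving load signal for rebalancing decisions across a range merge.
func (s *windowedStats) merge(other *windowedStats) {
	for i := range s.windows {
		if i < len(other.windows) {
			s.windows[i] += other.windows[i]
		}
	}
}

func main() {
	left := &windowedStats{windows: []float64{10, 20, 30}}
	right := &windowedStats{windows: []float64{1, 2, 3}}
	left.merge(right)
	fmt.Println(left.windows) // [11 22 33]
}
```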
13 changes: 6 additions & 7 deletions pkg/kv/kvserver/store_pool_test.go
@@ -81,17 +81,16 @@ func TestStorePoolUpdateLocalStore(t *testing.T) {
ValBytes: 4,
}
replica.mu.Unlock()
rs := replicastats.NewReplicaStats(clock, nil)
replica.loadStats = NewReplicaLoad(clock, nil)
for _, store := range stores {
rs.RecordCount(1, store.Node.NodeID)
replica.loadStats.batchRequests.RecordCount(1, store.Node.NodeID)
replica.loadStats.writeKeys.RecordCount(1, store.Node.NodeID)
}
manual.Advance(replicastats.MinStatsDuration + time.Second)
replica.leaseholderStats = rs
replica.writeStats = rs

rangeUsageInfo := rangeUsageInfoForRepl(&replica)
QPS, _ := replica.leaseholderStats.AverageRatePerSecond()
WPS, _ := replica.writeStats.AverageRatePerSecond()
QPS, _ := replica.loadStats.batchRequests.AverageRatePerSecond()
WPS, _ := replica.loadStats.writeKeys.AverageRatePerSecond()

sp.UpdateLocalStoreAfterRebalance(roachpb.StoreID(1), rangeUsageInfo, roachpb.ADD_VOTER)
desc, ok := sp.GetStoreDescriptor(roachpb.StoreID(1))
@@ -198,7 +197,7 @@ func TestStorePoolUpdateLocalStoreBeforeGossip(t *testing.T) {
if err != nil {
t.Fatalf("make replica error : %+v", err)
}
replica.leaseholderStats = replicastats.NewReplicaStats(store.Clock(), nil)
replica.loadStats = NewReplicaLoad(store.Clock(), nil)

rangeUsageInfo := rangeUsageInfoForRepl(replica)

4 changes: 2 additions & 2 deletions pkg/kv/kvserver/store_rebalancer.go
@@ -446,7 +446,7 @@ func (sr *StoreRebalancer) chooseLeaseToTransfer(
conf,
candidates,
replWithStats.repl,
replWithStats.repl.leaseholderStats,
replWithStats.repl.loadStats.batchRequests,
true, /* forceDecisionWithoutStats */
allocator.TransferLeaseOptions{
Goal: allocator.QPSConvergence,
@@ -472,7 +472,7 @@
*localDesc,
candidate.StoreID,
candidates,
replWithStats.repl.leaseholderStats,
replWithStats.repl.loadStats.batchRequests,
) {
log.VEventf(
ctx, 3, "r%d is on s%d due to follow-the-workload; considering replica rebalance instead",
6 changes: 2 additions & 4 deletions pkg/kv/kvserver/store_rebalancer_test.go
@@ -21,7 +21,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/replicastats"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/testutils/gossiputil"
@@ -468,10 +467,9 @@ func loadRanges(rr *replicaRankings, s *Store, ranges []testRange) {
// rangeInfo code is ripped out of the allocator.
repl.mu.state.Stats = &enginepb.MVCCStats{}

repl.leaseholderStats = replicastats.NewReplicaStats(s.Clock(), nil)
repl.leaseholderStats.SetMeanRateForTesting(r.qps)
repl.loadStats = NewReplicaLoad(s.Clock(), nil)
repl.loadStats.batchRequests.SetMeanRateForTesting(r.qps)

repl.writeStats = replicastats.NewReplicaStats(s.Clock(), nil)
acc.addReplica(replicaWithStats{
repl: repl,
qps: r.qps,