diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt
index af325df1f071..1701b3bdb7c6 100644
--- a/docs/generated/settings/settings-for-tenants.txt
+++ b/docs/generated/settings/settings-for-tenants.txt
@@ -33,6 +33,7 @@ kv.protectedts.reconciliation.interval duration 5m0s the frequency for reconcili
kv.range_split.by_load_enabled boolean true allow automatic splits of ranges based on where load is concentrated
kv.range_split.load_qps_threshold integer 2500 the QPS over which, the range becomes a candidate for load based splitting
kv.rangefeed.enabled boolean false if set, rangefeed registration is enabled
+kv.replica_stats.addsst_request_size_factor integer 50000 the divisor that is applied to addsstable request sizes, then recorded in the leaseholder's QPS; 0 means all requests are treated as cost 1
kv.replication_reports.interval duration 1m0s the frequency for generating the replication_constraint_stats, replication_stats_report and replication_critical_localities reports (set to 0 to disable)
kv.transaction.max_intents_bytes integer 4194304 maximum number of bytes used to track locks in transactions
kv.transaction.max_refresh_spans_bytes integer 256000 maximum number of bytes used to track refresh spans in serializable transactions
diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index e7e06b56857a..73048609d0f2 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -42,6 +42,7 @@
kv.range_split.load_qps_threshold | integer | 2500 | the QPS over which, the range becomes a candidate for load based splitting |
kv.rangefeed.enabled | boolean | false | if set, rangefeed registration is enabled |
kv.replica_circuit_breaker.slow_replication_threshold | duration | 0s | duration after which slow proposals trip the per-Replica circuit breaker (zero duration disables breakers) |
+kv.replica_stats.addsst_request_size_factor | integer | 50000 | the divisor that is applied to addsstable request sizes, then recorded in the leaseholder's QPS; 0 means all requests are treated as cost 1 |
kv.replication_reports.interval | duration | 1m0s | the frequency for generating the replication_constraint_stats, replication_stats_report and replication_critical_localities reports (set to 0 to disable) |
kv.snapshot_rebalance.max_rate | byte size | 32 MiB | the rate limit (bytes/sec) to use for rebalance and upreplication snapshots |
kv.snapshot_recovery.max_rate | byte size | 32 MiB | the rate limit (bytes/sec) to use for recovery snapshots |
diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel
index 67cb86bbbdaf..0aca5a8ace03 100644
--- a/pkg/kv/kvserver/BUILD.bazel
+++ b/pkg/kv/kvserver/BUILD.bazel
@@ -377,6 +377,7 @@ go_test(
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
+ "//pkg/testutils/sstutil",
"//pkg/testutils/testcluster",
"//pkg/ts",
"//pkg/ts/tspb",
diff --git a/pkg/kv/kvserver/allocator_test.go b/pkg/kv/kvserver/allocator_test.go
index 553aeffc5041..941cf9168528 100644
--- a/pkg/kv/kvserver/allocator_test.go
+++ b/pkg/kv/kvserver/allocator_test.go
@@ -5110,17 +5110,17 @@ func TestAllocatorTransferLeaseTargetLoadBased(t *testing.T) {
// the unknown node 99 in evenlyBalanced to verify that requests from
// unknown localities don't affect the algorithm.
evenlyBalanced := newReplicaStats(clock, localityFn)
- evenlyBalanced.record(1)
- evenlyBalanced.record(2)
- evenlyBalanced.record(3)
+ evenlyBalanced.recordCount(1, 1)
+ evenlyBalanced.recordCount(1, 2)
+ evenlyBalanced.recordCount(1, 3)
imbalanced1 := newReplicaStats(clock, localityFn)
imbalanced2 := newReplicaStats(clock, localityFn)
imbalanced3 := newReplicaStats(clock, localityFn)
for i := 0; i < 100*int(MinLeaseTransferStatsDuration.Seconds()); i++ {
- evenlyBalanced.record(99)
- imbalanced1.record(1)
- imbalanced2.record(2)
- imbalanced3.record(3)
+ evenlyBalanced.recordCount(1, 99)
+ imbalanced1.recordCount(1, 1)
+ imbalanced2.recordCount(1, 2)
+ imbalanced3.recordCount(1, 3)
}
manual.Increment(int64(MinLeaseTransferStatsDuration))
diff --git a/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go b/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go
index 33b9b316528c..cc276544dc3d 100644
--- a/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go
+++ b/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go
@@ -623,16 +623,16 @@ func TestEvalAddSSTable(t *testing.T) {
expectStatsEst: true,
},
/* Disabled due to nondeterminism under metamorphic tests. SSTTimestamp will
- * shortly be removed anyway.
- "SSTTimestamp doesn't rewrite with incorrect timestamp, but errors under race": {
- atReqTS: 8,
- data: []sstutil.KV{{"a", 6, "a6"}},
- sst: []sstutil.KV{{"a", 7, "a7"}},
- sstTimestamp: 8,
- expect: []sstutil.KV{{"a", 7, "a7"}, {"a", 6, "a6"}},
- expectErrRace: `incorrect timestamp 0.000000007,0 for SST key "a" (expected 0.000000008,0)`,
- expectStatsEst: true,
- },*/
+ * shortly be removed anyway.
+ "SSTTimestamp doesn't rewrite with incorrect timestamp, but errors under race": {
+ atReqTS: 8,
+ data: []sstutil.KV{{"a", 6, "a6"}},
+ sst: []sstutil.KV{{"a", 7, "a7"}},
+ sstTimestamp: 8,
+ expect: []sstutil.KV{{"a", 7, "a7"}, {"a", 6, "a6"}},
+ expectErrRace: `incorrect timestamp 0.000000007,0 for SST key "a" (expected 0.000000008,0)`,
+ expectStatsEst: true,
+ },*/
}
testutils.RunTrueAndFalse(t, "IngestAsWrites", func(t *testing.T, ingestAsWrites bool) {
for name, tc := range testcases {
diff --git a/pkg/kv/kvserver/replica_metrics.go b/pkg/kv/kvserver/replica_metrics.go
index fd7db9b5095a..7844ae8f79f9 100644
--- a/pkg/kv/kvserver/replica_metrics.go
+++ b/pkg/kv/kvserver/replica_metrics.go
@@ -249,8 +249,8 @@ func calcBehindCount(
// A "Query" is a BatchRequest (regardless of its contents) arriving at the
// leaseholder with a gateway node set in the header (i.e. excluding requests
// that weren't sent through a DistSender, which in practice should be
-// practically none). Also return the amount of time over which the stat was
-// accumulated.
+// practically none). See Replica.getBatchRequestQPS() for how this is
+// accounted for.
func (r *Replica) QueriesPerSecond() (float64, time.Duration) {
return r.leaseholderStats.avgQPS()
}
diff --git a/pkg/kv/kvserver/replica_rankings_test.go b/pkg/kv/kvserver/replica_rankings_test.go
index 548e2e91f1ed..271b88c5eee8 100644
--- a/pkg/kv/kvserver/replica_rankings_test.go
+++ b/pkg/kv/kvserver/replica_rankings_test.go
@@ -11,13 +11,20 @@
package kvserver
import (
+ "context"
+ "fmt"
"math/rand"
"reflect"
"testing"
+ "github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/roachpb"
+ "github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
+ "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
+ "github.com/cockroachdb/cockroach/pkg/testutils/sstutil"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
+ "github.com/stretchr/testify/require"
)
func TestReplicaRankings(t *testing.T) {
@@ -73,3 +80,107 @@ func TestReplicaRankings(t *testing.T) {
}
}
}
+
+// TestAddSSTQPSStat verifies that AddSSTableRequests, when present in a
+// BatchRequest, are accounted for differently than other requests when a
+// size factor (divisor) is set.
+func TestAddSSTQPSStat(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ defer log.Scope(t).Close(t)
+ ctx := context.Background()
+
+ tc := serverutils.StartNewTestCluster(t, 1, base.TestClusterArgs{
+ ReplicationMode: base.ReplicationManual,
+ })
+
+ defer tc.Stopper().Stop(ctx)
+ ts := tc.Server(0)
+ db := ts.DB()
+ conn := tc.ServerConn(0)
+ sqlDB := sqlutils.MakeSQLRunner(conn)
+
+ scratchKey := tc.ScratchRange(t)
+ nextKey := scratchKey.Next()
+
+ // Construct an sst with 200 keys that will be reused with different divisors.
+ sstKeys := make([]sstutil.KV, 200)
+ for i := range sstKeys {
+ sstKeys[i] = sstutil.KV{
+ KeyString: nextKey.String(),
+ WallTimestamp: 1,
+ ValueString: "value",
+ }
+ nextKey = nextKey.Next()
+ }
+ sst, start, end := sstutil.MakeSST(t, sstKeys)
+ requestSize := float64(len(sst))
+
+ sstReq := &roachpb.AddSSTableRequest{
+ RequestHeader: roachpb.RequestHeader{Key: start, EndKey: end},
+ Data: sst,
+ MVCCStats: sstutil.ComputeStats(t, sst),
+ }
+
+ get := &roachpb.GetRequest{
+ RequestHeader: roachpb.RequestHeader{Key: start},
+ }
+
+ addSSTBA := roachpb.BatchRequest{}
+ nonSSTBA := roachpb.BatchRequest{}
+ addSSTBA.Add(sstReq)
+ nonSSTBA.Add(get)
+
+	// When the factor is set to 0, it is disabled and we expect a uniform cost
+	// of 1 QPS per batch. In all other cases, we expect 1 + the size of the
+	// AddSSTableRequest data divided by the factor. If no AddSSTableRequest
+	// exists within the batch, it should cost 1, regardless of the factor.
+ testCases := []struct {
+ addsstRequestFactor int
+ expectedQPS float64
+ ba roachpb.BatchRequest
+ }{
+ {0, 1, addSSTBA},
+ {100, 1, nonSSTBA},
+ {10, 1 + requestSize/10, addSSTBA},
+ {20, 1 + requestSize/20, addSSTBA},
+ {40, 1 + requestSize/40, addSSTBA},
+ {100, 1 + requestSize/100, addSSTBA},
+ }
+
+	// Send an AddSSTableRequest once to create the key range.
+ _, pErr := db.NonTransactionalSender().Send(ctx, addSSTBA)
+ require.Nil(t, pErr)
+
+ store, err := ts.GetStores().(*Stores).GetStore(ts.GetFirstStoreID())
+ require.NoError(t, err)
+
+ repl := store.LookupReplica(roachpb.RKey(start))
+ require.NotNil(t, repl)
+
+	// Disable the consistency checker so that its interleaved requests do not
+	// artificially inflate QPS.
+ sqlDB.Exec(t, `SET CLUSTER SETTING server.consistency_check.interval = '0'`)
+
+ for _, testCase := range testCases {
+ sqlDB.Exec(t, fmt.Sprintf(`SET CLUSTER setting kv.replica_stats.addsst_request_size_factor = %d`, testCase.addsstRequestFactor))
+
+ // Reset the request counts to 0 before sending to clear previous requests.
+ repl.leaseholderStats.resetRequestCounts()
+
+ _, pErr = db.NonTransactionalSender().Send(ctx, testCase.ba)
+ require.Nil(t, pErr)
+
+ repl.leaseholderStats.mu.Lock()
+ queriesAfter, _ := repl.leaseholderStats.sumQueriesLocked()
+ repl.leaseholderStats.mu.Unlock()
+
+		// If queries are correctly recorded, we should see the query count
+		// increase by the expected QPS. However, it is possible to get a
+		// slightly higher number due to interleaving requests. To avoid a
+		// flaky test, we assert that the recorded QPS is at least as high as
+		// expected and within 4 of the expected QPS. If this test proves
+		// flaky, increase the delta to account for background activity
+		// interleaving with measurements.
+ require.GreaterOrEqual(t, queriesAfter, testCase.expectedQPS)
+ require.InDelta(t, queriesAfter, testCase.expectedQPS, 4)
+ }
+}
diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go
index 48522d0c5a2f..b094e78db398 100644
--- a/pkg/kv/kvserver/replica_send.go
+++ b/pkg/kv/kvserver/replica_send.go
@@ -122,8 +122,9 @@ func (r *Replica) sendWithoutRangeID(
ctx context.Context, ba *roachpb.BatchRequest,
) (_ *roachpb.BatchResponse, rErr *roachpb.Error) {
var br *roachpb.BatchResponse
+
if r.leaseholderStats != nil && ba.Header.GatewayNodeID != 0 {
- r.leaseholderStats.record(ba.Header.GatewayNodeID)
+ r.leaseholderStats.recordCount(r.getBatchRequestQPS(ctx, ba), ba.Header.GatewayNodeID)
}
// Add the range log tag.
@@ -966,6 +967,36 @@ func (r *Replica) executeAdminBatch(
return br, nil
}
+// getBatchRequestQPS calculates the cost estimate of a BatchRequest in
+// Queries Per Second (QPS), representing the abstract resource cost
+// associated with the request. A BatchRequest is counted as 1 QPS, unless it
+// contains an AddSSTableRequest, in which case the sum of all
+// AddSSTableRequests' data sizes, divided by a factor, is added to the QPS.
+// This special-case treatment accounts for the mismatch in resource use
+// between AddSSTableRequests and other requests.
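+//
+// For example, with the default factor of 50,000, a batch containing a single
+// 1 MB (1,000,000 byte) AddSSTableRequest is accounted for as
+// 1 + 1,000,000/50,000 = 21 QPS.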
+func (r *Replica) getBatchRequestQPS(ctx context.Context, ba *roachpb.BatchRequest) float64 {
+ var count float64 = 1
+
+	// For factors less than 1, the setting is disabled; use the default cost
+	// of 1 QPS.
+ requestFact := AddSSTableRequestSizeFactor.Get(&r.store.cfg.Settings.SV)
+ if requestFact < 1 {
+ return count
+ }
+
+ var addSSTSize float64 = 0
+ for _, req := range ba.Requests {
+ switch t := req.GetInner().(type) {
+ case *roachpb.AddSSTableRequest:
+ addSSTSize += float64(len(t.Data))
+ default:
+ continue
+ }
+ }
+
+ count += addSSTSize / float64(requestFact)
+ return count
+}
+
// checkBatchRequest verifies BatchRequest validity requirements. In particular,
// the batch must have an assigned timestamp, and either all requests must be
// read-only, or none.
diff --git a/pkg/kv/kvserver/replica_stats.go b/pkg/kv/kvserver/replica_stats.go
index 51d09555eed0..a60c70a28a1f 100644
--- a/pkg/kv/kvserver/replica_stats.go
+++ b/pkg/kv/kvserver/replica_stats.go
@@ -15,6 +15,7 @@ import (
"time"
"github.com/cockroachdb/cockroach/pkg/roachpb"
+ "github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
@@ -31,6 +32,24 @@ const (
MinStatsDuration = 5 * time.Second
)
+// AddSSTableRequestSizeFactor wraps
+// "kv.replica_stats.addsst_request_size_factor". When this setting is set to
+// 0, all batch requests are treated uniformly as 1 QPS. When this setting is
+// greater than or equal to 1, AddSSTable requests, when present within a
+// batch request, add additional QPS equal to the size of the SSTable data
+// divided by this factor. The magnitude of this factor is therefore
+// inversely related to the QPS sensitivity to AddSSTableRequests.
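+//
+// The factor can be changed at runtime via SQL, e.g. (using an arbitrary
+// example value):
+//   SET CLUSTER SETTING kv.replica_stats.addsst_request_size_factor = 25000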
+var AddSSTableRequestSizeFactor = settings.RegisterIntSetting(
+ settings.TenantWritable,
+ "kv.replica_stats.addsst_request_size_factor",
+	"the divisor that is applied to addsstable request sizes, then recorded in the leaseholder's QPS; 0 means all requests are treated as cost 1",
+	// The default divisor of 50,000 was chosen following manual testing discussed
+	// in pull request #76252. Every additional 50,000 AddSSTable bytes will
+	// increase the accounted QPS by 1. AddSSTableRequests are typically ~1 MB in
+	// size, and are therefore accounted as roughly 20 QPS.
+ 50000,
+).WithPublic()
+
type localityOracle func(roachpb.NodeID) string
// perLocalityCounts maps from the string representation of a locality to count.
@@ -105,10 +124,6 @@ func (rs *replicaStats) splitRequestCounts(other *replicaStats) {
}
}
-func (rs *replicaStats) record(nodeID roachpb.NodeID) {
- rs.recordCount(1, nodeID)
-}
-
func (rs *replicaStats) recordCount(count float64, nodeID roachpb.NodeID) {
var locality string
if rs.getNodeLocality != nil {
@@ -179,6 +194,25 @@ func (rs *replicaStats) perLocalityDecayingQPS() (perLocalityCounts, time.Durati
return counts, now.Sub(rs.mu.lastReset)
}
+// sumQueriesLocked returns the sum of all queries currently recorded.
+// Calling this method requires holding a lock on mu.
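+//
+// For example, if two of the rolling windows hold non-nil request maps whose
+// counts sum to 2 and 3 respectively, sumQueriesLocked returns (5, 2): the
+// total query count and the number of windows used.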
+func (rs *replicaStats) sumQueriesLocked() (float64, int) {
+ var sum float64
+ var windowsUsed int
+ for i := range rs.mu.requests {
+ // We have to add len(rs.mu.requests) to the numerator to avoid getting a
+ // negative result from the modulus operation when rs.mu.idx is small.
+ requestsIdx := (rs.mu.idx + len(rs.mu.requests) - i) % len(rs.mu.requests)
+ if cur := rs.mu.requests[requestsIdx]; cur != nil {
+ windowsUsed++
+ for _, v := range cur {
+ sum += v
+ }
+ }
+ }
+ return sum, windowsUsed
+}
+
// avgQPS returns the average requests-per-second and the amount of time
// over which the stat was accumulated. Note that these averages are exact,
// not exponentially decayed (there isn't a ton of justification for going
@@ -196,19 +230,7 @@ func (rs *replicaStats) avgQPS() (float64, time.Duration) {
rs.maybeRotateLocked(now)
// First accumulate the counts, then divide by the total number of seconds.
- var sum float64
- var windowsUsed int
- for i := range rs.mu.requests {
- // We have to add len(rs.mu.requests) to the numerator to avoid getting a
- // negative result from the modulus operation when rs.mu.idx is small.
- requestsIdx := (rs.mu.idx + len(rs.mu.requests) - i) % len(rs.mu.requests)
- if cur := rs.mu.requests[requestsIdx]; cur != nil {
- windowsUsed++
- for _, v := range cur {
- sum += v
- }
- }
- }
+ sum, windowsUsed := rs.sumQueriesLocked()
if windowsUsed <= 0 {
return 0, 0
}
diff --git a/pkg/kv/kvserver/replica_stats_test.go b/pkg/kv/kvserver/replica_stats_test.go
index 364680d23a9a..d529a86d7784 100644
--- a/pkg/kv/kvserver/replica_stats_test.go
+++ b/pkg/kv/kvserver/replica_stats_test.go
@@ -174,7 +174,7 @@ func TestReplicaStats(t *testing.T) {
return tc.localities[nodeID]
})
for _, req := range tc.reqs {
- rs.record(req)
+ rs.recordCount(1, req)
}
manual.Increment(int64(time.Second))
if actual, _ := rs.perLocalityDecayingQPS(); !floatMapsEqual(tc.expected, actual) {
@@ -200,7 +200,7 @@ func TestReplicaStats(t *testing.T) {
t.Errorf("%d: avgQPS() got %f, want %f", i, actual, expectedAvgQPS)
}
rs.resetRequestCounts()
- if actual, _ := rs.perLocalityDecayingQPS(); len(actual) != 0 {
+ if actual, _ := rs.sumQueriesLocked(); actual != 0 {
t.Errorf("%d: unexpected non-empty QPS averages after resetting: %+v", i, actual)
}
}
@@ -240,7 +240,7 @@ func TestReplicaStatsDecay(t *testing.T) {
{
for _, req := range []roachpb.NodeID{1, 1, 2, 2, 3} {
- rs.record(req)
+ rs.recordCount(1, req)
}
counts := perLocalityCounts{
awsLocalities[1]: 2,
@@ -285,11 +285,11 @@ func TestReplicaStatsDecay(t *testing.T) {
{
for _, req := range []roachpb.NodeID{1, 1, 2, 2, 3} {
- rs.record(req)
+ rs.recordCount(1, req)
}
manual.Increment(int64(replStatsRotateInterval))
for _, req := range []roachpb.NodeID{2, 2, 3, 3, 3} {
- rs.record(req)
+ rs.recordCount(1, req)
}
durationDivisor := time.Duration(float64(replStatsRotateInterval) * decayFactor).Seconds()
expected := perLocalityCounts{
@@ -322,11 +322,11 @@ func TestReplicaStatsDecaySmoothing(t *testing.T) {
rs := newReplicaStats(clock, func(nodeID roachpb.NodeID) string {
return awsLocalities[nodeID]
})
- rs.record(1)
- rs.record(1)
- rs.record(2)
- rs.record(2)
- rs.record(3)
+ rs.recordCount(1, 1)
+ rs.recordCount(1, 1)
+ rs.recordCount(1, 2)
+ rs.recordCount(1, 2)
+ rs.recordCount(1, 3)
expected := perLocalityCounts{
awsLocalities[1]: 2,
awsLocalities[2]: 2,
diff --git a/pkg/kv/kvserver/store_pool_test.go b/pkg/kv/kvserver/store_pool_test.go
index 78354ab253d4..70ebdd307c9d 100644
--- a/pkg/kv/kvserver/store_pool_test.go
+++ b/pkg/kv/kvserver/store_pool_test.go
@@ -547,7 +547,7 @@ func TestStorePoolUpdateLocalStore(t *testing.T) {
replica.mu.Unlock()
rs := newReplicaStats(clock, nil)
for _, store := range stores {
- rs.record(store.Node.NodeID)
+ rs.recordCount(1, store.Node.NodeID)
}
manual.Increment(int64(MinStatsDuration + time.Second))
replica.leaseholderStats = rs