diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt
index 2b6b31b957ba..774556323669 100644
--- a/docs/generated/settings/settings-for-tenants.txt
+++ b/docs/generated/settings/settings-for-tenants.txt
@@ -33,6 +33,7 @@ kv.protectedts.reconciliation.interval duration 5m0s the frequency for reconcili
kv.range_split.by_load_enabled boolean true allow automatic splits of ranges based on where load is concentrated
kv.range_split.load_qps_threshold integer 2500 the QPS over which, the range becomes a candidate for load based splitting
kv.rangefeed.enabled boolean false if set, rangefeed registration is enabled
+kv.replica_stats.addsst_request_size_factor integer 50000 the divisor that is applied to addsstable request sizes, then recorded in a leaseholders QPS; 0 means all requests are treated as cost 1
kv.replication_reports.interval duration 1m0s the frequency for generating the replication_constraint_stats, replication_stats_report and replication_critical_localities reports (set to 0 to disable)
kv.transaction.max_intents_bytes integer 4194304 maximum number of bytes used to track locks in transactions
kv.transaction.max_refresh_spans_bytes integer 256000 maximum number of bytes used to track refresh spans in serializable transactions
diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index 2962ea1cb393..d53f7cc7901e 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -41,6 +41,7 @@
kv.range_split.load_qps_threshold | integer | 2500 | the QPS over which, the range becomes a candidate for load based splitting |
kv.rangefeed.enabled | boolean | false | if set, rangefeed registration is enabled |
kv.replica_circuit_breaker.slow_replication_threshold | duration | 0s | duration after which slow proposals trip the per-Replica circuit breaker (zero duration disables breakers) |
+kv.replica_stats.addsst_request_size_factor | integer | 50000 | the divisor that is applied to addsstable request sizes, then recorded in a leaseholders QPS; 0 means all requests are treated as cost 1 |
kv.replication_reports.interval | duration | 1m0s | the frequency for generating the replication_constraint_stats, replication_stats_report and replication_critical_localities reports (set to 0 to disable) |
kv.snapshot_rebalance.max_rate | byte size | 32 MiB | the rate limit (bytes/sec) to use for rebalance and upreplication snapshots |
kv.snapshot_recovery.max_rate | byte size | 32 MiB | the rate limit (bytes/sec) to use for recovery snapshots |
diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel
index 861b25db3f75..a0a914e1bd9e 100644
--- a/pkg/kv/kvserver/BUILD.bazel
+++ b/pkg/kv/kvserver/BUILD.bazel
@@ -378,6 +378,7 @@ go_test(
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
+ "//pkg/testutils/sstutil",
"//pkg/testutils/testcluster",
"//pkg/ts",
"//pkg/ts/tspb",
diff --git a/pkg/kv/kvserver/allocator_test.go b/pkg/kv/kvserver/allocator_test.go
index 17f5e315a661..03eb15651000 100644
--- a/pkg/kv/kvserver/allocator_test.go
+++ b/pkg/kv/kvserver/allocator_test.go
@@ -5108,17 +5108,17 @@ func TestAllocatorTransferLeaseTargetLoadBased(t *testing.T) {
// the unknown node 99 in evenlyBalanced to verify that requests from
// unknown localities don't affect the algorithm.
evenlyBalanced := newReplicaStats(clock, localityFn)
- evenlyBalanced.record(1)
- evenlyBalanced.record(2)
- evenlyBalanced.record(3)
+ evenlyBalanced.recordCount(1, 1)
+ evenlyBalanced.recordCount(1, 2)
+ evenlyBalanced.recordCount(1, 3)
imbalanced1 := newReplicaStats(clock, localityFn)
imbalanced2 := newReplicaStats(clock, localityFn)
imbalanced3 := newReplicaStats(clock, localityFn)
for i := 0; i < 100*int(MinLeaseTransferStatsDuration.Seconds()); i++ {
- evenlyBalanced.record(99)
- imbalanced1.record(1)
- imbalanced2.record(2)
- imbalanced3.record(3)
+ evenlyBalanced.recordCount(1, 99)
+ imbalanced1.recordCount(1, 1)
+ imbalanced2.recordCount(1, 2)
+ imbalanced3.recordCount(1, 3)
}
manual.Increment(int64(MinLeaseTransferStatsDuration))
diff --git a/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go b/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go
index 33b9b316528c..cc276544dc3d 100644
--- a/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go
+++ b/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go
@@ -623,16 +623,16 @@ func TestEvalAddSSTable(t *testing.T) {
expectStatsEst: true,
},
/* Disabled due to nondeterminism under metamorphic tests. SSTTimestamp will
- * shortly be removed anyway.
- "SSTTimestamp doesn't rewrite with incorrect timestamp, but errors under race": {
- atReqTS: 8,
- data: []sstutil.KV{{"a", 6, "a6"}},
- sst: []sstutil.KV{{"a", 7, "a7"}},
- sstTimestamp: 8,
- expect: []sstutil.KV{{"a", 7, "a7"}, {"a", 6, "a6"}},
- expectErrRace: `incorrect timestamp 0.000000007,0 for SST key "a" (expected 0.000000008,0)`,
- expectStatsEst: true,
- },*/
+ * shortly be removed anyway.
+ "SSTTimestamp doesn't rewrite with incorrect timestamp, but errors under race": {
+ atReqTS: 8,
+ data: []sstutil.KV{{"a", 6, "a6"}},
+ sst: []sstutil.KV{{"a", 7, "a7"}},
+ sstTimestamp: 8,
+ expect: []sstutil.KV{{"a", 7, "a7"}, {"a", 6, "a6"}},
+ expectErrRace: `incorrect timestamp 0.000000007,0 for SST key "a" (expected 0.000000008,0)`,
+ expectStatsEst: true,
+ },*/
}
testutils.RunTrueAndFalse(t, "IngestAsWrites", func(t *testing.T, ingestAsWrites bool) {
for name, tc := range testcases {
diff --git a/pkg/kv/kvserver/replica_metrics.go b/pkg/kv/kvserver/replica_metrics.go
index fd7db9b5095a..7844ae8f79f9 100644
--- a/pkg/kv/kvserver/replica_metrics.go
+++ b/pkg/kv/kvserver/replica_metrics.go
@@ -249,8 +249,8 @@ func calcBehindCount(
// A "Query" is a BatchRequest (regardless of its contents) arriving at the
// leaseholder with a gateway node set in the header (i.e. excluding requests
// that weren't sent through a DistSender, which in practice should be
-// practically none). Also return the amount of time over which the stat was
-// accumulated.
+// practically none). See Replica.getBatchRequestQPS() for how this is accounted
+// for. Also returns the amount of time over which the stat was accumulated.
func (r *Replica) QueriesPerSecond() (float64, time.Duration) {
return r.leaseholderStats.avgQPS()
}
diff --git a/pkg/kv/kvserver/replica_rankings_test.go b/pkg/kv/kvserver/replica_rankings_test.go
index 548e2e91f1ed..1d96d7053cfb 100644
--- a/pkg/kv/kvserver/replica_rankings_test.go
+++ b/pkg/kv/kvserver/replica_rankings_test.go
@@ -11,13 +11,20 @@
package kvserver
import (
+ "context"
+ "fmt"
"math/rand"
"reflect"
"testing"
+ "github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/roachpb"
+ "github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
+ "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
+ "github.com/cockroachdb/cockroach/pkg/testutils/sstutil"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
+ "github.com/stretchr/testify/require"
)
func TestReplicaRankings(t *testing.T) {
@@ -73,3 +80,91 @@ func TestReplicaRankings(t *testing.T) {
}
}
}
+
+// TestAddSSTQPSStat verifies that AddSSTableRequests are accounted for
+// differently, when present in a BatchRequest, with a divisor set.
+func TestAddSSTQPSStat(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+	ctx := context.Background()
+
+	tc := serverutils.StartNewTestCluster(t, 1, base.TestClusterArgs{
+		ReplicationMode: base.ReplicationManual,
+	})
+
+	defer tc.Stopper().Stop(ctx)
+	ts := tc.Server(0)
+	db := ts.DB()
+	conn := tc.ServerConn(0)
+	sqlDB := sqlutils.MakeSQLRunner(conn)
+
+	// Construct an sst with 100 keys that will be reused with different divisors.
+	sstKeys := make([]sstutil.KV, 100)
+	for i := range sstKeys {
+		sstKeys[i] = sstutil.KV{fmt.Sprintf("%3d", i+1), 1, "value"}
+	}
+	sst, start, end := sstutil.MakeSST(t, sstKeys)
+	requestSize := float64(len(sst))
+
+	sstReq := &roachpb.AddSSTableRequest{
+		RequestHeader: roachpb.RequestHeader{Key: start, EndKey: end},
+		Data:          sst,
+		MVCCStats:     sstutil.ComputeStats(t, sst),
+	}
+
+	get := &roachpb.GetRequest{
+		RequestHeader: roachpb.RequestHeader{Key: roachpb.Key("001")},
+	}
+
+	addSSTBA := roachpb.BatchRequest{}
+	nonSSTBA := roachpb.BatchRequest{}
+	addSSTBA.Add(sstReq)
+	nonSSTBA.Add(get)
+
+	// When the factor is set to 0, it is disabled and we expect uniform 1 QPS.
+	// In all other cases, we expect 1 + the size of an
+	// AddSSTableRequest/factor. If no AddSSTableRequest exists within the
+	// request, it should be cost 1, regardless of factor.
+	testCases := []struct {
+		addsstRequestFactor int
+		expectedQPS         float64
+		ba                  roachpb.BatchRequest
+	}{
+		{0, 1, addSSTBA},
+		{100, 1, nonSSTBA},
+		{1, 1 + requestSize, addSSTBA},
+		{10, 1 + requestSize/10, addSSTBA},
+		{50, 1 + requestSize/50, addSSTBA},
+		{100, 1 + requestSize/100, addSSTBA},
+	}
+
+	// Send an AddSSTRequest once to create the key range.
+	_, pErr := db.NonTransactionalSender().Send(ctx, addSSTBA)
+	require.Nil(t, pErr)
+
+	store, err := ts.GetStores().(*Stores).GetStore(ts.GetFirstStoreID())
+	require.NoError(t, err)
+
+	repl := store.LookupReplica(roachpb.RKey(start))
+	require.NotNil(t, repl)
+
+	for _, testCase := range testCases {
+		sqlDB.Exec(t, fmt.Sprintf(`SET CLUSTER setting kv.replica_stats.addsst_request_size_factor = %d`, testCase.addsstRequestFactor))
+		// QPS times the duration it was averaged over approximates the query count.
+		qpsBefore, durationBefore := repl.QueriesPerSecond()
+		queriesBefore := qpsBefore * durationBefore.Seconds()
+
+		_, pErr = db.NonTransactionalSender().Send(ctx, testCase.ba)
+		require.Nil(t, pErr)
+
+		qpsAfter, durationAfter := repl.QueriesPerSecond()
+		queriesAfter := qpsAfter * durationAfter.Seconds()
+		queriesIncrease := queriesAfter - queriesBefore
+
+		// If queries are correctly recorded, we should see increase in query count by the expected
+		// QPS. However, it is possible to get a slightly higher or lower number, due to rounding
+		// or timing errors; the result is calculated using floating point division. We allow a
+		// tolerance of 4 to avoid flaking the test due to this inaccuracy.
+		require.InDelta(t, testCase.expectedQPS, queriesIncrease, 4)
+	}
+}
diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go
index 48522d0c5a2f..81ceeb117b0a 100644
--- a/pkg/kv/kvserver/replica_send.go
+++ b/pkg/kv/kvserver/replica_send.go
@@ -122,8 +122,9 @@ func (r *Replica) sendWithoutRangeID(
ctx context.Context, ba *roachpb.BatchRequest,
) (_ *roachpb.BatchResponse, rErr *roachpb.Error) {
var br *roachpb.BatchResponse
+
if r.leaseholderStats != nil && ba.Header.GatewayNodeID != 0 {
- r.leaseholderStats.record(ba.Header.GatewayNodeID)
+ r.leaseholderStats.recordCount(r.getBatchRequestQPS(ctx, ba), ba.Header.GatewayNodeID)
}
// Add the range log tag.
@@ -966,6 +967,36 @@ func (r *Replica) executeAdminBatch(
return br, nil
}
+// getBatchRequestQPS calculates the cost estimation of a BatchRequest. The
+// estimate is expressed in Queries Per Second (QPS), representing the abstract
+// resource cost associated with this request. A BatchRequest costs 1 QPS,
+// plus the summed data size of every AddSSTableRequest it contains divided by
+// the kv.replica_stats.addsst_request_size_factor setting. The division is
+// done in floating point so that payloads smaller than the factor still add
+// their fractional share. A factor below 1 disables the extra accounting.
+func (r *Replica) getBatchRequestQPS(ctx context.Context, ba *roachpb.BatchRequest) float64 {
+	var count float64 = 1
+
+	// For divisors less than 1, use the default treatment of QPS.
+	requestFact := r.AddSSTableRequestSizeFactor()
+	if requestFact < 1 {
+		return count
+	}
+
+	var addSSTSize float64
+	for _, req := range ba.Requests {
+		// Only AddSSTable requests carry a size-based surcharge; every other
+		// request type is covered by the flat cost of 1 for the batch.
+		if t, ok := req.GetInner().(*roachpb.AddSSTableRequest); ok {
+			addSSTSize += float64(len(t.Data))
+		}
+	}
+
+	// Divide as floats: integer division would truncate the surcharge.
+	count += addSSTSize / float64(requestFact)
+	return count
+}
+
// checkBatchRequest verifies BatchRequest validity requirements. In particular,
// the batch must have an assigned timestamp, and either all requests must be
// read-only, or none.
diff --git a/pkg/kv/kvserver/replica_stats.go b/pkg/kv/kvserver/replica_stats.go
index 51d09555eed0..d4e9a1c3a5e4 100644
--- a/pkg/kv/kvserver/replica_stats.go
+++ b/pkg/kv/kvserver/replica_stats.go
@@ -15,6 +15,7 @@ import (
"time"
"github.com/cockroachdb/cockroach/pkg/roachpb"
+ "github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
@@ -31,6 +32,29 @@ const (
MinStatsDuration = 5 * time.Second
)
+// AddSSTableRequestSizeFactor is the cluster setting registered under
+// "kv.replica_stats.addsst_request_size_factor". A value of 0 makes every
+// batch request count uniformly as 1 QPS. A value of 1 or more makes each
+// AddSSTable request in a batch contribute extra QPS equal to the size of its
+// SSTable data divided by the factor, so a larger factor makes QPS less
+// sensitive to AddSSTableRequests.
+var AddSSTableRequestSizeFactor = settings.RegisterIntSetting(
+	settings.TenantWritable,
+	"kv.replica_stats.addsst_request_size_factor",
+	"the divisor that is applied to addsstable request sizes, then recorded in a leaseholders QPS; 0 means all requests are treated as cost 1",
+	// A default divisor of 50,000 was settled on after the manual testing discussed in pull
+	// request #76252: each additional 50,000 bytes of AddSSTable data raises the accounted QPS
+	// by 1. Typically AddSSTableRequests are ~1mb in size, accounted as 20 QPS.
+	50000,
+).WithPublic()
+
+// AddSSTableRequestSizeFactor returns the size cost factor for a given replica,
+// read from the settings held in the replica's store configuration.
+func (r *Replica) AddSSTableRequestSizeFactor() int64 {
+	sv := &r.store.cfg.Settings.SV
+	return AddSSTableRequestSizeFactor.Get(sv)
+}
+
type localityOracle func(roachpb.NodeID) string
// perLocalityCounts maps from the string representation of a locality to count.
@@ -105,10 +129,6 @@ func (rs *replicaStats) splitRequestCounts(other *replicaStats) {
}
}
-func (rs *replicaStats) record(nodeID roachpb.NodeID) {
- rs.recordCount(1, nodeID)
-}
-
func (rs *replicaStats) recordCount(count float64, nodeID roachpb.NodeID) {
var locality string
if rs.getNodeLocality != nil {
diff --git a/pkg/kv/kvserver/replica_stats_test.go b/pkg/kv/kvserver/replica_stats_test.go
index 364680d23a9a..f8f7c614f547 100644
--- a/pkg/kv/kvserver/replica_stats_test.go
+++ b/pkg/kv/kvserver/replica_stats_test.go
@@ -174,7 +174,7 @@ func TestReplicaStats(t *testing.T) {
return tc.localities[nodeID]
})
for _, req := range tc.reqs {
- rs.record(req)
+ rs.recordCount(1, req)
}
manual.Increment(int64(time.Second))
if actual, _ := rs.perLocalityDecayingQPS(); !floatMapsEqual(tc.expected, actual) {
@@ -240,7 +240,7 @@ func TestReplicaStatsDecay(t *testing.T) {
{
for _, req := range []roachpb.NodeID{1, 1, 2, 2, 3} {
- rs.record(req)
+ rs.recordCount(1, req)
}
counts := perLocalityCounts{
awsLocalities[1]: 2,
@@ -285,11 +285,11 @@ func TestReplicaStatsDecay(t *testing.T) {
{
for _, req := range []roachpb.NodeID{1, 1, 2, 2, 3} {
- rs.record(req)
+ rs.recordCount(1, req)
}
manual.Increment(int64(replStatsRotateInterval))
for _, req := range []roachpb.NodeID{2, 2, 3, 3, 3} {
- rs.record(req)
+ rs.recordCount(1, req)
}
durationDivisor := time.Duration(float64(replStatsRotateInterval) * decayFactor).Seconds()
expected := perLocalityCounts{
@@ -322,11 +322,11 @@ func TestReplicaStatsDecaySmoothing(t *testing.T) {
rs := newReplicaStats(clock, func(nodeID roachpb.NodeID) string {
return awsLocalities[nodeID]
})
- rs.record(1)
- rs.record(1)
- rs.record(2)
- rs.record(2)
- rs.record(3)
+ rs.recordCount(1, 1)
+ rs.recordCount(1, 1)
+ rs.recordCount(1, 2)
+ rs.recordCount(1, 2)
+ rs.recordCount(1, 3)
expected := perLocalityCounts{
awsLocalities[1]: 2,
awsLocalities[2]: 2,
diff --git a/pkg/kv/kvserver/store_pool_test.go b/pkg/kv/kvserver/store_pool_test.go
index 78354ab253d4..70ebdd307c9d 100644
--- a/pkg/kv/kvserver/store_pool_test.go
+++ b/pkg/kv/kvserver/store_pool_test.go
@@ -547,7 +547,7 @@ func TestStorePoolUpdateLocalStore(t *testing.T) {
replica.mu.Unlock()
rs := newReplicaStats(clock, nil)
for _, store := range stores {
- rs.record(store.Node.NodeID)
+ rs.recordCount(1, store.Node.NodeID)
}
manual.Increment(int64(MinStatsDuration + time.Second))
replica.leaseholderStats = rs