Skip to content

Commit

Permalink
kvserver: account for AddSSTableRequests in QPS
Browse files Browse the repository at this point in the history
Previously, Queries-Per-Second (QPS) was calculated uniformly per
`BatchRequest` as 1. This patch introduces variable QPS calculation
for `AddSSTableRequest`, which uses an order of magnitude more
resources than other request types.

This patch introduces the
`kv.replica_stats.addsst_request_size_factor` cluster setting. This
setting is used to attribute QPS to `AddSSTableRequest` sizes. The
calculation is done as QPS = 1 + size(AddSSTableRequest) / factor.
When `kv.replica_stats.addsst_request_size_factor` is less than 1, or
no `AddSSTableRequest` exists within a `BatchRequest`, then QPS = 1,
which is the current behavior.

resolves #7371

Release note (performance improvement):
Introduced `kv.replica_stats.addsst_request_size_factor`
cluster setting. This setting is used to tune
Queries-Per-Second (QPS) sensitivity to large imports.
By default, this setting is disabled. When enabled,
the size of any AddSSTableRequest will contribute to
QPS in inverse relation to this setting's magnitude.
Experimentally, 20,000 has produced the best load balancing results.
  • Loading branch information
kvoli committed Feb 11, 2022
1 parent c055ce1 commit bd576c8
Show file tree
Hide file tree
Showing 11 changed files with 182 additions and 34 deletions.
1 change: 1 addition & 0 deletions docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ kv.protectedts.reconciliation.interval duration 5m0s the frequency for reconcili
kv.range_split.by_load_enabled boolean true allow automatic splits of ranges based on where load is concentrated
kv.range_split.load_qps_threshold integer 2500 the QPS over which, the range becomes a candidate for load based splitting
kv.rangefeed.enabled boolean false if set, rangefeed registration is enabled
kv.replica_stats.addsst_request_size_factor integer 0 the divisor that is applied to addsstable request sizes, then recorded in a leaseholders QPS; 0 means all requests are treated as cost 1
kv.replication_reports.interval duration 1m0s the frequency for generating the replication_constraint_stats, replication_stats_report and replication_critical_localities reports (set to 0 to disable)
kv.transaction.max_intents_bytes integer 4194304 maximum number of bytes used to track locks in transactions
kv.transaction.max_refresh_spans_bytes integer 256000 maximum number of bytes used to track refresh spans in serializable transactions
Expand Down
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
<tr><td><code>kv.range_split.load_qps_threshold</code></td><td>integer</td><td><code>2500</code></td><td>the QPS over which, the range becomes a candidate for load based splitting</td></tr>
<tr><td><code>kv.rangefeed.enabled</code></td><td>boolean</td><td><code>false</code></td><td>if set, rangefeed registration is enabled</td></tr>
<tr><td><code>kv.replica_circuit_breaker.slow_replication_threshold</code></td><td>duration</td><td><code>0s</code></td><td>duration after which slow proposals trip the per-Replica circuit breaker (zero duration disables breakers)</td></tr>
<tr><td><code>kv.replica_stats.addsst_request_size_factor</code></td><td>integer</td><td><code>0</code></td><td>the divisor that is applied to addsstable request sizes, then recorded in a leaseholders QPS; 0 means all requests are treated as cost 1</td></tr>
<tr><td><code>kv.replication_reports.interval</code></td><td>duration</td><td><code>1m0s</code></td><td>the frequency for generating the replication_constraint_stats, replication_stats_report and replication_critical_localities reports (set to 0 to disable)</td></tr>
<tr><td><code>kv.snapshot_rebalance.max_rate</code></td><td>byte size</td><td><code>32 MiB</code></td><td>the rate limit (bytes/sec) to use for rebalance and upreplication snapshots</td></tr>
<tr><td><code>kv.snapshot_recovery.max_rate</code></td><td>byte size</td><td><code>32 MiB</code></td><td>the rate limit (bytes/sec) to use for recovery snapshots</td></tr>
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,7 @@ go_test(
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
"//pkg/testutils/sstutil",
"//pkg/testutils/testcluster",
"//pkg/ts",
"//pkg/ts/tspb",
Expand Down
14 changes: 7 additions & 7 deletions pkg/kv/kvserver/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5108,17 +5108,17 @@ func TestAllocatorTransferLeaseTargetLoadBased(t *testing.T) {
// the unknown node 99 in evenlyBalanced to verify that requests from
// unknown localities don't affect the algorithm.
evenlyBalanced := newReplicaStats(clock, localityFn)
evenlyBalanced.record(1)
evenlyBalanced.record(2)
evenlyBalanced.record(3)
evenlyBalanced.recordCount(1, 1)
evenlyBalanced.recordCount(1, 2)
evenlyBalanced.recordCount(1, 3)
imbalanced1 := newReplicaStats(clock, localityFn)
imbalanced2 := newReplicaStats(clock, localityFn)
imbalanced3 := newReplicaStats(clock, localityFn)
for i := 0; i < 100*int(MinLeaseTransferStatsDuration.Seconds()); i++ {
evenlyBalanced.record(99)
imbalanced1.record(1)
imbalanced2.record(2)
imbalanced3.record(3)
evenlyBalanced.recordCount(1, 99)
imbalanced1.recordCount(1, 1)
imbalanced2.recordCount(1, 2)
imbalanced3.recordCount(1, 3)
}

manual.Increment(int64(MinLeaseTransferStatsDuration))
Expand Down
20 changes: 10 additions & 10 deletions pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,16 +623,16 @@ func TestEvalAddSSTable(t *testing.T) {
expectStatsEst: true,
},
/* Disabled due to nondeterminism under metamorphic tests. SSTTimestamp will
* shortly be removed anyway.
"SSTTimestamp doesn't rewrite with incorrect timestamp, but errors under race": {
atReqTS: 8,
data: []sstutil.KV{{"a", 6, "a6"}},
sst: []sstutil.KV{{"a", 7, "a7"}},
sstTimestamp: 8,
expect: []sstutil.KV{{"a", 7, "a7"}, {"a", 6, "a6"}},
expectErrRace: `incorrect timestamp 0.000000007,0 for SST key "a" (expected 0.000000008,0)`,
expectStatsEst: true,
},*/
* shortly be removed anyway.
"SSTTimestamp doesn't rewrite with incorrect timestamp, but errors under race": {
atReqTS: 8,
data: []sstutil.KV{{"a", 6, "a6"}},
sst: []sstutil.KV{{"a", 7, "a7"}},
sstTimestamp: 8,
expect: []sstutil.KV{{"a", 7, "a7"}, {"a", 6, "a6"}},
expectErrRace: `incorrect timestamp 0.000000007,0 for SST key "a" (expected 0.000000008,0)`,
expectStatsEst: true,
},*/
}
testutils.RunTrueAndFalse(t, "IngestAsWrites", func(t *testing.T, ingestAsWrites bool) {
for name, tc := range testcases {
Expand Down
3 changes: 1 addition & 2 deletions pkg/kv/kvserver/replica_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,7 @@ func calcBehindCount(
// A "Query" is a BatchRequest (regardless of its contents) arriving at the
// leaseholder with a gateway node set in the header (i.e. excluding requests
// that weren't sent through a DistSender, which in practice should be
// practically none). The second return value is the amount of time over which
// the stat was accumulated.
func (r *Replica) QueriesPerSecond() (float64, time.Duration) {
return r.leaseholderStats.avgQPS()
}
Expand Down
95 changes: 95 additions & 0 deletions pkg/kv/kvserver/replica_rankings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,20 @@
package kvserver

import (
"context"
"fmt"
"math/rand"
"reflect"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sstutil"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/stretchr/testify/require"
)

func TestReplicaRankings(t *testing.T) {
Expand Down Expand Up @@ -73,3 +80,91 @@ func TestReplicaRankings(t *testing.T) {
}
}
}

// TestAddSSTQPSStat verifies that AddSSTableRequests are accounted for
// differently, when present in a BatchRequest, with a divisor set.
//
// The test starts a single-node cluster, ingests one SST to create the key
// range, and then, for each test case, sets the
// kv.replica_stats.addsst_request_size_factor cluster setting, sends one
// batch, and checks that the replica's accumulated query count grew by the
// expected cost.
func TestAddSSTQPSStat(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)
	ctx := context.Background()

	tc := serverutils.StartNewTestCluster(t, 1, base.TestClusterArgs{
		ReplicationMode: base.ReplicationManual,
	})

	defer tc.Stopper().Stop(ctx)
	ts := tc.Server(0)
	db := ts.DB()
	conn := tc.ServerConn(0)
	sqlDB := sqlutils.MakeSQLRunner(conn)

	// Construct an sst with 100 keys that will be reused with different divisors.
	// Keys are zero-padded ("001".."100") so that the GetRequest below, which
	// reads "001", targets a key that was actually written by the SST.
	sstKeys := make([]sstutil.KV, 100)
	for i := range sstKeys {
		sstKeys[i] = sstutil.KV{fmt.Sprintf("%03d", i+1), 1, "value"}
	}
	sst, start, end := sstutil.MakeSST(t, sstKeys)
	requestSize := float64(len(sst))

	sstReq := &roachpb.AddSSTableRequest{
		RequestHeader: roachpb.RequestHeader{Key: start, EndKey: end},
		Data:          sst,
		MVCCStats:     sstutil.ComputeStats(t, sst),
	}

	get := &roachpb.GetRequest{
		RequestHeader: roachpb.RequestHeader{Key: roachpb.Key("001")},
	}

	addSSTBA := roachpb.BatchRequest{}
	nonSSTBA := roachpb.BatchRequest{}
	addSSTBA.Add(sstReq)
	nonSSTBA.Add(get)

	// When the factor is set to 0, it is disabled and we expect uniform 1 QPS.
	// In all other cases, we expect 1 + the size of a
	// AddSSTableRequest/factor. If no AddSStableRequest exists within the
	// request, it should be cost 1, regardless of factor.
	testCases := []struct {
		addsstRequestFactor int
		expectedQPS         float64
		ba                  roachpb.BatchRequest
	}{
		{0, 1, addSSTBA},
		{100, 1, nonSSTBA},
		{1, 1 + requestSize, addSSTBA},
		{10, 1 + requestSize/10, addSSTBA},
		{50, 1 + requestSize/50, addSSTBA},
		{100, 1 + requestSize/100, addSSTBA},
	}

	// Send an AddSSTRequest once to create the key range.
	_, pErr := db.NonTransactionalSender().Send(ctx, addSSTBA)
	require.Nil(t, pErr)

	store, err := ts.GetStores().(*Stores).GetStore(ts.GetFirstStoreID())
	require.NoError(t, err)

	repl := store.LookupReplica(roachpb.RKey(start))
	require.NotNil(t, repl)

	for _, testCase := range testCases {
		sqlDB.Exec(t, fmt.Sprintf(`SET CLUSTER setting kv.replica_stats.addsst_request_size_factor = %d`, testCase.addsstRequestFactor))

		// QueriesPerSecond returns a rate and the duration it was measured
		// over; their product recovers the accumulated query count.
		qpsBefore, durationBefore := repl.QueriesPerSecond()
		queriesBefore := qpsBefore * durationBefore.Seconds()

		_, pErr = db.NonTransactionalSender().Send(ctx, testCase.ba)
		require.Nil(t, pErr)

		qpsAfter, durationAfter := repl.QueriesPerSecond()
		queriesAfter := qpsAfter * durationAfter.Seconds()
		queriesIncrease := queriesAfter - queriesBefore

		// If queries are correctly recorded, we should see increase in query count by the expected
		// QPS. However, it is possible to get a slightly higher or lower number, due to rounding
		// or timing errors; the result is calculated using floating point division. We allow a
		// tolerance of 4 to avoid flaking the test due to this inaccuracy.
		require.InDelta(t, testCase.expectedQPS, queriesIncrease, 4)
	}
}
33 changes: 32 additions & 1 deletion pkg/kv/kvserver/replica_send.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,9 @@ func (r *Replica) sendWithoutRangeID(
ctx context.Context, ba *roachpb.BatchRequest,
) (_ *roachpb.BatchResponse, rErr *roachpb.Error) {
var br *roachpb.BatchResponse

if r.leaseholderStats != nil && ba.Header.GatewayNodeID != 0 {
r.leaseholderStats.record(ba.Header.GatewayNodeID)
r.leaseholderStats.recordCount(r.getBatchRequestQPS(ctx, ba), ba.Header.GatewayNodeID)
}

// Add the range log tag.
Expand Down Expand Up @@ -966,6 +967,36 @@ func (r *Replica) executeAdminBatch(
return br, nil
}

// getBatchRequestQPS calculates the cost estimation of a BatchRequest. The
// estimate returns Queries Per Second (QPS), representing the abstract
// resource cost associated with this request. BatchRequests are calculated as
// 1 QPS, unless an AddSSTableRequest exists, in which case the sum of all
// AddSSTableRequest's data size is divided by a factor and added to QPS. This
// specific treatment of QPS is a special case to account for the mismatch
// between AddSSTableRequest and other requests in terms of resource use.
//
// The returned cost is always at least 1. When the size factor is less than 1
// the size-based accounting is disabled and the cost is exactly 1.
func (r *Replica) getBatchRequestQPS(ctx context.Context, ba *roachpb.BatchRequest) float64 {
	const baseCost float64 = 1

	// For divisors less than 1, use the default treatment of QPS. This also
	// guards the division below against a zero or negative divisor.
	requestFact := r.AddSSTableRequestSizeFactor()
	if requestFact < 1 {
		return baseCost
	}

	// Sum the payload size of every AddSSTableRequest in the batch; all other
	// request types contribute nothing beyond the base cost.
	var addSSTSize int64
	for _, req := range ba.Requests {
		if t, ok := req.GetInner().(*roachpb.AddSSTableRequest); ok {
			addSSTSize += int64(len(t.Data))
		}
	}

	// Divide in floating point so the fractional part of size/factor is
	// retained; integer division would truncate it (and would drop the entire
	// contribution whenever the total size is smaller than the factor).
	return baseCost + float64(addSSTSize)/float64(requestFact)
}

// checkBatchRequest verifies BatchRequest validity requirements. In particular,
// the batch must have an assigned timestamp, and either all requests must be
// read-only, or none.
Expand Down
28 changes: 24 additions & 4 deletions pkg/kv/kvserver/replica_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
Expand All @@ -31,6 +32,29 @@ const (
MinStatsDuration = 5 * time.Second
)

// AddSSTableRequestSizeFactor wraps
// "kv.replica_stats.addsst_request_size_factor". When this setting is set to
// 0, all batch requests are treated uniformly as 1 QPS. When this setting is
// greater than or equal to 1, AddSSTable requests will add additional QPS,
// when present within a batch request. The additional QPS is size of the
// SSTable data, divided by this factor. Thereby, the magnitude of this factor
// is inversely related to QPS sensitivity to AddSSTableRequests.
var AddSSTableRequestSizeFactor = settings.RegisterIntSetting(
	settings.TenantWritable,
	"kv.replica_stats.addsst_request_size_factor",
	"the divisor that is applied to addsstable request sizes, then recorded in a leaseholders QPS; 0 means all requests are treated as cost 1",
	// The setting is disabled by default: a value of 0 means every batch
	// request is accounted as cost 1. This matches the default recorded in the
	// generated settings documentation. When set to N >= 1, every additional N
	// AddSSTable bytes increase the accounted QPS by 1.
	0,
).WithPublic()

// AddSSTableRequestSizeFactor reports the current value of the
// "kv.replica_stats.addsst_request_size_factor" cluster setting, read from the
// settings of the store that owns this replica.
func (r *Replica) AddSSTableRequestSizeFactor() int64 {
	sv := &r.store.cfg.Settings.SV
	return AddSSTableRequestSizeFactor.Get(sv)
}

type localityOracle func(roachpb.NodeID) string

// perLocalityCounts maps from the string representation of a locality to count.
Expand Down Expand Up @@ -105,10 +129,6 @@ func (rs *replicaStats) splitRequestCounts(other *replicaStats) {
}
}

// record registers a single request (a count of 1) originating from nodeID by
// delegating to recordCount.
func (rs *replicaStats) record(nodeID roachpb.NodeID) {
rs.recordCount(1, nodeID)
}

func (rs *replicaStats) recordCount(count float64, nodeID roachpb.NodeID) {
var locality string
if rs.getNodeLocality != nil {
Expand Down
18 changes: 9 additions & 9 deletions pkg/kv/kvserver/replica_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func TestReplicaStats(t *testing.T) {
return tc.localities[nodeID]
})
for _, req := range tc.reqs {
rs.record(req)
rs.recordCount(1, req)
}
manual.Increment(int64(time.Second))
if actual, _ := rs.perLocalityDecayingQPS(); !floatMapsEqual(tc.expected, actual) {
Expand Down Expand Up @@ -240,7 +240,7 @@ func TestReplicaStatsDecay(t *testing.T) {

{
for _, req := range []roachpb.NodeID{1, 1, 2, 2, 3} {
rs.record(req)
rs.recordCount(1, req)
}
counts := perLocalityCounts{
awsLocalities[1]: 2,
Expand Down Expand Up @@ -285,11 +285,11 @@ func TestReplicaStatsDecay(t *testing.T) {

{
for _, req := range []roachpb.NodeID{1, 1, 2, 2, 3} {
rs.record(req)
rs.recordCount(1, req)
}
manual.Increment(int64(replStatsRotateInterval))
for _, req := range []roachpb.NodeID{2, 2, 3, 3, 3} {
rs.record(req)
rs.recordCount(1, req)
}
durationDivisor := time.Duration(float64(replStatsRotateInterval) * decayFactor).Seconds()
expected := perLocalityCounts{
Expand Down Expand Up @@ -322,11 +322,11 @@ func TestReplicaStatsDecaySmoothing(t *testing.T) {
rs := newReplicaStats(clock, func(nodeID roachpb.NodeID) string {
return awsLocalities[nodeID]
})
rs.record(1)
rs.record(1)
rs.record(2)
rs.record(2)
rs.record(3)
rs.recordCount(1, 1)
rs.recordCount(1, 1)
rs.recordCount(1, 2)
rs.recordCount(1, 2)
rs.recordCount(1, 3)
expected := perLocalityCounts{
awsLocalities[1]: 2,
awsLocalities[2]: 2,
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/store_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ func TestStorePoolUpdateLocalStore(t *testing.T) {
replica.mu.Unlock()
rs := newReplicaStats(clock, nil)
for _, store := range stores {
rs.record(store.Node.NodeID)
rs.recordCount(1, store.Node.NodeID)
}
manual.Increment(int64(MinStatsDuration + time.Second))
replica.leaseholderStats = rs
Expand Down

0 comments on commit bd576c8

Please sign in to comment.