kvserver: account for AddSSTableRequests in QPS
Previously, Queries-Per-Second (QPS) was calculated uniformly as 1 per
`BatchRequest`. This patch introduces variable QPS calculation for
`AddSSTableRequest`, which uses an order of magnitude more resources
than other request types.

This patch introduces the
`kv.replica_stats.addsst_request_size_factor` cluster setting, which
is used to attribute QPS to `AddSSTableRequest` sizes. The
calculation is QPS = 1 + size(AddSSTableRequest) / factor. When
`kv.replica_stats.addsst_request_size_factor` is less than 1, or no
`AddSSTableRequest` exists within a `BatchRequest`, then QPS = 1,
matching the previous behavior.

resolves cockroachdb#73731

Release note (performance improvement):
Introduced the `kv.replica_stats.addsst_request_size_factor` cluster
setting. This setting is used to tune Queries-Per-Second (QPS)
sensitivity to large imports: the size of any AddSSTableRequest
contributes to QPS in inverse relation to this setting's magnitude.
By default the setting is configured to a conservative 50,000, so
every 50 kilobytes of AddSSTable data is accounted for as an
additional 1 QPS; setting it to 0 disables the new accounting.
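To make the accounting concrete, the following is a minimal Go sketch of the formula described above. The standalone function, its arguments, and the main wrapper are illustrative assumptions for this example only; the actual implementation is Replica.getBatchRequestQPS in pkg/kv/kvserver/replica_send.go below.

package main

import "fmt"

// batchRequestQPS mirrors the commit's cost formula:
// QPS = 1 + size(AddSSTableRequest) / factor, falling back to 1 when the
// factor is less than 1 or the batch carries no AddSSTableRequest data.
func batchRequestQPS(addSSTBytes, factor int64) float64 {
    cost := float64(1)
    if factor < 1 || addSSTBytes <= 0 {
        return cost
    }
    return cost + float64(addSSTBytes)/float64(factor)
}

func main() {
    // With the default factor of 50,000, a ~1 MB AddSSTableRequest is
    // accounted for as roughly 1 + 1,000,000/50,000 = 21 QPS.
    fmt.Printf("%.1f\n", batchRequestQPS(1_000_000, 50_000))
}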
kvoli authored and RajivTS committed Mar 6, 2022
1 parent b510360 commit 37b2c1e
Showing 11 changed files with 215 additions and 48 deletions.
1 change: 1 addition & 0 deletions docs/generated/settings/settings-for-tenants.txt
@@ -33,6 +33,7 @@ kv.protectedts.reconciliation.interval duration 5m0s the frequency for reconcili
kv.range_split.by_load_enabled boolean true allow automatic splits of ranges based on where load is concentrated
kv.range_split.load_qps_threshold integer 2500 the QPS over which, the range becomes a candidate for load based splitting
kv.rangefeed.enabled boolean false if set, rangefeed registration is enabled
kv.replica_stats.addsst_request_size_factor integer 50000 the divisor that is applied to addsstable request sizes, then recorded in a leaseholders QPS; 0 means all requests are treated as cost 1
kv.replication_reports.interval duration 1m0s the frequency for generating the replication_constraint_stats, replication_stats_report and replication_critical_localities reports (set to 0 to disable)
kv.transaction.max_intents_bytes integer 4194304 maximum number of bytes used to track locks in transactions
kv.transaction.max_refresh_spans_bytes integer 256000 maximum number of bytes used to track refresh spans in serializable transactions
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
@@ -42,6 +42,7 @@
<tr><td><code>kv.range_split.load_qps_threshold</code></td><td>integer</td><td><code>2500</code></td><td>the QPS over which, the range becomes a candidate for load based splitting</td></tr>
<tr><td><code>kv.rangefeed.enabled</code></td><td>boolean</td><td><code>false</code></td><td>if set, rangefeed registration is enabled</td></tr>
<tr><td><code>kv.replica_circuit_breaker.slow_replication_threshold</code></td><td>duration</td><td><code>0s</code></td><td>duration after which slow proposals trip the per-Replica circuit breaker (zero duration disables breakers)</td></tr>
<tr><td><code>kv.replica_stats.addsst_request_size_factor</code></td><td>integer</td><td><code>50000</code></td><td>the divisor that is applied to addsstable request sizes, then recorded in a leaseholders QPS; 0 means all requests are treated as cost 1</td></tr>
<tr><td><code>kv.replication_reports.interval</code></td><td>duration</td><td><code>1m0s</code></td><td>the frequency for generating the replication_constraint_stats, replication_stats_report and replication_critical_localities reports (set to 0 to disable)</td></tr>
<tr><td><code>kv.snapshot_rebalance.max_rate</code></td><td>byte size</td><td><code>32 MiB</code></td><td>the rate limit (bytes/sec) to use for rebalance and upreplication snapshots</td></tr>
<tr><td><code>kv.snapshot_recovery.max_rate</code></td><td>byte size</td><td><code>32 MiB</code></td><td>the rate limit (bytes/sec) to use for recovery snapshots</td></tr>
1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
@@ -377,6 +377,7 @@ go_test(
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
"//pkg/testutils/sstutil",
"//pkg/testutils/testcluster",
"//pkg/ts",
"//pkg/ts/tspb",
14 changes: 7 additions & 7 deletions pkg/kv/kvserver/allocator_test.go
@@ -5110,17 +5110,17 @@ func TestAllocatorTransferLeaseTargetLoadBased(t *testing.T) {
// the unknown node 99 in evenlyBalanced to verify that requests from
// unknown localities don't affect the algorithm.
evenlyBalanced := newReplicaStats(clock, localityFn)
evenlyBalanced.record(1)
evenlyBalanced.record(2)
evenlyBalanced.record(3)
evenlyBalanced.recordCount(1, 1)
evenlyBalanced.recordCount(1, 2)
evenlyBalanced.recordCount(1, 3)
imbalanced1 := newReplicaStats(clock, localityFn)
imbalanced2 := newReplicaStats(clock, localityFn)
imbalanced3 := newReplicaStats(clock, localityFn)
for i := 0; i < 100*int(MinLeaseTransferStatsDuration.Seconds()); i++ {
evenlyBalanced.record(99)
imbalanced1.record(1)
imbalanced2.record(2)
imbalanced3.record(3)
evenlyBalanced.recordCount(1, 99)
imbalanced1.recordCount(1, 1)
imbalanced2.recordCount(1, 2)
imbalanced3.recordCount(1, 3)
}

manual.Increment(int64(MinLeaseTransferStatsDuration))
20 changes: 10 additions & 10 deletions pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go
@@ -623,16 +623,16 @@ func TestEvalAddSSTable(t *testing.T) {
expectStatsEst: true,
},
/* Disabled due to nondeterminism under metamorphic tests. SSTTimestamp will
* shortly be removed anyway.
"SSTTimestamp doesn't rewrite with incorrect timestamp, but errors under race": {
atReqTS: 8,
data: []sstutil.KV{{"a", 6, "a6"}},
sst: []sstutil.KV{{"a", 7, "a7"}},
sstTimestamp: 8,
expect: []sstutil.KV{{"a", 7, "a7"}, {"a", 6, "a6"}},
expectErrRace: `incorrect timestamp 0.000000007,0 for SST key "a" (expected 0.000000008,0)`,
expectStatsEst: true,
},*/
* shortly be removed anyway.
"SSTTimestamp doesn't rewrite with incorrect timestamp, but errors under race": {
atReqTS: 8,
data: []sstutil.KV{{"a", 6, "a6"}},
sst: []sstutil.KV{{"a", 7, "a7"}},
sstTimestamp: 8,
expect: []sstutil.KV{{"a", 7, "a7"}, {"a", 6, "a6"}},
expectErrRace: `incorrect timestamp 0.000000007,0 for SST key "a" (expected 0.000000008,0)`,
expectStatsEst: true,
},*/
}
testutils.RunTrueAndFalse(t, "IngestAsWrites", func(t *testing.T, ingestAsWrites bool) {
for name, tc := range testcases {
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_metrics.go
@@ -249,8 +249,8 @@ func calcBehindCount(
// A "Query" is a BatchRequest (regardless of its contents) arriving at the
// leaseholder with a gateway node set in the header (i.e. excluding requests
// that weren't sent through a DistSender, which in practice should be
// practically none). Also return the amount of time over which the stat was
// accumulated.
// practically none). See Replica.getBatchRequestQPS() for how this is
// accounted for.
func (r *Replica) QueriesPerSecond() (float64, time.Duration) {
return r.leaseholderStats.avgQPS()
}
111 changes: 111 additions & 0 deletions pkg/kv/kvserver/replica_rankings_test.go
@@ -11,13 +11,20 @@
package kvserver

import (
"context"
"fmt"
"math/rand"
"reflect"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sstutil"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/stretchr/testify/require"
)

func TestReplicaRankings(t *testing.T) {
@@ -73,3 +80,107 @@ func TestReplicaRankings(t *testing.T) {
}
}
}

// TestAddSSTQPSStat verifies that AddSSTableRequests are accounted for
// differently when present in a BatchRequest and a divisor is set.
func TestAddSSTQPSStat(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()

tc := serverutils.StartNewTestCluster(t, 1, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
})

defer tc.Stopper().Stop(ctx)
ts := tc.Server(0)
db := ts.DB()
conn := tc.ServerConn(0)
sqlDB := sqlutils.MakeSQLRunner(conn)

scratchKey := tc.ScratchRange(t)
nextKey := scratchKey.Next()

// Construct an sst with 200 keys that will be reused with different divisors.
sstKeys := make([]sstutil.KV, 200)
for i := range sstKeys {
sstKeys[i] = sstutil.KV{
KeyString: nextKey.String(),
WallTimestamp: 1,
ValueString: "value",
}
nextKey = nextKey.Next()
}
sst, start, end := sstutil.MakeSST(t, sstKeys)
requestSize := float64(len(sst))

sstReq := &roachpb.AddSSTableRequest{
RequestHeader: roachpb.RequestHeader{Key: start, EndKey: end},
Data: sst,
MVCCStats: sstutil.ComputeStats(t, sst),
}

get := &roachpb.GetRequest{
RequestHeader: roachpb.RequestHeader{Key: start},
}

addSSTBA := roachpb.BatchRequest{}
nonSSTBA := roachpb.BatchRequest{}
addSSTBA.Add(sstReq)
nonSSTBA.Add(get)

// When the factor is set to 0, it is disabled and we expect a uniform 1 QPS.
// In all other cases, we expect 1 + size(AddSSTableRequest)/factor. If no
// AddSSTableRequest exists within the request, it should cost 1, regardless
// of the factor.
testCases := []struct {
addsstRequestFactor int
expectedQPS float64
ba roachpb.BatchRequest
}{
{0, 1, addSSTBA},
{100, 1, nonSSTBA},
{10, 1 + requestSize/10, addSSTBA},
{20, 1 + requestSize/20, addSSTBA},
{40, 1 + requestSize/40, addSSTBA},
{100, 1 + requestSize/100, addSSTBA},
}

// Send an AddSSTableRequest once to create the key range.
_, pErr := db.NonTransactionalSender().Send(ctx, addSSTBA)
require.Nil(t, pErr)

store, err := ts.GetStores().(*Stores).GetStore(ts.GetFirstStoreID())
require.NoError(t, err)

repl := store.LookupReplica(roachpb.RKey(start))
require.NotNil(t, repl)

// Disable the consistency checker so that interleaved consistency-check
// requests do not artificially inflate QPS.
sqlDB.Exec(t, `SET CLUSTER SETTING server.consistency_check.interval = '0'`)

for _, testCase := range testCases {
sqlDB.Exec(t, fmt.Sprintf(`SET CLUSTER setting kv.replica_stats.addsst_request_size_factor = %d`, testCase.addsstRequestFactor))

// Reset the request counts to 0 before sending to clear previous requests.
repl.leaseholderStats.resetRequestCounts()

_, pErr = db.NonTransactionalSender().Send(ctx, testCase.ba)
require.Nil(t, pErr)

repl.leaseholderStats.mu.Lock()
queriesAfter, _ := repl.leaseholderStats.sumQueriesLocked()
repl.leaseholderStats.mu.Unlock()

// If queries are correctly recorded, we should see an increase in the
// query count by the expected QPS. However, it is possible to get a
// slightly higher number due to interleaving requests. To avoid a flaky
// test, we assert that QPS is at least as high as expected and within 4
// requests of the expected QPS. If this test is flaky, increase the
// delta to account for background activity interleaving with
// measurements.
require.GreaterOrEqual(t, queriesAfter, testCase.expectedQPS)
require.InDelta(t, queriesAfter, testCase.expectedQPS, 4)
}
}
33 changes: 32 additions & 1 deletion pkg/kv/kvserver/replica_send.go
@@ -122,8 +122,9 @@ func (r *Replica) sendWithoutRangeID(
ctx context.Context, ba *roachpb.BatchRequest,
) (_ *roachpb.BatchResponse, rErr *roachpb.Error) {
var br *roachpb.BatchResponse

if r.leaseholderStats != nil && ba.Header.GatewayNodeID != 0 {
r.leaseholderStats.record(ba.Header.GatewayNodeID)
r.leaseholderStats.recordCount(r.getBatchRequestQPS(ctx, ba), ba.Header.GatewayNodeID)
}

// Add the range log tag.
@@ -966,6 +967,36 @@ func (r *Replica) executeAdminBatch(
return br, nil
}

// getBatchRequestQPS calculates the cost estimate of a BatchRequest. The
// estimate returns Queries Per Second (QPS), representing the abstract
// resource cost associated with this request. A BatchRequest is counted as
// 1 QPS, unless it contains an AddSSTableRequest, in which case the sum of
// all AddSSTableRequests' data sizes is divided by a factor and added to the
// QPS. This special-case treatment accounts for the mismatch in resource use
// between AddSSTableRequests and other request types.
func (r *Replica) getBatchRequestQPS(ctx context.Context, ba *roachpb.BatchRequest) float64 {
var count float64 = 1

// For divisors less than 1, use the default treatment of QPS.
requestFact := AddSSTableRequestSizeFactor.Get(&r.store.cfg.Settings.SV)
if requestFact < 1 {
return count
}

var addSSTSize float64 = 0
for _, req := range ba.Requests {
switch t := req.GetInner().(type) {
case *roachpb.AddSSTableRequest:
addSSTSize += float64(len(t.Data))
default:
continue
}
}

count += addSSTSize / float64(requestFact)
return count
}

// checkBatchRequest verifies BatchRequest validity requirements. In particular,
// the batch must have an assigned timestamp, and either all requests must be
// read-only, or none.
56 changes: 39 additions & 17 deletions pkg/kv/kvserver/replica_stats.go
@@ -15,6 +15,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
@@ -31,6 +32,24 @@ const (
MinStatsDuration = 5 * time.Second
)

// AddSSTableRequestSizeFactor wraps
// "kv.replica_stats.addsst_request_size_factor". When this setting is set to
// 0, all batch requests are treated uniformly as 1 QPS. When this setting is
// greater than or equal to 1, AddSSTable requests present within a batch
// request add additional QPS. The additional QPS is the size of the SSTable
// data divided by this factor. The magnitude of this factor is therefore
// inversely related to QPS sensitivity to AddSSTableRequests.
var AddSSTableRequestSizeFactor = settings.RegisterIntSetting(
settings.TenantWritable,
"kv.replica_stats.addsst_request_size_factor",
"the divisor that is applied to addsstable request sizes, then recorded in a leaseholders QPS; 0 means all requests are treated as cost 1",
// The default value of 50,000 was chosen as the divisor following manual
// testing discussed in pull request #76252. Every additional 50,000 AddSSTable
// bytes increases accounted QPS by 1. AddSSTableRequests are typically ~1 MB
// in size, which is accounted for as ~20 QPS.
50000,
).WithPublic()

type localityOracle func(roachpb.NodeID) string

// perLocalityCounts maps from the string representation of a locality to count.
@@ -105,10 +124,6 @@ func (rs *replicaStats) splitRequestCounts(other *replicaStats) {
}
}

func (rs *replicaStats) record(nodeID roachpb.NodeID) {
rs.recordCount(1, nodeID)
}

func (rs *replicaStats) recordCount(count float64, nodeID roachpb.NodeID) {
var locality string
if rs.getNodeLocality != nil {
@@ -179,6 +194,25 @@ func (rs *replicaStats) perLocalityDecayingQPS() (perLocalityCounts, time.Duration
return counts, now.Sub(rs.mu.lastReset)
}

// sumQueriesLocked returns the sum of all queries currently recorded.
// Calling this method requires holding a lock on mu.
func (rs *replicaStats) sumQueriesLocked() (float64, int) {
var sum float64
var windowsUsed int
for i := range rs.mu.requests {
// We have to add len(rs.mu.requests) to the numerator to avoid getting a
// negative result from the modulus operation when rs.mu.idx is small.
requestsIdx := (rs.mu.idx + len(rs.mu.requests) - i) % len(rs.mu.requests)
if cur := rs.mu.requests[requestsIdx]; cur != nil {
windowsUsed++
for _, v := range cur {
sum += v
}
}
}
return sum, windowsUsed
}

// avgQPS returns the average requests-per-second and the amount of time
// over which the stat was accumulated. Note that these averages are exact,
// not exponentially decayed (there isn't a ton of justification for going
Expand All @@ -196,19 +230,7 @@ func (rs *replicaStats) avgQPS() (float64, time.Duration) {
rs.maybeRotateLocked(now)

// First accumulate the counts, then divide by the total number of seconds.
var sum float64
var windowsUsed int
for i := range rs.mu.requests {
// We have to add len(rs.mu.requests) to the numerator to avoid getting a
// negative result from the modulus operation when rs.mu.idx is small.
requestsIdx := (rs.mu.idx + len(rs.mu.requests) - i) % len(rs.mu.requests)
if cur := rs.mu.requests[requestsIdx]; cur != nil {
windowsUsed++
for _, v := range cur {
sum += v
}
}
}
sum, windowsUsed := rs.sumQueriesLocked()
if windowsUsed <= 0 {
return 0, 0
}