Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver: account for AddSSTableRequests in QPS #76252

Merged
merged 2 commits into from
Feb 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ kv.protectedts.reconciliation.interval duration 5m0s the frequency for reconcili
kv.range_split.by_load_enabled boolean true allow automatic splits of ranges based on where load is concentrated
kv.range_split.load_qps_threshold integer 2500 the QPS over which, the range becomes a candidate for load based splitting
kv.rangefeed.enabled boolean false if set, rangefeed registration is enabled
kv.replica_stats.addsst_request_size_factor integer 50000 the divisor that is applied to addsstable request sizes, then recorded in a leaseholders QPS; 0 means all requests are treated as cost 1
kv.replication_reports.interval duration 1m0s the frequency for generating the replication_constraint_stats, replication_stats_report and replication_critical_localities reports (set to 0 to disable)
kv.transaction.max_intents_bytes integer 4194304 maximum number of bytes used to track locks in transactions
kv.transaction.max_refresh_spans_bytes integer 256000 maximum number of bytes used to track refresh spans in serializable transactions
Expand Down
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
<tr><td><code>kv.range_split.load_qps_threshold</code></td><td>integer</td><td><code>2500</code></td><td>the QPS over which, the range becomes a candidate for load based splitting</td></tr>
<tr><td><code>kv.rangefeed.enabled</code></td><td>boolean</td><td><code>false</code></td><td>if set, rangefeed registration is enabled</td></tr>
<tr><td><code>kv.replica_circuit_breaker.slow_replication_threshold</code></td><td>duration</td><td><code>0s</code></td><td>duration after which slow proposals trip the per-Replica circuit breaker (zero duration disables breakers)</td></tr>
<tr><td><code>kv.replica_stats.addsst_request_size_factor</code></td><td>integer</td><td><code>50000</code></td><td>the divisor that is applied to addsstable request sizes, then recorded in a leaseholders QPS; 0 means all requests are treated as cost 1</td></tr>
<tr><td><code>kv.replication_reports.interval</code></td><td>duration</td><td><code>1m0s</code></td><td>the frequency for generating the replication_constraint_stats, replication_stats_report and replication_critical_localities reports (set to 0 to disable)</td></tr>
<tr><td><code>kv.snapshot_rebalance.max_rate</code></td><td>byte size</td><td><code>32 MiB</code></td><td>the rate limit (bytes/sec) to use for rebalance and upreplication snapshots</td></tr>
<tr><td><code>kv.snapshot_recovery.max_rate</code></td><td>byte size</td><td><code>32 MiB</code></td><td>the rate limit (bytes/sec) to use for recovery snapshots</td></tr>
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,7 @@ go_test(
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
"//pkg/testutils/sstutil",
"//pkg/testutils/testcluster",
"//pkg/ts",
"//pkg/ts/tspb",
Expand Down
14 changes: 7 additions & 7 deletions pkg/kv/kvserver/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5110,17 +5110,17 @@ func TestAllocatorTransferLeaseTargetLoadBased(t *testing.T) {
// the unknown node 99 in evenlyBalanced to verify that requests from
// unknown localities don't affect the algorithm.
evenlyBalanced := newReplicaStats(clock, localityFn)
evenlyBalanced.record(1)
evenlyBalanced.record(2)
evenlyBalanced.record(3)
evenlyBalanced.recordCount(1, 1)
evenlyBalanced.recordCount(1, 2)
evenlyBalanced.recordCount(1, 3)
imbalanced1 := newReplicaStats(clock, localityFn)
imbalanced2 := newReplicaStats(clock, localityFn)
imbalanced3 := newReplicaStats(clock, localityFn)
for i := 0; i < 100*int(MinLeaseTransferStatsDuration.Seconds()); i++ {
evenlyBalanced.record(99)
imbalanced1.record(1)
imbalanced2.record(2)
imbalanced3.record(3)
evenlyBalanced.recordCount(1, 99)
imbalanced1.recordCount(1, 1)
imbalanced2.recordCount(1, 2)
imbalanced3.recordCount(1, 3)
}

manual.Increment(int64(MinLeaseTransferStatsDuration))
Expand Down
20 changes: 10 additions & 10 deletions pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,16 +623,16 @@ func TestEvalAddSSTable(t *testing.T) {
expectStatsEst: true,
},
/* Disabled due to nondeterminism under metamorphic tests. SSTTimestamp will
* shortly be removed anyway.
"SSTTimestamp doesn't rewrite with incorrect timestamp, but errors under race": {
atReqTS: 8,
data: []sstutil.KV{{"a", 6, "a6"}},
sst: []sstutil.KV{{"a", 7, "a7"}},
sstTimestamp: 8,
expect: []sstutil.KV{{"a", 7, "a7"}, {"a", 6, "a6"}},
expectErrRace: `incorrect timestamp 0.000000007,0 for SST key "a" (expected 0.000000008,0)`,
expectStatsEst: true,
},*/
* shortly be removed anyway.
"SSTTimestamp doesn't rewrite with incorrect timestamp, but errors under race": {
atReqTS: 8,
data: []sstutil.KV{{"a", 6, "a6"}},
sst: []sstutil.KV{{"a", 7, "a7"}},
sstTimestamp: 8,
expect: []sstutil.KV{{"a", 7, "a7"}, {"a", 6, "a6"}},
expectErrRace: `incorrect timestamp 0.000000007,0 for SST key "a" (expected 0.000000008,0)`,
expectStatsEst: true,
},*/
}
testutils.RunTrueAndFalse(t, "IngestAsWrites", func(t *testing.T, ingestAsWrites bool) {
for name, tc := range testcases {
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,8 @@ func calcBehindCount(
// A "Query" is a BatchRequest (regardless of its contents) arriving at the
// leaseholder with a gateway node set in the header (i.e. excluding requests
// that weren't sent through a DistSender, which in practice should be
// practically none). Also return the amount of time over which the stat was
// accumulated.
// practically none). See Replica.getBatchRequestQPS() for how this is
// accounted for.
func (r *Replica) QueriesPerSecond() (float64, time.Duration) {
return r.leaseholderStats.avgQPS()
}
Expand Down
111 changes: 111 additions & 0 deletions pkg/kv/kvserver/replica_rankings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,20 @@
package kvserver

import (
"context"
"fmt"
"math/rand"
"reflect"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sstutil"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/stretchr/testify/require"
)

func TestReplicaRankings(t *testing.T) {
Expand Down Expand Up @@ -73,3 +80,107 @@ func TestReplicaRankings(t *testing.T) {
}
}
}

// TestAddSSTQPSStat verifies that AddSSTableRequests are accounted for
// differently, when present in a BatchRequest, with a divisor set.
func TestAddSSTQPSStat(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()

tc := serverutils.StartNewTestCluster(t, 1, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
})

defer tc.Stopper().Stop(ctx)
ts := tc.Server(0)
db := ts.DB()
conn := tc.ServerConn(0)
sqlDB := sqlutils.MakeSQLRunner(conn)

scratchKey := tc.ScratchRange(t)
nextKey := scratchKey.Next()

// Construct an sst with 200 keys that will be reused with different divisors.
sstKeys := make([]sstutil.KV, 200)
for i := range sstKeys {
sstKeys[i] = sstutil.KV{
KeyString: nextKey.String(),
WallTimestamp: 1,
ValueString: "value",
}
nextKey = nextKey.Next()
}
sst, start, end := sstutil.MakeSST(t, sstKeys)
requestSize := float64(len(sst))

sstReq := &roachpb.AddSSTableRequest{
RequestHeader: roachpb.RequestHeader{Key: start, EndKey: end},
Data: sst,
MVCCStats: sstutil.ComputeStats(t, sst),
}

get := &roachpb.GetRequest{
RequestHeader: roachpb.RequestHeader{Key: start},
}

addSSTBA := roachpb.BatchRequest{}
nonSSTBA := roachpb.BatchRequest{}
addSSTBA.Add(sstReq)
nonSSTBA.Add(get)

// When the factor is set to 0, it is disabled and we expect uniform 1 QPS.
// In all other cases, we expect 1 + the size of a
// AddSSTableRequest/factor. If no AddSStableRequest exists within the
// request, it should be cost 1, regardless of factor.
testCases := []struct {
addsstRequestFactor int
expectedQPS float64
ba roachpb.BatchRequest
}{
{0, 1, addSSTBA},
{100, 1, nonSSTBA},
{10, 1 + requestSize/10, addSSTBA},
{20, 1 + requestSize/20, addSSTBA},
{40, 1 + requestSize/40, addSSTBA},
{100, 1 + requestSize/100, addSSTBA},
}

// Send an AddSSTRequest once to create the key range.
_, pErr := db.NonTransactionalSender().Send(ctx, addSSTBA)
kvoli marked this conversation as resolved.
Show resolved Hide resolved
require.Nil(t, pErr)

store, err := ts.GetStores().(*Stores).GetStore(ts.GetFirstStoreID())
require.NoError(t, err)

repl := store.LookupReplica(roachpb.RKey(start))
require.NotNil(t, repl)

// Disable the consistency checker, to avoid interleaving requests
// artificially inflating QPS due to consistency checking.
sqlDB.Exec(t, `SET CLUSTER SETTING server.consistency_check.interval = '0'`)

for _, testCase := range testCases {
sqlDB.Exec(t, fmt.Sprintf(`SET CLUSTER setting kv.replica_stats.addsst_request_size_factor = %d`, testCase.addsstRequestFactor))

// Reset the request counts to 0 before sending to clear previous requests.
repl.leaseholderStats.resetRequestCounts()

_, pErr = db.NonTransactionalSender().Send(ctx, testCase.ba)
require.Nil(t, pErr)

repl.leaseholderStats.mu.Lock()
queriesAfter, _ := repl.leaseholderStats.sumQueriesLocked()
repl.leaseholderStats.mu.Unlock()

// If queries are correctly recorded, we should see increase in query
// count by the expected QPS. However, it is possible to to get a
// slightly higher number due to interleaving requests. To avoid a
// flakey test, we assert that QPS is at least as high as expected,
// then no greater than 4 requests of expected QPS. If this test is
// flaky, increase the delta to account for background activity
// interleaving with measurements.
require.GreaterOrEqual(t, queriesAfter, testCase.expectedQPS)
require.InDelta(t, queriesAfter, testCase.expectedQPS, 4)
}
}
35 changes: 33 additions & 2 deletions pkg/kv/kvserver/replica_send.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ var optimisticEvalLimitedScans = settings.RegisterBoolSetting(
// │ (handles leases and txn conflicts) │
// │ │
// ▼ │
// executeReadWriteBatch
// executeWriteBatch
// │ │
// ▼ ▼
// evalAndPropose (turns the BatchRequest executeReadOnlyBatch
Expand Down Expand Up @@ -122,8 +122,9 @@ func (r *Replica) sendWithoutRangeID(
ctx context.Context, ba *roachpb.BatchRequest,
) (_ *roachpb.BatchResponse, rErr *roachpb.Error) {
var br *roachpb.BatchResponse

if r.leaseholderStats != nil && ba.Header.GatewayNodeID != 0 {
r.leaseholderStats.record(ba.Header.GatewayNodeID)
r.leaseholderStats.recordCount(r.getBatchRequestQPS(ctx, ba), ba.Header.GatewayNodeID)
}

// Add the range log tag.
Expand Down Expand Up @@ -966,6 +967,36 @@ func (r *Replica) executeAdminBatch(
return br, nil
}

// getBatchRequestQPS calculates the cost estimation of a BatchRequest. The
// estimate returns Queries Per Second (QPS), representing the abstract
// resource cost associated with this request. BatchRequests are calculated as
// 1 QPS, unless an AddSSTableRequest exists, in which case the sum of all
// AddSSTableRequest's data size is divided by a factor and added to QPS. This
// specific treatment of QPS is a special case to account for the mismatch
// between AddSSTableRequest and other requests in terms of resource use.
func (r *Replica) getBatchRequestQPS(ctx context.Context, ba *roachpb.BatchRequest) float64 {
var count float64 = 1

// For divisors less than 1, use the default treatment of QPS.
requestFact := AddSSTableRequestSizeFactor.Get(&r.store.cfg.Settings.SV)
if requestFact < 1 {
return count
}

var addSSTSize float64 = 0
for _, req := range ba.Requests {
switch t := req.GetInner().(type) {
case *roachpb.AddSSTableRequest:
addSSTSize += float64(len(t.Data))
default:
continue
}
}

count += addSSTSize / float64(requestFact)
return count
}

// checkBatchRequest verifies BatchRequest validity requirements. In particular,
// the batch must have an assigned timestamp, and either all requests must be
// read-only, or none.
Expand Down
56 changes: 39 additions & 17 deletions pkg/kv/kvserver/replica_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
Expand All @@ -31,6 +32,24 @@ const (
MinStatsDuration = 5 * time.Second
)

// AddSSTableRequestSizeFactor wraps
// "kv.replica_stats.addsst_request_size_factor". When this setting is set to
// 0, all batch requests are treated uniformly as 1 QPS. When this setting is
// greater than or equal to 1, AddSSTable requests will add additional QPS,
// when present within a batch request. The additional QPS is size of the
// SSTable data, divided by this factor. Thereby, the magnitude of this factor
// is inversely related to QPS sensitivity to AddSSTableRequests.
var AddSSTableRequestSizeFactor = settings.RegisterIntSetting(
settings.TenantWritable,
"kv.replica_stats.addsst_request_size_factor",
"the divisor that is applied to addsstable request sizes, then recorded in a leaseholders QPS; 0 means all requests are treated as cost 1",
// The default value of 50,000 was chosen as the default divisor, following manual testing that
// is discussed in this pull request: #76252. Every additional 50,000 AddSSTable bytes will
// increase accounted QPS by 1. Typically AddSSTableRequests are ~1mb in size, accounted as 20
// QPS.
50000,
).WithPublic()

type localityOracle func(roachpb.NodeID) string

// perLocalityCounts maps from the string representation of a locality to count.
Expand Down Expand Up @@ -105,10 +124,6 @@ func (rs *replicaStats) splitRequestCounts(other *replicaStats) {
}
}

func (rs *replicaStats) record(nodeID roachpb.NodeID) {
rs.recordCount(1, nodeID)
}

func (rs *replicaStats) recordCount(count float64, nodeID roachpb.NodeID) {
var locality string
if rs.getNodeLocality != nil {
Expand Down Expand Up @@ -179,6 +194,25 @@ func (rs *replicaStats) perLocalityDecayingQPS() (perLocalityCounts, time.Durati
return counts, now.Sub(rs.mu.lastReset)
}

// sumQueriesLocked returns the sum of all queries currently recorded.
// Calling this method requires holding a lock on mu.
func (rs *replicaStats) sumQueriesLocked() (float64, int) {
var sum float64
var windowsUsed int
for i := range rs.mu.requests {
// We have to add len(rs.mu.requests) to the numerator to avoid getting a
// negative result from the modulus operation when rs.mu.idx is small.
requestsIdx := (rs.mu.idx + len(rs.mu.requests) - i) % len(rs.mu.requests)
if cur := rs.mu.requests[requestsIdx]; cur != nil {
windowsUsed++
for _, v := range cur {
sum += v
}
}
}
return sum, windowsUsed
}

// avgQPS returns the average requests-per-second and the amount of time
// over which the stat was accumulated. Note that these averages are exact,
// not exponentially decayed (there isn't a ton of justification for going
Expand All @@ -196,19 +230,7 @@ func (rs *replicaStats) avgQPS() (float64, time.Duration) {
rs.maybeRotateLocked(now)

// First accumulate the counts, then divide by the total number of seconds.
var sum float64
var windowsUsed int
for i := range rs.mu.requests {
// We have to add len(rs.mu.requests) to the numerator to avoid getting a
// negative result from the modulus operation when rs.mu.idx is small.
requestsIdx := (rs.mu.idx + len(rs.mu.requests) - i) % len(rs.mu.requests)
if cur := rs.mu.requests[requestsIdx]; cur != nil {
windowsUsed++
for _, v := range cur {
sum += v
}
}
}
sum, windowsUsed := rs.sumQueriesLocked()
if windowsUsed <= 0 {
return 0, 0
}
Expand Down
Loading