Skip to content

Commit

Permalink
Merge #71911
Browse files Browse the repository at this point in the history
71911: server: fix missing gateway node for tenant batches r=darinpp a=darinpp

Currently, when Node.Batch is called for a tenant, the BatchRequest's
GatewayNodeID is not set (and defaults to zero). This is to be expected
as the tenant processes don't have node ids assigned. The GatewayNodeID
however is used for one single purpose - as a proxy for the locality of
the originator of the request and as a basis for computing QPS per
locality for a range. When the gateway node id is not set - the range's
QPS doesn't get updated and as a result the QPS based rebalancing
doesn't work. This is bad as it prevents the distribution of the load
in the host cluster and severelly decreases the performance under load.

This PR sets the GatewayNodeID to be the node id of the host node that
gets the request from the tenant. This isn't ideal but works for the
current serverless setup. Ideally - the tenant processes need to
support and provide locality information that isn't tied to node ids.

Release note: None

Co-authored-by: Darin Peshev <[email protected]>
  • Loading branch information
craig[bot] and darinpp committed Oct 26, 2021
2 parents 7de965a + 9b223d4 commit 5f43585
Show file tree
Hide file tree
Showing 9 changed files with 106 additions and 7 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/kvccl/kvtenantccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ go_test(
srcs = [
"connector_test.go",
"main_test.go",
"tenant_kv_test.go",
"tenant_trace_test.go",
"tenant_upgrade_test.go",
],
Expand Down
82 changes: 82 additions & 0 deletions pkg/ccl/kvccl/kvtenantccl/tenant_kv_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright 2021 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package kvtenantccl

import (
"context"
"fmt"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/stretchr/testify/require"
)

// TestTenantRangeQPSStat verifies that queries on a tenant range are
// reflected in the range's QPS.
func TestTenantRangeQPSStat(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()

tc := serverutils.StartNewTestCluster(t, 1, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
DisableWebSessionAuthentication: true,
},
})
defer tc.Stopper().Stop(ctx)
ts := tc.Server(0)

_, db := serverutils.StartTenant(
t, ts, base.TestTenantArgs{TenantID: serverutils.TestTenantID()},
)
defer db.Close()

// Tenant connection.
r := sqlutils.MakeSQLRunner(db)
r.Exec(t, `CREATE DATABASE foo`)
r.Exec(t, `CREATE TABLE foo.qps_test (k STRING PRIMARY KEY)`)
r.Exec(t, `INSERT INTO foo.qps_test VALUES('abc')`)

// Host connection.
conn := tc.ServerConn(0)
sqlDB := sqlutils.MakeSQLRunner(conn)

var rangeID int
stmt := fmt.Sprintf(
"SELECT range_id FROM crdb_internal.ranges WHERE start_pretty='/Tenant/%s'",
serverutils.TestTenantID(),
)
sqlDB.QueryRow(t, stmt).Scan(&rangeID)
require.NotEqualf(t, 0, rangeID, "Unable to determine test table range id")

store, err := ts.GetStores().(*kvserver.Stores).GetStore(ts.GetFirstStoreID())
require.NoError(t, err)
repl, err := store.GetReplica(roachpb.RangeID(rangeID))
require.NoError(t, err)

qpsBefore, durationBefore := repl.QueriesPerSecond()
queriesBefore := qpsBefore * durationBefore.Seconds()
for i := 0; i < 110; i++ {
r.Exec(t, `SELECT k FROM foo.qps_test`)
}
qpsAfter, durationAfter := repl.QueriesPerSecond()
queriesAfter := qpsAfter * durationAfter.Seconds()
queriesIncrease := int(queriesAfter - queriesBefore)
// If queries are correctly recorded, we should see increase in query count by
// 110. As it is possible due to rounding and conversion from QPS to query count
// to get a slightly higher or lower number - we expect the increase to be at
// least 100.
require.Greater(t, queriesIncrease, 100)
}
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_export.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func evalExport(

var evalExportTrace types.StringValue
if cArgs.EvalCtx.NodeID() == h.GatewayNodeID {
evalExportTrace.Value = fmt.Sprintf("evaluating Export on local node %d", cArgs.EvalCtx.NodeID())
evalExportTrace.Value = fmt.Sprintf("evaluating Export on gateway node %d", cArgs.EvalCtx.NodeID())
} else {
evalExportTrace.Value = fmt.Sprintf("evaluating Export on remote node %d", cArgs.EvalCtx.NodeID())
}
Expand Down
9 changes: 5 additions & 4 deletions pkg/kv/kvserver/replica_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package kvserver

import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency"
Expand Down Expand Up @@ -248,10 +249,10 @@ func calcBehindCount(
// A "Query" is a BatchRequest (regardless of its contents) arriving at the
// leaseholder with a gateway node set in the header (i.e. excluding requests
// that weren't sent through a DistSender, which in practice should be
// practically none).
func (r *Replica) QueriesPerSecond() float64 {
qps, _ := r.leaseholderStats.avgQPS()
return qps
// practically none). Also return the amount of time over which the stat was
// accumulated.
func (r *Replica) QueriesPerSecond() (float64, time.Duration) {
return r.leaseholderStats.avgQPS()
}

// WritesPerSecond returns the range's average keys written per second. A
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/split_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func (sq *splitQueue) processAttempt(

now := timeutil.Now()
if splitByLoadKey := r.loadBasedSplitter.MaybeSplitKey(now); splitByLoadKey != nil {
batchHandledQPS := r.QueriesPerSecond()
batchHandledQPS, _ := r.QueriesPerSecond()
raftAppliedQPS := r.WritesPerSecond()
splitQPS := r.loadBasedSplitter.LastQPS(now)
reason := fmt.Sprintf(
Expand Down
2 changes: 2 additions & 0 deletions pkg/roachpb/api.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pkg/roachpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2162,6 +2162,8 @@ message Header {
// RangeInfo with up-to-date information.
ClientRangeInfo client_range_info = 17 [(gogoproto.nullable) = false];
// gateway_node_id is the ID of the gateway node where the request originated.
// For requests from tenants, this is set to the NodeID of the KV node handling
// the BatchRequest.
int32 gateway_node_id = 11 [(gogoproto.customname) = "GatewayNodeID", (gogoproto.casttype) = "NodeID"];
// If set, the request will return to the client before proposing the
// request into Raft. All consensus processing will be performed
Expand Down
10 changes: 10 additions & 0 deletions pkg/server/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -955,6 +955,16 @@ func (n *Node) Batch(
if !ok {
tenantID = roachpb.SystemTenantID
}

// Requests from tenants don't have gateway node id set but are required for
// the QPS based rebalancing to work. The GatewayNodeID is used as a proxy
// for the locality of the origin of the request. The replica stats aggregate
// all incoming BatchRequests and which localities they come from in order to
// compute per second stats used for the rebalancing decisions.
if args.GatewayNodeID == 0 && tenantID != roachpb.SystemTenantID {
args.GatewayNodeID = n.Descriptor.NodeID
}

handle, err := n.admissionController.AdmitKVWork(ctx, tenantID, args)
if err != nil {
return nil, err
Expand Down
3 changes: 2 additions & 1 deletion pkg/server/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -1905,6 +1905,7 @@ func (s *statusServer) rangesHelper(
WaitingWriters: lm.WaitingWriters,
})
}
qps, _ := rep.QueriesPerSecond()
return serverpb.RangeInfo{
Span: span,
RaftState: raftState,
Expand All @@ -1913,7 +1914,7 @@ func (s *statusServer) rangesHelper(
SourceStoreID: storeID,
LeaseHistory: leaseHistory,
Stats: serverpb.RangeStatistics{
QueriesPerSecond: rep.QueriesPerSecond(),
QueriesPerSecond: qps,
WritesPerSecond: rep.WritesPerSecond(),
},
Problems: serverpb.RangeProblems{
Expand Down

0 comments on commit 5f43585

Please sign in to comment.