From 9b223d4d0a668b5b80c5ca92fdb1718e10533e2a Mon Sep 17 00:00:00 2001 From: Darin Peshev Date: Sun, 24 Oct 2021 19:54:39 -0700 Subject: [PATCH] server: fix missing gateway node for tenant batches Currently, when Node.Batch is called for a tenant, the BatchRequest's GatewayNodeID is not set (and defaults to zero). This is to be expected as the tenant processes don't have node ids assigned. The GatewayNodeID however is used for one single purpose - as a proxy for the locality of the originator of the request and as a basis for computing QPS per locality for a range. When the gateway node id is not set - the range's QPS doesn't get updated and as a result the QPS based rebalancing doesn't work. This is bad as it prevents the distribution of the load in the host cluster and severelly decreases the performance under load. This PR sets the GatewayNodeID to be the node id of the host node that gets the request from the tenant. This isn't ideal but works for the current serverless setup. Ideally - the tenant processes need to support and provide locality information that isn't tied to node ids. Release note: None --- pkg/ccl/kvccl/kvtenantccl/BUILD.bazel | 1 + pkg/ccl/kvccl/kvtenantccl/tenant_kv_test.go | 82 +++++++++++++++++++++ pkg/kv/kvserver/batcheval/cmd_export.go | 2 +- pkg/kv/kvserver/replica_metrics.go | 9 ++- pkg/kv/kvserver/split_queue.go | 2 +- pkg/roachpb/api.pb.go | 2 + pkg/roachpb/api.proto | 2 + pkg/server/node.go | 10 +++ pkg/server/status.go | 3 +- 9 files changed, 106 insertions(+), 7 deletions(-) create mode 100644 pkg/ccl/kvccl/kvtenantccl/tenant_kv_test.go diff --git a/pkg/ccl/kvccl/kvtenantccl/BUILD.bazel b/pkg/ccl/kvccl/kvtenantccl/BUILD.bazel index d8e777a1046f..038c4de90a44 100644 --- a/pkg/ccl/kvccl/kvtenantccl/BUILD.bazel +++ b/pkg/ccl/kvccl/kvtenantccl/BUILD.bazel @@ -37,6 +37,7 @@ go_test( srcs = [ "connector_test.go", "main_test.go", + "tenant_kv_test.go", "tenant_trace_test.go", "tenant_upgrade_test.go", ], diff --git a/pkg/ccl/kvccl/kvtenantccl/tenant_kv_test.go b/pkg/ccl/kvccl/kvtenantccl/tenant_kv_test.go new file mode 100644 index 000000000000..ced390c5c1e8 --- /dev/null +++ b/pkg/ccl/kvccl/kvtenantccl/tenant_kv_test.go @@ -0,0 +1,82 @@ +// Copyright 2021 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package kvtenantccl + +import ( + "context" + "fmt" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/stretchr/testify/require" +) + +// TestTenantRangeQPSStat verifies that queries on a tenant range are +// reflected in the range's QPS. +func TestTenantRangeQPSStat(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + + tc := serverutils.StartNewTestCluster(t, 1, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + DisableWebSessionAuthentication: true, + }, + }) + defer tc.Stopper().Stop(ctx) + ts := tc.Server(0) + + _, db := serverutils.StartTenant( + t, ts, base.TestTenantArgs{TenantID: serverutils.TestTenantID()}, + ) + defer db.Close() + + // Tenant connection. + r := sqlutils.MakeSQLRunner(db) + r.Exec(t, `CREATE DATABASE foo`) + r.Exec(t, `CREATE TABLE foo.qps_test (k STRING PRIMARY KEY)`) + r.Exec(t, `INSERT INTO foo.qps_test VALUES('abc')`) + + // Host connection. + conn := tc.ServerConn(0) + sqlDB := sqlutils.MakeSQLRunner(conn) + + var rangeID int + stmt := fmt.Sprintf( + "SELECT range_id FROM crdb_internal.ranges WHERE start_pretty='/Tenant/%s'", + serverutils.TestTenantID(), + ) + sqlDB.QueryRow(t, stmt).Scan(&rangeID) + require.NotEqualf(t, 0, rangeID, "Unable to determine test table range id") + + store, err := ts.GetStores().(*kvserver.Stores).GetStore(ts.GetFirstStoreID()) + require.NoError(t, err) + repl, err := store.GetReplica(roachpb.RangeID(rangeID)) + require.NoError(t, err) + + qpsBefore, durationBefore := repl.QueriesPerSecond() + queriesBefore := qpsBefore * durationBefore.Seconds() + for i := 0; i < 110; i++ { + r.Exec(t, `SELECT k FROM foo.qps_test`) + } + qpsAfter, durationAfter := repl.QueriesPerSecond() + queriesAfter := qpsAfter * durationAfter.Seconds() + queriesIncrease := int(queriesAfter - queriesBefore) + // If queries are correctly recorded, we should see increase in query count by + // 110. As it is possible due to rounding and conversion from QPS to query count + // to get a slightly higher or lower number - we expect the increase to be at + // least 100. + require.Greater(t, queriesIncrease, 100) +} diff --git a/pkg/kv/kvserver/batcheval/cmd_export.go b/pkg/kv/kvserver/batcheval/cmd_export.go index b2a2160cd22b..17d838fce449 100644 --- a/pkg/kv/kvserver/batcheval/cmd_export.go +++ b/pkg/kv/kvserver/batcheval/cmd_export.go @@ -101,7 +101,7 @@ func evalExport( var evalExportTrace types.StringValue if cArgs.EvalCtx.NodeID() == h.GatewayNodeID { - evalExportTrace.Value = fmt.Sprintf("evaluating Export on local node %d", cArgs.EvalCtx.NodeID()) + evalExportTrace.Value = fmt.Sprintf("evaluating Export on gateway node %d", cArgs.EvalCtx.NodeID()) } else { evalExportTrace.Value = fmt.Sprintf("evaluating Export on remote node %d", cArgs.EvalCtx.NodeID()) } diff --git a/pkg/kv/kvserver/replica_metrics.go b/pkg/kv/kvserver/replica_metrics.go index 70ee3143dbde..efbb986a7b18 100644 --- a/pkg/kv/kvserver/replica_metrics.go +++ b/pkg/kv/kvserver/replica_metrics.go @@ -12,6 +12,7 @@ package kvserver import ( "context" + "time" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency" @@ -248,10 +249,10 @@ func calcBehindCount( // A "Query" is a BatchRequest (regardless of its contents) arriving at the // leaseholder with a gateway node set in the header (i.e. excluding requests // that weren't sent through a DistSender, which in practice should be -// practically none). -func (r *Replica) QueriesPerSecond() float64 { - qps, _ := r.leaseholderStats.avgQPS() - return qps +// practically none). Also return the amount of time over which the stat was +// accumulated. +func (r *Replica) QueriesPerSecond() (float64, time.Duration) { + return r.leaseholderStats.avgQPS() } // WritesPerSecond returns the range's average keys written per second. A diff --git a/pkg/kv/kvserver/split_queue.go b/pkg/kv/kvserver/split_queue.go index 63af8a63bef3..a611a335ac73 100644 --- a/pkg/kv/kvserver/split_queue.go +++ b/pkg/kv/kvserver/split_queue.go @@ -218,7 +218,7 @@ func (sq *splitQueue) processAttempt( now := timeutil.Now() if splitByLoadKey := r.loadBasedSplitter.MaybeSplitKey(now); splitByLoadKey != nil { - batchHandledQPS := r.QueriesPerSecond() + batchHandledQPS, _ := r.QueriesPerSecond() raftAppliedQPS := r.WritesPerSecond() splitQPS := r.loadBasedSplitter.LastQPS(now) reason := fmt.Sprintf( diff --git a/pkg/roachpb/api.pb.go b/pkg/roachpb/api.pb.go index 2b66f560fb66..82fb1c48886c 100644 --- a/pkg/roachpb/api.pb.go +++ b/pkg/roachpb/api.pb.go @@ -6681,6 +6681,8 @@ type Header struct { // RangeInfo with up-to-date information. ClientRangeInfo ClientRangeInfo `protobuf:"bytes,17,opt,name=client_range_info,json=clientRangeInfo,proto3" json:"client_range_info"` // gateway_node_id is the ID of the gateway node where the request originated. + // For requests from tenants, this is set to the NodeID of the KV node handling + // the BatchRequest. GatewayNodeID NodeID `protobuf:"varint,11,opt,name=gateway_node_id,json=gatewayNodeId,proto3,casttype=NodeID" json:"gateway_node_id,omitempty"` // If set, the request will return to the client before proposing the // request into Raft. All consensus processing will be performed diff --git a/pkg/roachpb/api.proto b/pkg/roachpb/api.proto index 2219dfafc7a9..dc65313ce03c 100644 --- a/pkg/roachpb/api.proto +++ b/pkg/roachpb/api.proto @@ -2162,6 +2162,8 @@ message Header { // RangeInfo with up-to-date information. ClientRangeInfo client_range_info = 17 [(gogoproto.nullable) = false]; // gateway_node_id is the ID of the gateway node where the request originated. + // For requests from tenants, this is set to the NodeID of the KV node handling + // the BatchRequest. int32 gateway_node_id = 11 [(gogoproto.customname) = "GatewayNodeID", (gogoproto.casttype) = "NodeID"]; // If set, the request will return to the client before proposing the // request into Raft. All consensus processing will be performed diff --git a/pkg/server/node.go b/pkg/server/node.go index 8a8714eec131..fcc05e71e5f8 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -955,6 +955,16 @@ func (n *Node) Batch( if !ok { tenantID = roachpb.SystemTenantID } + + // Requests from tenants don't have gateway node id set but are required for + // the QPS based rebalancing to work. The GatewayNodeID is used as a proxy + // for the locality of the origin of the request. The replica stats aggregate + // all incoming BatchRequests and which localities they come from in order to + // compute per second stats used for the rebalancing decisions. + if args.GatewayNodeID == 0 && tenantID != roachpb.SystemTenantID { + args.GatewayNodeID = n.Descriptor.NodeID + } + handle, err := n.admissionController.AdmitKVWork(ctx, tenantID, args) if err != nil { return nil, err diff --git a/pkg/server/status.go b/pkg/server/status.go index e9462809ae0f..94293f181250 100644 --- a/pkg/server/status.go +++ b/pkg/server/status.go @@ -1905,6 +1905,7 @@ func (s *statusServer) rangesHelper( WaitingWriters: lm.WaitingWriters, }) } + qps, _ := rep.QueriesPerSecond() return serverpb.RangeInfo{ Span: span, RaftState: raftState, @@ -1913,7 +1914,7 @@ func (s *statusServer) rangesHelper( SourceStoreID: storeID, LeaseHistory: leaseHistory, Stats: serverpb.RangeStatistics{ - QueriesPerSecond: rep.QueriesPerSecond(), + QueriesPerSecond: qps, WritesPerSecond: rep.WritesPerSecond(), }, Problems: serverpb.RangeProblems{