Merge #91462 #92231
91462: kvserver: Fix performance regression due to new call to collectSpansRead r=KaiSun314 a=KaiSun314

Fixes: #91374
Fixes: #91723

When we started using response data in the load-based splitter, we added a call to collectSpansRead, which is allocation-heavy and computationally expensive and caused a performance regression.

To address this, this patch makes three optimizations:
1. Remove the call to collectSpansRead; instead, add a custom function that iterates over the batch of requests and responses and calculates the true spans.
2. Instead of constructing a *spanset.SpanSet and finding the union of its spans (which uses O(batch_size) memory), compute the union of spans directly while iterating over the batch, which uses only O(1) memory.
3. Lazily compute the union of true spans only when it is actually needed, i.e., when we are under heavy load (e.g. >2500 QPS) and a load-based splitter has been initialized.
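
Optimizations 2 and 3 can be pictured with a minimal sketch. Everything in it is hypothetical: `Span`, `combine`, `boundingSpan`, and `maybeRecord` are illustrative stand-ins, not the kvserver functions this patch adds, and "union" is read here as the single span covering all inputs, which is an assumption.

```go
package main

import (
	"bytes"
	"fmt"
)

// Span is a simplified, hypothetical stand-in for a key span: the half-open
// range [Key, EndKey). An empty EndKey denotes a point key.
type Span struct {
	Key, EndKey []byte
}

// end returns the exclusive upper bound, treating a point key as
// [Key, Key+"\x00").
func (s Span) end() []byte {
	if len(s.EndKey) == 0 {
		return append(append([]byte{}, s.Key...), 0)
	}
	return s.EndKey
}

// combine returns the smallest span covering both a and b.
func combine(a, b Span) Span {
	out := Span{Key: a.Key, EndKey: a.end()}
	if bytes.Compare(b.Key, out.Key) < 0 {
		out.Key = b.Key
	}
	if e := b.end(); bytes.Compare(e, out.EndKey) > 0 {
		out.EndKey = e
	}
	return out
}

// boundingSpan folds the batch into one covering span. Only a single
// accumulator span is retained, rather than a set that grows with the batch.
func boundingSpan(spans []Span) (Span, bool) {
	if len(spans) == 0 {
		return Span{}, false
	}
	acc := Span{Key: spans[0].Key, EndKey: spans[0].end()}
	for _, s := range spans[1:] {
		acc = combine(acc, s)
	}
	return acc, true
}

// maybeRecord invokes getSpan only when underLoad is true, so the fold is
// skipped entirely on the common, lightly loaded path.
func maybeRecord(underLoad bool, getSpan func() (Span, bool)) (Span, bool) {
	if !underLoad {
		return Span{}, false
	}
	return getSpan()
}

func main() {
	batch := []Span{
		{Key: []byte("c"), EndKey: []byte("f")},
		{Key: []byte("a")},
		{Key: []byte("d"), EndKey: []byte("k")},
	}
	lazy := func() (Span, bool) { return boundingSpan(batch) }

	_, ok := maybeRecord(false, lazy)
	fmt.Println(ok) // false
	sp, ok := maybeRecord(true, lazy)
	fmt.Printf("%q %q %v\n", sp.Key, sp.EndKey, ok) // "a" "k" true
}
```

Passing a closure instead of a precomputed span is what makes the work lazy: the caller pays for the iteration only on the path that actually consumes the result.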

Cherry-picking this commit onto the commit right before we incorporated response data in the load-based splitter (068845f) and running
```
~/benchdiff/benchdiff --old=068845ff72315f8b64f0e930c17c48f078203bc4 --new=abf61ce75c47e16bc39ed0e714f2e46f1d97eb7c --count=20 --post-checkout='./dev generate go' --run='KV/././rows=1$$' ./pkg/sql/tests
```
the output is:
<img width="909" alt="More Efficient Response Data Benchdiff" src="https://user-images.githubusercontent.com/28762332/200678800-d75240f9-a275-40cf-85f2-201aadca0355.png">

Release note: None

92231: cli,democluster: defer simulated latency until after cluster setup r=ajwerner a=ajwerner

### democluster,serverutils/regionlatency,rpc: extract code for simulating latency

We'll want to leverage these helpers in some tests to measure behavior under
simulated latency.

### cli,democluster: defer simulated latency until after cluster setup

Cluster creation and tenant setup are chatty. That's okay: we don't
really care about cluster creation being that slow in general. In the case of
demo, when we want to simulate latency and use tenants, it was particularly
painful: starting the 9 tenants would take many minutes. This patch alleviates
this problem by keeping latency between the simulated nodes low until just
before we pass control to the user.

Fixes #76305

Release note (cli change): `cockroach demo --global` will now start up more
quickly. The simulated latency is now injected only after the initial cluster
has been set up internally.

Co-authored-by: Kai Sun <[email protected]>
Co-authored-by: Andrew Werner <[email protected]>
3 people committed Nov 23, 2022
3 parents b60baa6 + ca76c28 + 137b27b commit 0d9669a
Showing 18 changed files with 691 additions and 115 deletions.
2 changes: 2 additions & 0 deletions pkg/BUILD.bazel
@@ -1899,6 +1899,7 @@ GO_TARGETS = [
     "//pkg/testutils/metrictestutils:metrictestutils",
     "//pkg/testutils/pgtest:pgtest",
     "//pkg/testutils/physicalplanutils:physicalplanutils",
+    "//pkg/testutils/serverutils/regionlatency:regionlatency",
     "//pkg/testutils/serverutils:serverutils",
     "//pkg/testutils/skip:skip",
     "//pkg/testutils/sqlutils:sqlutils",
@@ -2955,6 +2956,7 @@ GET_X_DATA_TARGETS = [
     "//pkg/testutils/pgtest:get_x_data",
     "//pkg/testutils/physicalplanutils:get_x_data",
     "//pkg/testutils/serverutils:get_x_data",
+    "//pkg/testutils/serverutils/regionlatency:get_x_data",
     "//pkg/testutils/skip:get_x_data",
     "//pkg/testutils/sqlutils:get_x_data",
     "//pkg/testutils/storageutils:get_x_data",
7 changes: 7 additions & 0 deletions pkg/cli/demo.go
@@ -360,5 +360,12 @@ func runDemoInternal(
 		return err
 	}

+	// Enable the latency as late in the process of starting the cluster as we
+	// can to minimize the startup time.
+	if demoCtx.SimulateLatency {
+		c.SetSimulatedLatency(true /* on */)
+		defer c.SetSimulatedLatency(false /* on */)
+	}
+
 	return sqlCtx.Run(ctx, conn)
 }
4 changes: 3 additions & 1 deletion pkg/cli/democluster/BUILD.bazel
@@ -9,7 +9,6 @@ go_library(
         "demo_cluster.go",
         "demo_locality_list.go",
         "doc.go",
-        "region_latencies.go",
         "socket_unix.go",
         "socket_windows.go",
     ],
@@ -37,13 +36,15 @@ go_library(
         "//pkg/sql/distsql",
         "//pkg/sql/sem/catconstants",
         "//pkg/testutils/serverutils",
+        "//pkg/testutils/serverutils/regionlatency",
         "//pkg/util/humanizeutil",
         "//pkg/util/log/logcrash",
         "//pkg/util/log/logpb",
         "//pkg/util/log/severity",
         "//pkg/util/netutil/addr",
         "//pkg/util/retry",
         "//pkg/util/stop",
+        "//pkg/util/syncutil",
         "//pkg/workload",
         "//pkg/workload/histogram",
         "//pkg/workload/workloadsql",
@@ -68,6 +69,7 @@ go_test(
         "//pkg/security/securityassets",
         "//pkg/security/securitytest",
         "//pkg/server",
+        "//pkg/testutils/serverutils/regionlatency",
         "//pkg/testutils/skip",
         "//pkg/util/leaktest",
         "//pkg/util/log",
3 changes: 3 additions & 0 deletions pkg/cli/democluster/api.go
@@ -53,6 +53,9 @@ type DemoCluster interface {
 	// SetClusterSetting overrides a default cluster setting at system level
 	// and for all tenants.
 	SetClusterSetting(ctx context.Context, setting string, value interface{}) error
+
+	// SetSimulatedLatency is used to enable or disable simulated latency.
+	SetSimulatedLatency(on bool)
 }

 // EnableEnterprise is not implemented here in order to keep OSS/BSL builds successful.
69 changes: 39 additions & 30 deletions pkg/cli/democluster/demo_cluster.go
@@ -42,13 +42,15 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/distsql"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/catconstants"
 	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
+	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils/regionlatency"
 	"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
 	"github.com/cockroachdb/cockroach/pkg/util/log/logcrash"
 	"github.com/cockroachdb/cockroach/pkg/util/log/logpb"
 	"github.com/cockroachdb/cockroach/pkg/util/log/severity"
 	"github.com/cockroachdb/cockroach/pkg/util/netutil/addr"
 	"github.com/cockroachdb/cockroach/pkg/util/retry"
 	"github.com/cockroachdb/cockroach/pkg/util/stop"
+	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 	"github.com/cockroachdb/cockroach/pkg/workload"
 	"github.com/cockroachdb/cockroach/pkg/workload/histogram"
 	"github.com/cockroachdb/cockroach/pkg/workload/workloadsql"
@@ -83,6 +85,10 @@ type transientCluster struct {
 	infoLog  LoggerFn
 	warnLog  LoggerFn
 	shoutLog ShoutLoggerFn
+
+	// latencyEnabled controls whether simulated latency is currently enabled.
+	// It is only relevant when using SimulateLatency.
+	latencyEnabled syncutil.AtomicBool
 }

 // maxNodeInitTime is the maximum amount of time to wait for nodes to
@@ -315,34 +321,13 @@ func (c *transientCluster) Start(ctx context.Context) (err error) {
 	phaseCtx = logtags.AddTag(ctx, "phase", 5)
 	if err := func(ctx context.Context) error {
 		// If latency simulation is requested, initialize the latency map.
-		if c.demoCtx.SimulateLatency {
-			// Now, all servers have been started enough to know their own RPC serving
-			// addresses, but nothing else. Assemble the artificial latency map.
-			c.infoLog(ctx, "initializing latency map")
-			for i, serv := range c.servers {
-				latencyMap := serv.Cfg.TestingKnobs.Server.(*server.TestingKnobs).ContextTestingKnobs.ArtificialLatencyMap
-				srcLocality, ok := serv.Cfg.Locality.Find("region")
-				if !ok {
-					continue
-				}
-				srcLocalityMap, ok := regionToRegionToLatency[srcLocality]
-				if !ok {
-					continue
-				}
-				for j, dst := range c.servers {
-					if i == j {
-						continue
-					}
-					dstLocality, ok := dst.Cfg.Locality.Find("region")
-					if !ok {
-						continue
-					}
-					latency := srcLocalityMap[dstLocality]
-					latencyMap[dst.ServingRPCAddr()] = latency
-				}
-			}
+		if !c.demoCtx.SimulateLatency {
+			return nil
 		}
-		return nil
+		// Now, all servers have been started enough to know their own RPC serving
+		// addresses, but nothing else. Assemble the artificial latency map.
+		c.infoLog(ctx, "initializing latency map")
+		return localityLatencies.Apply(c)
 	}(phaseCtx); err != nil {
 		return err
 	}
@@ -389,7 +374,8 @@ func (c *transientCluster) Start(ctx context.Context) (err error) {

 	c.tenantServers = make([]serverutils.TestTenantInterface, c.demoCtx.NumNodes)
 	for i := 0; i < c.demoCtx.NumNodes; i++ {
-		latencyMap := c.servers[i].Cfg.TestingKnobs.Server.(*server.TestingKnobs).ContextTestingKnobs.ArtificialLatencyMap
+		latencyMap := c.servers[i].Cfg.TestingKnobs.Server.(*server.TestingKnobs).
+			ContextTestingKnobs.InjectedLatencyOracle
 		c.infoLog(ctx, "starting tenant node %d", i)
 		tenantStopper := stop.NewStopper()
 		tenID := uint64(i + 2)
@@ -409,7 +395,8 @@ func (c *transientCluster) Start(ctx context.Context) (err error) {
 			TestingKnobs: base.TestingKnobs{
 				Server: &server.TestingKnobs{
 					ContextTestingKnobs: rpc.ContextTestingKnobs{
-						ArtificialLatencyMap: latencyMap,
+						InjectedLatencyOracle:  latencyMap,
+						InjectedLatencyEnabled: c.latencyEnabled.Get,
 					},
 				},
 			},
@@ -504,6 +491,19 @@ func (c *transientCluster) Start(ctx context.Context) (err error) {
 	return nil
 }

+// SetSimulatedLatency enables or disables the simulated latency and then
+// clears the remote clock tracking. If the remote clocks were not cleared,
+// bad routing decisions would be made as soon as latency is turned on.
+func (c *transientCluster) SetSimulatedLatency(on bool) {
+	c.latencyEnabled.Set(on)
+	for _, s := range c.servers {
+		s.RPCContext().RemoteClocks.TestingResetLatencyInfos()
+	}
+	for _, s := range c.tenantServers {
+		s.RPCContext().RemoteClocks.TestingResetLatencyInfos()
+	}
+}
+
 // createAndAddNode is responsible for determining node parameters,
 // instantiating the server component and connecting it to the
 // cluster's stopper.
@@ -551,7 +551,8 @@ func (c *transientCluster) createAndAddNode(
 		// started listening on RPC, and before they proceed with their
 		// startup routine.
 		serverKnobs.ContextTestingKnobs = rpc.ContextTestingKnobs{
-			ArtificialLatencyMap: make(map[string]int),
+			InjectedLatencyOracle:  regionlatency.MakeAddrMap(),
+			InjectedLatencyEnabled: c.latencyEnabled.Get,
 		}
 	}

@@ -1418,6 +1419,14 @@ func (c *transientCluster) NumNodes() int {
 	return len(c.servers)
 }

+func (c *transientCluster) NumServers() int {
+	return len(c.servers)
+}
+
+func (c *transientCluster) Server(i int) serverutils.TestServerInterface {
+	return c.servers[i]
+}
+
 func (c *transientCluster) GetLocality(nodeID int32) string {
 	return c.demoCtx.Localities[nodeID-1].String()
 }
16 changes: 11 additions & 5 deletions pkg/cli/democluster/demo_cluster_test.go
@@ -25,6 +25,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/security/securityassets"
 	"github.com/cockroachdb/cockroach/pkg/security/securitytest"
 	"github.com/cockroachdb/cockroach/pkg/server"
+	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils/regionlatency"
 	"github.com/cockroachdb/cockroach/pkg/testutils/skip"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -146,6 +147,7 @@ func TestTransientClusterSimulateLatencies(t *testing.T) {
 	// Set up an empty 9-node cluster with simulated latencies.
 	demoCtx.SimulateLatency = true
 	demoCtx.NumNodes = 9
+	demoCtx.Localities = defaultLocalities

 	certsDir := t.TempDir()

@@ -179,6 +181,8 @@ func TestTransientClusterSimulateLatencies(t *testing.T) {

 	require.NoError(t, c.Start(ctx))

+	c.SetSimulatedLatency(true)
+
 	for _, tc := range []struct {
 		desc    string
 		nodeIdx int
@@ -212,12 +216,14 @@ func TestTransientClusterSimulateLatencies(t *testing.T) {
 				}
 			}()
 			// Find the maximum latency in the cluster from the current node.
-			var maxLatency time.Duration
-			for _, latencyMS := range regionToRegionToLatency[tc.region] {
-				if d := time.Duration(latencyMS) * time.Millisecond; d > maxLatency {
-					maxLatency = d
+			var maxLatency regionlatency.RoundTripLatency
+			localityLatencies.ForEachLatencyFrom(tc.region, func(
+				_ regionlatency.Region, l regionlatency.OneWayLatency,
+			) {
+				if rtt := l * 2; rtt > maxLatency {
+					maxLatency = rtt
 				}
-			}
+			})

 			// Attempt to make a query that talks to every node.
 			// This should take at least maxLatency.
9 changes: 9 additions & 0 deletions pkg/cli/democluster/demo_locality_list.go
@@ -12,8 +12,10 @@ package democluster

 import (
 	"strings"
+	"time"

 	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils/regionlatency"
 )

 // DemoLocalityList represents a list of localities for the cockroach
@@ -60,3 +62,10 @@ var defaultLocalities = DemoLocalityList{
 	{Tiers: []roachpb.Tier{{Key: "region", Value: "europe-west1"}, {Key: "az", Value: "c"}}},
 	{Tiers: []roachpb.Tier{{Key: "region", Value: "europe-west1"}, {Key: "az", Value: "d"}}},
 }
+
+// Round-trip latencies collected from http://cloudping.co on 2019-09-11.
+var localityLatencies = regionlatency.RoundTripPairs{
+	{A: "us-east1", B: "us-west1"}:     66 * time.Millisecond,
+	{A: "us-east1", B: "europe-west1"}: 64 * time.Millisecond,
+	{A: "us-west1", B: "europe-west1"}: 146 * time.Millisecond,
+}.ToLatencyMap()
52 changes: 0 additions & 52 deletions pkg/cli/democluster/region_latencies.go

This file was deleted.

1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
@@ -290,6 +290,7 @@ go_test(
         "replica_rangefeed_test.go",
         "replica_rankings_test.go",
         "replica_sideload_test.go",
+        "replica_split_load_test.go",
         "replica_sst_snapshot_storage_test.go",
         "replica_test.go",
         "replica_tscache_test.go",
11 changes: 8 additions & 3 deletions pkg/kv/kvserver/replica_send.go
@@ -401,9 +401,14 @@ func (r *Replica) executeBatchWithConcurrencyRetries(
 	var g *concurrency.Guard
 	defer func() {
 		// Handle load-based splitting, if necessary.
-		if pErr == nil {
-			spansRead, _, _ := r.collectSpansRead(ba, br)
-			r.recordBatchForLoadBasedSplitting(ctx, ba, spansRead)
+		if pErr == nil && br != nil {
+			if len(ba.Requests) != len(br.Responses) {
+				log.KvDistribution.Errorf(ctx,
+					"Requests and responses should be equal lengths: # of requests = %d, # of responses = %d",
+					len(ba.Requests), len(br.Responses))
+			} else {
+				r.recordBatchForLoadBasedSplitting(ctx, ba, br)
+			}
 		}

 		// NB: wrapped to delay g evaluation to its value when returning.