diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 131ad578e7ce..d9b30903fef3 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -1899,6 +1899,7 @@ GO_TARGETS = [ "//pkg/testutils/metrictestutils:metrictestutils", "//pkg/testutils/pgtest:pgtest", "//pkg/testutils/physicalplanutils:physicalplanutils", + "//pkg/testutils/serverutils/regionlatency:regionlatency", "//pkg/testutils/serverutils:serverutils", "//pkg/testutils/skip:skip", "//pkg/testutils/sqlutils:sqlutils", @@ -2955,6 +2956,7 @@ GET_X_DATA_TARGETS = [ "//pkg/testutils/pgtest:get_x_data", "//pkg/testutils/physicalplanutils:get_x_data", "//pkg/testutils/serverutils:get_x_data", + "//pkg/testutils/serverutils/regionlatency:get_x_data", "//pkg/testutils/skip:get_x_data", "//pkg/testutils/sqlutils:get_x_data", "//pkg/testutils/storageutils:get_x_data", diff --git a/pkg/cli/demo.go b/pkg/cli/demo.go index 587e68e47a03..9ee992793be9 100644 --- a/pkg/cli/demo.go +++ b/pkg/cli/demo.go @@ -360,5 +360,12 @@ func runDemoInternal( return err } + // Enable the latency as late in the process of starting the cluster as we + // can to minimize the startup time. 
+ if demoCtx.SimulateLatency { + c.SetSimulatedLatency(true /* on */) + defer c.SetSimulatedLatency(false /* on */) + } + return sqlCtx.Run(ctx, conn) } diff --git a/pkg/cli/democluster/BUILD.bazel b/pkg/cli/democluster/BUILD.bazel index 4653143399e2..7b46f3b4d996 100644 --- a/pkg/cli/democluster/BUILD.bazel +++ b/pkg/cli/democluster/BUILD.bazel @@ -9,7 +9,6 @@ go_library( "demo_cluster.go", "demo_locality_list.go", "doc.go", - "region_latencies.go", "socket_unix.go", "socket_windows.go", ], @@ -37,6 +36,7 @@ go_library( "//pkg/sql/distsql", "//pkg/sql/sem/catconstants", "//pkg/testutils/serverutils", + "//pkg/testutils/serverutils/regionlatency", "//pkg/util/humanizeutil", "//pkg/util/log/logcrash", "//pkg/util/log/logpb", @@ -44,6 +44,7 @@ go_library( "//pkg/util/netutil/addr", "//pkg/util/retry", "//pkg/util/stop", + "//pkg/util/syncutil", "//pkg/workload", "//pkg/workload/histogram", "//pkg/workload/workloadsql", @@ -68,6 +69,7 @@ go_test( "//pkg/security/securityassets", "//pkg/security/securitytest", "//pkg/server", + "//pkg/testutils/serverutils/regionlatency", "//pkg/testutils/skip", "//pkg/util/leaktest", "//pkg/util/log", diff --git a/pkg/cli/democluster/api.go b/pkg/cli/democluster/api.go index 8d8250553b33..084315bdc95b 100644 --- a/pkg/cli/democluster/api.go +++ b/pkg/cli/democluster/api.go @@ -53,6 +53,9 @@ type DemoCluster interface { // SetClusterSetting overrides a default cluster setting at system level // and for all tenants. SetClusterSetting(ctx context.Context, setting string, value interface{}) error + + // SetSimulatedLatency is used to enable or disable simulated latency. + SetSimulatedLatency(on bool) } // EnableEnterprise is not implemented here in order to keep OSS/BSL builds successful. 
diff --git a/pkg/cli/democluster/demo_cluster.go b/pkg/cli/democluster/demo_cluster.go index 73594aa85ace..cb89fbb38e05 100644 --- a/pkg/cli/democluster/demo_cluster.go +++ b/pkg/cli/democluster/demo_cluster.go @@ -42,6 +42,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/distsql" "github.com/cockroachdb/cockroach/pkg/sql/sem/catconstants" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils/regionlatency" "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" "github.com/cockroachdb/cockroach/pkg/util/log/logcrash" "github.com/cockroachdb/cockroach/pkg/util/log/logpb" @@ -49,6 +50,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/netutil/addr" "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/workload" "github.com/cockroachdb/cockroach/pkg/workload/histogram" "github.com/cockroachdb/cockroach/pkg/workload/workloadsql" @@ -83,6 +85,10 @@ type transientCluster struct { infoLog LoggerFn warnLog LoggerFn shoutLog ShoutLoggerFn + + // latencyEnabled controls whether simulated latency is currently enabled. + // It is only relevant when using SimulateLatency. + latencyEnabled syncutil.AtomicBool } // maxNodeInitTime is the maximum amount of time to wait for nodes to @@ -315,34 +321,13 @@ func (c *transientCluster) Start(ctx context.Context) (err error) { phaseCtx = logtags.AddTag(ctx, "phase", 5) if err := func(ctx context.Context) error { // If latency simulation is requested, initialize the latency map. - if c.demoCtx.SimulateLatency { - // Now, all servers have been started enough to know their own RPC serving - // addresses, but nothing else. Assemble the artificial latency map. 
- c.infoLog(ctx, "initializing latency map") - for i, serv := range c.servers { - latencyMap := serv.Cfg.TestingKnobs.Server.(*server.TestingKnobs).ContextTestingKnobs.ArtificialLatencyMap - srcLocality, ok := serv.Cfg.Locality.Find("region") - if !ok { - continue - } - srcLocalityMap, ok := regionToRegionToLatency[srcLocality] - if !ok { - continue - } - for j, dst := range c.servers { - if i == j { - continue - } - dstLocality, ok := dst.Cfg.Locality.Find("region") - if !ok { - continue - } - latency := srcLocalityMap[dstLocality] - latencyMap[dst.ServingRPCAddr()] = latency - } - } + if !c.demoCtx.SimulateLatency { + return nil } - return nil + // Now, all servers have been started enough to know their own RPC serving + // addresses, but nothing else. Assemble the artificial latency map. + c.infoLog(ctx, "initializing latency map") + return localityLatencies.Apply(c) }(phaseCtx); err != nil { return err } @@ -389,7 +374,8 @@ func (c *transientCluster) Start(ctx context.Context) (err error) { c.tenantServers = make([]serverutils.TestTenantInterface, c.demoCtx.NumNodes) for i := 0; i < c.demoCtx.NumNodes; i++ { - latencyMap := c.servers[i].Cfg.TestingKnobs.Server.(*server.TestingKnobs).ContextTestingKnobs.ArtificialLatencyMap + latencyMap := c.servers[i].Cfg.TestingKnobs.Server.(*server.TestingKnobs). 
+ ContextTestingKnobs.InjectedLatencyOracle c.infoLog(ctx, "starting tenant node %d", i) tenantStopper := stop.NewStopper() tenID := uint64(i + 2) @@ -409,7 +395,8 @@ func (c *transientCluster) Start(ctx context.Context) (err error) { TestingKnobs: base.TestingKnobs{ Server: &server.TestingKnobs{ ContextTestingKnobs: rpc.ContextTestingKnobs{ - ArtificialLatencyMap: latencyMap, + InjectedLatencyOracle: latencyMap, + InjectedLatencyEnabled: c.latencyEnabled.Get, }, }, }, @@ -504,6 +491,19 @@ func (c *transientCluster) Start(ctx context.Context) (err error) { return nil } +// SetSimulatedLatency enables or disables the simulated latency and then +// clears the remote clock tracking. If the remote clocks were not cleared, +// bad routing decisions would be made as soon as latency is turned on. +func (c *transientCluster) SetSimulatedLatency(on bool) { + c.latencyEnabled.Set(on) + for _, s := range c.servers { + s.RPCContext().RemoteClocks.TestingResetLatencyInfos() + } + for _, s := range c.tenantServers { + s.RPCContext().RemoteClocks.TestingResetLatencyInfos() + } +} + // createAndAddNode is responsible for determining node parameters, // instantiating the server component and connecting it to the // cluster's stopper. @@ -551,7 +551,8 @@ func (c *transientCluster) createAndAddNode( // started listening on RPC, and before they proceed with their // startup routine. 
serverKnobs.ContextTestingKnobs = rpc.ContextTestingKnobs{ - ArtificialLatencyMap: make(map[string]int), + InjectedLatencyOracle: regionlatency.MakeAddrMap(), + InjectedLatencyEnabled: c.latencyEnabled.Get, } } @@ -1418,6 +1419,14 @@ func (c *transientCluster) NumNodes() int { return len(c.servers) } +func (c *transientCluster) NumServers() int { + return len(c.servers) +} + +func (c *transientCluster) Server(i int) serverutils.TestServerInterface { + return c.servers[i] +} + func (c *transientCluster) GetLocality(nodeID int32) string { return c.demoCtx.Localities[nodeID-1].String() } diff --git a/pkg/cli/democluster/demo_cluster_test.go b/pkg/cli/democluster/demo_cluster_test.go index badbf768a1e9..e534e3f22617 100644 --- a/pkg/cli/democluster/demo_cluster_test.go +++ b/pkg/cli/democluster/demo_cluster_test.go @@ -25,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/security/securityassets" "github.com/cockroachdb/cockroach/pkg/security/securitytest" "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils/regionlatency" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -146,6 +147,7 @@ func TestTransientClusterSimulateLatencies(t *testing.T) { // Set up an empty 9-node cluster with simulated latencies. demoCtx.SimulateLatency = true demoCtx.NumNodes = 9 + demoCtx.Localities = defaultLocalities certsDir := t.TempDir() @@ -179,6 +181,8 @@ func TestTransientClusterSimulateLatencies(t *testing.T) { require.NoError(t, c.Start(ctx)) + c.SetSimulatedLatency(true) + for _, tc := range []struct { desc string nodeIdx int @@ -212,12 +216,14 @@ func TestTransientClusterSimulateLatencies(t *testing.T) { } }() // Find the maximum latency in the cluster from the current node. 
- var maxLatency time.Duration - for _, latencyMS := range regionToRegionToLatency[tc.region] { - if d := time.Duration(latencyMS) * time.Millisecond; d > maxLatency { - maxLatency = d + var maxLatency regionlatency.RoundTripLatency + localityLatencies.ForEachLatencyFrom(tc.region, func( + _ regionlatency.Region, l regionlatency.OneWayLatency, + ) { + if rtt := l * 2; rtt > maxLatency { + maxLatency = rtt } - } + }) // Attempt to make a query that talks to every node. // This should take at least maxLatency. diff --git a/pkg/cli/democluster/demo_locality_list.go b/pkg/cli/democluster/demo_locality_list.go index f7fe0d31274f..c8dcd404d13f 100644 --- a/pkg/cli/democluster/demo_locality_list.go +++ b/pkg/cli/democluster/demo_locality_list.go @@ -12,8 +12,10 @@ package democluster import ( "strings" + "time" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils/regionlatency" ) // DemoLocalityList represents a list of localities for the cockroach @@ -60,3 +62,10 @@ var defaultLocalities = DemoLocalityList{ {Tiers: []roachpb.Tier{{Key: "region", Value: "europe-west1"}, {Key: "az", Value: "c"}}}, {Tiers: []roachpb.Tier{{Key: "region", Value: "europe-west1"}, {Key: "az", Value: "d"}}}, } + +// Round-trip latencies collected from http://cloudping.co on 2019-09-11. +var localityLatencies = regionlatency.RoundTripPairs{ + {A: "us-east1", B: "us-west1"}: 66 * time.Millisecond, + {A: "us-east1", B: "europe-west1"}: 64 * time.Millisecond, + {A: "us-west1", B: "europe-west1"}: 146 * time.Millisecond, +}.ToLatencyMap() diff --git a/pkg/cli/democluster/region_latencies.go b/pkg/cli/democluster/region_latencies.go deleted file mode 100644 index ac7522f0d0db..000000000000 --- a/pkg/cli/democluster/region_latencies.go +++ /dev/null @@ -1,52 +0,0 @@ -// Copyright 2021 The Cockroach Authors. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. 
-// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -package democluster - -type regionPair struct { - regionA string - regionB string -} - -var regionToRegionToLatency map[string]map[string]int - -func insertPair(pair regionPair, latency int) { - regionToLatency, ok := regionToRegionToLatency[pair.regionA] - if !ok { - regionToLatency = make(map[string]int) - regionToRegionToLatency[pair.regionA] = regionToLatency - } - regionToLatency[pair.regionB] = latency -} - -// Round-trip latencies collected from http://cloudping.co on 2019-09-11. -var regionRoundTripLatencies = map[regionPair]int{ - {regionA: "us-east1", regionB: "us-west1"}: 66, - {regionA: "us-east1", regionB: "europe-west1"}: 64, - {regionA: "us-west1", regionB: "europe-west1"}: 146, -} - -var regionOneWayLatencies = make(map[regionPair]int) - -func init() { - // We record one-way latencies next, because the logic in our delayingConn - // and delayingListener is in terms of one-way network delays. 
- for pair, latency := range regionRoundTripLatencies { - regionOneWayLatencies[pair] = latency / 2 - } - regionToRegionToLatency = make(map[string]map[string]int) - for pair, latency := range regionOneWayLatencies { - insertPair(pair, latency) - insertPair(regionPair{ - regionA: pair.regionB, - regionB: pair.regionA, - }, latency) - } -} diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index de235f8cf9b0..1ac027efcad9 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -290,6 +290,7 @@ go_test( "replica_rangefeed_test.go", "replica_rankings_test.go", "replica_sideload_test.go", + "replica_split_load_test.go", "replica_sst_snapshot_storage_test.go", "replica_test.go", "replica_tscache_test.go", diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go index 8f123c74d4d5..9fe32b4e05f5 100644 --- a/pkg/kv/kvserver/replica_send.go +++ b/pkg/kv/kvserver/replica_send.go @@ -401,9 +401,14 @@ func (r *Replica) executeBatchWithConcurrencyRetries( var g *concurrency.Guard defer func() { // Handle load-based splitting, if necessary. - if pErr == nil { - spansRead, _, _ := r.collectSpansRead(ba, br) - r.recordBatchForLoadBasedSplitting(ctx, ba, spansRead) + if pErr == nil && br != nil { + if len(ba.Requests) != len(br.Responses) { + log.KvDistribution.Errorf(ctx, + "Requests and responses should be equal lengths: # of requests = %d, # of responses = %d", + len(ba.Requests), len(br.Responses)) + } else { + r.recordBatchForLoadBasedSplitting(ctx, ba, br) + } } // NB: wrapped to delay g evaluation to its value when returning. 
diff --git a/pkg/kv/kvserver/replica_split_load.go b/pkg/kv/kvserver/replica_split_load.go index ea42bbabbdde..3cc43ae28e39 100644 --- a/pkg/kv/kvserver/replica_split_load.go +++ b/pkg/kv/kvserver/replica_split_load.go @@ -13,7 +13,6 @@ package kvserver import ( "context" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -48,16 +47,89 @@ func (r *Replica) SplitByLoadEnabled() bool { !r.store.TestingKnobs().DisableLoadBasedSplitting } +// getResponseBoundarySpan computes the union span of the true spans that were +// iterated over using the request span and the response's resumeSpan. +// +// Assumptions: +// 1. br != nil +// 2. len(ba.Requests) == len(br.Responses) +// Assumptions are checked in executeBatchWithConcurrencyRetries. +func getResponseBoundarySpan( + ba *roachpb.BatchRequest, br *roachpb.BatchResponse, +) (responseBoundarySpan roachpb.Span) { + addSpanToBoundary := func(span roachpb.Span) { + if !responseBoundarySpan.Valid() { + responseBoundarySpan = span + } else { + responseBoundarySpan = responseBoundarySpan.Combine(span) + } + } + for i, respUnion := range br.Responses { + reqHeader := ba.Requests[i].GetInner().Header() + resp := respUnion.GetInner() + resumeSpan := resp.Header().ResumeSpan + if resumeSpan == nil { + // Fully evaluated. + addSpanToBoundary(reqHeader.Span()) + continue + } + + switch resp.(type) { + case *roachpb.GetResponse: + // The request did not evaluate. Ignore it. + continue + case *roachpb.ScanResponse: + // Not reverse (->) + // Request: [key...............endKey) + // ResumeSpan: [key......endKey) + // True span: [key......key) + // + // Assumptions (not checked to minimize overhead): + // reqHeader.EndKey == resumeSpan.EndKey + // reqHeader.Key <= resumeSpan.Key. + if reqHeader.Key.Equal(resumeSpan.Key) { + // The request did not evaluate. Ignore it. 
+ continue + } + addSpanToBoundary(roachpb.Span{ + Key: reqHeader.Key, + EndKey: resumeSpan.Key, + }) + case *roachpb.ReverseScanResponse: + // Reverse (<-) + // Request: [key...............endKey) + // ResumeSpan: [key......endKey) + // True span: [endKey...endKey) + // + // Assumptions (not checked to minimize overhead): + // reqHeader.Key == resumeSpan.Key + // resumeSpan.EndKey <= reqHeader.EndKey. + if reqHeader.EndKey.Equal(resumeSpan.EndKey) { + // The request did not evaluate. Ignore it. + continue + } + addSpanToBoundary(roachpb.Span{ + Key: resumeSpan.EndKey, + EndKey: reqHeader.EndKey, + }) + default: + // Consider it fully evaluated, which is safe. + addSpanToBoundary(reqHeader.Span()) + } + } + return +} + // recordBatchForLoadBasedSplitting records the batch's spans to be considered // for load based splitting. func (r *Replica) recordBatchForLoadBasedSplitting( - ctx context.Context, ba *roachpb.BatchRequest, spans *spanset.SpanSet, + ctx context.Context, ba *roachpb.BatchRequest, br *roachpb.BatchResponse, ) { if !r.SplitByLoadEnabled() { return } shouldInitSplit := r.loadBasedSplitter.Record(ctx, timeutil.Now(), len(ba.Requests), func() roachpb.Span { - return spans.BoundarySpan(spanset.SpanGlobal) + return getResponseBoundarySpan(ba, br) }) if shouldInitSplit { r.store.splitQueue.MaybeAddAsync(ctx, r, r.store.Clock().NowAsClockTimestamp()) diff --git a/pkg/kv/kvserver/replica_split_load_test.go b/pkg/kv/kvserver/replica_split_load_test.go new file mode 100644 index 000000000000..3d04da3546b9 --- /dev/null +++ b/pkg/kv/kvserver/replica_split_load_test.go @@ -0,0 +1,297 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. 
+// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvserver + +import ( + "testing" + + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/stretchr/testify/assert" +) + +func roachpbKey(key uint32) roachpb.Key { + return keys.SystemSQLCodec.TablePrefix(key) +} + +func requestHeaderWithNilEndKey(key uint32) roachpb.RequestHeader { + return roachpb.RequestHeader{ + Key: roachpbKey(key), + } +} + +func requestHeader(key uint32, endKey uint32) roachpb.RequestHeader { + return roachpb.RequestHeader{ + Key: roachpbKey(key), + EndKey: roachpbKey(endKey), + } +} + +func responseHeaderWithNilResumeSpan() roachpb.ResponseHeader { + return roachpb.ResponseHeader{ + ResumeSpan: nil, + } +} + +func responseHeaderWithNilEndKey(key uint32) roachpb.ResponseHeader { + return roachpb.ResponseHeader{ + ResumeSpan: &roachpb.Span{ + Key: roachpbKey(key), + }, + } +} + +func responseHeader(key uint32, endKey uint32) roachpb.ResponseHeader { + return roachpb.ResponseHeader{ + ResumeSpan: &roachpb.Span{ + Key: roachpbKey(key), + EndKey: roachpbKey(endKey), + }, + } +} + +func requestUnionGet(requestHeader roachpb.RequestHeader) roachpb.RequestUnion { + return roachpb.RequestUnion{ + Value: &roachpb.RequestUnion_Get{ + Get: &roachpb.GetRequest{ + RequestHeader: requestHeader, + }, + }, + } +} + +func responseUnionGet(responseHeader roachpb.ResponseHeader) roachpb.ResponseUnion { + return roachpb.ResponseUnion{ + Value: &roachpb.ResponseUnion_Get{ + Get: &roachpb.GetResponse{ + ResponseHeader: responseHeader, + }, + }, + } +} + +func requestUnionScan(requestHeader roachpb.RequestHeader) roachpb.RequestUnion { + return roachpb.RequestUnion{ + Value: &roachpb.RequestUnion_Scan{ + Scan: 
&roachpb.ScanRequest{ + RequestHeader: requestHeader, + }, + }, + } +} + +func responseUnionScan(responseHeader roachpb.ResponseHeader) roachpb.ResponseUnion { + return roachpb.ResponseUnion{ + Value: &roachpb.ResponseUnion_Scan{ + Scan: &roachpb.ScanResponse{ + ResponseHeader: responseHeader, + }, + }, + } +} + +func requestUnionReverseScan(requestHeader roachpb.RequestHeader) roachpb.RequestUnion { + return roachpb.RequestUnion{ + Value: &roachpb.RequestUnion_ReverseScan{ + ReverseScan: &roachpb.ReverseScanRequest{ + RequestHeader: requestHeader, + }, + }, + } +} + +func responseUnionReverseScan(responseHeader roachpb.ResponseHeader) roachpb.ResponseUnion { + return roachpb.ResponseUnion{ + Value: &roachpb.ResponseUnion_ReverseScan{ + ReverseScan: &roachpb.ReverseScanResponse{ + ResponseHeader: responseHeader, + }, + }, + } +} + +func requestUnionDeleteRange(requestHeader roachpb.RequestHeader) roachpb.RequestUnion { + return roachpb.RequestUnion{ + Value: &roachpb.RequestUnion_DeleteRange{ + DeleteRange: &roachpb.DeleteRangeRequest{ + RequestHeader: requestHeader, + }, + }, + } +} + +func responseUnionDeleteRange(responseHeader roachpb.ResponseHeader) roachpb.ResponseUnion { + return roachpb.ResponseUnion{ + Value: &roachpb.ResponseUnion_DeleteRange{ + DeleteRange: &roachpb.DeleteRangeResponse{ + ResponseHeader: responseHeader, + }, + }, + } +} + +func TestGetResponseBoundarySpan(t *testing.T) { + defer leaktest.AfterTest(t)() + testCases := []struct { + ba *roachpb.BatchRequest + br *roachpb.BatchResponse + expectedResponseBoundarySpan roachpb.Span + }{ + { + ba: &roachpb.BatchRequest{ + Requests: []roachpb.RequestUnion{ + requestUnionGet(requestHeaderWithNilEndKey(100)), + }, + }, + br: &roachpb.BatchResponse{ + Responses: []roachpb.ResponseUnion{ + responseUnionGet(responseHeaderWithNilResumeSpan()), + }, + }, + expectedResponseBoundarySpan: roachpb.Span{ + Key: roachpbKey(100), + }, + }, + { + ba: &roachpb.BatchRequest{ + Requests: []roachpb.RequestUnion{ + 
requestUnionScan(requestHeader(100, 900)), + }, + }, + br: &roachpb.BatchResponse{ + Responses: []roachpb.ResponseUnion{ + responseUnionScan(responseHeaderWithNilResumeSpan()), + }, + }, + expectedResponseBoundarySpan: roachpb.Span{ + Key: roachpbKey(100), + EndKey: roachpbKey(900), + }, + }, + { + ba: &roachpb.BatchRequest{ + Requests: []roachpb.RequestUnion{ + requestUnionScan(requestHeader(100, 900)), + }, + }, + br: &roachpb.BatchResponse{ + Responses: []roachpb.ResponseUnion{ + responseUnionScan(responseHeader(113, 900)), + }, + }, + expectedResponseBoundarySpan: roachpb.Span{ + Key: roachpbKey(100), + EndKey: roachpbKey(113), + }, + }, + { + ba: &roachpb.BatchRequest{ + Requests: []roachpb.RequestUnion{ + requestUnionReverseScan(requestHeader(100, 900)), + }, + }, + br: &roachpb.BatchResponse{ + Responses: []roachpb.ResponseUnion{ + responseUnionReverseScan(responseHeader(100, 879)), + }, + }, + expectedResponseBoundarySpan: roachpb.Span{ + Key: roachpbKey(879), + EndKey: roachpbKey(900), + }, + }, + { + ba: &roachpb.BatchRequest{ + Requests: []roachpb.RequestUnion{ + requestUnionDeleteRange(requestHeader(100, 900)), + }, + }, + br: &roachpb.BatchResponse{ + Responses: []roachpb.ResponseUnion{ + responseUnionDeleteRange(responseHeader(113, 900)), + }, + }, + expectedResponseBoundarySpan: roachpb.Span{ + Key: roachpbKey(100), + EndKey: roachpbKey(900), + }, + }, + { + ba: &roachpb.BatchRequest{ + Requests: []roachpb.RequestUnion{ + requestUnionGet(requestHeaderWithNilEndKey(100)), + }, + }, + br: &roachpb.BatchResponse{ + Responses: []roachpb.ResponseUnion{ + responseUnionGet(responseHeaderWithNilEndKey(100)), + }, + }, + expectedResponseBoundarySpan: roachpb.Span{}, + }, + { + ba: &roachpb.BatchRequest{ + Requests: []roachpb.RequestUnion{ + requestUnionScan(requestHeader(100, 900)), + }, + }, + br: &roachpb.BatchResponse{ + Responses: []roachpb.ResponseUnion{ + responseUnionScan(responseHeader(100, 900)), + }, + }, + expectedResponseBoundarySpan: 
roachpb.Span{}, + }, + { + ba: &roachpb.BatchRequest{ + Requests: []roachpb.RequestUnion{ + requestUnionReverseScan(requestHeader(100, 900)), + }, + }, + br: &roachpb.BatchResponse{ + Responses: []roachpb.ResponseUnion{ + responseUnionReverseScan(responseHeader(100, 900)), + }, + }, + expectedResponseBoundarySpan: roachpb.Span{}, + }, + { + ba: &roachpb.BatchRequest{ + Requests: []roachpb.RequestUnion{ + requestUnionScan(requestHeader(500, 600)), + requestUnionReverseScan(requestHeader(475, 625)), + requestUnionGet(requestHeaderWithNilEndKey(480)), + requestUnionReverseScan(requestHeader(500, 510)), + requestUnionScan(requestHeader(700, 800)), + }, + }, + br: &roachpb.BatchResponse{ + Responses: []roachpb.ResponseUnion{ + responseUnionScan(responseHeader(550, 600)), + responseUnionReverseScan(responseHeader(475, 525)), + responseUnionGet(responseHeaderWithNilResumeSpan()), + responseUnionReverseScan(responseHeaderWithNilResumeSpan()), + responseUnionScan(responseHeader(700, 800)), + }, + }, + expectedResponseBoundarySpan: roachpb.Span{ + Key: roachpbKey(480), + EndKey: roachpbKey(625), + }, + }, + } + for i, test := range testCases { + responseBoundarySpan := getResponseBoundarySpan(test.ba, test.br) + assert.Equal(t, test.expectedResponseBoundarySpan, responseBoundarySpan, "Expected response boundary span %s, got %s in test %d", + test.expectedResponseBoundarySpan, responseBoundarySpan, i) + } +} diff --git a/pkg/rpc/clock_offset.go b/pkg/rpc/clock_offset.go index 48f290dd517c..178002366ab7 100644 --- a/pkg/rpc/clock_offset.go +++ b/pkg/rpc/clock_offset.go @@ -102,6 +102,17 @@ type RemoteClockMonitor struct { metrics RemoteClockMetrics } +// TestingResetLatencyInfos will clear all latency info from the clock monitor. +// It is intended to be used in tests when enabling or disabling injected +// latency. 
+func (r *RemoteClockMonitor) TestingResetLatencyInfos() { + r.mu.Lock() + defer r.mu.Unlock() + for a := range r.mu.latencyInfos { + delete(r.mu.latencyInfos, a) + } +} + // newRemoteClockMonitor returns a monitor with the given server clock. func newRemoteClockMonitor( clock hlc.WallClock, diff --git a/pkg/rpc/context.go b/pkg/rpc/context.go index 41c247d914db..d5b24eca5125 100644 --- a/pkg/rpc/context.go +++ b/pkg/rpc/context.go @@ -1446,18 +1446,19 @@ func (rpcCtx *Context) grpcDialOptions( streamInterceptors = append(append([]grpc.StreamClientInterceptor(nil), streamInterceptors...), testingStreamInterceptor) } } - if rpcCtx.Knobs.ArtificialLatencyMap != nil { + if rpcCtx.Knobs.InjectedLatencyOracle != nil { dialerFunc := func(ctx context.Context, target string) (net.Conn, error) { dialer := net.Dialer{ LocalAddr: sourceAddr, } return dialer.DialContext(ctx, "tcp", target) } - latency := rpcCtx.Knobs.ArtificialLatencyMap[target] + latency := rpcCtx.Knobs.InjectedLatencyOracle.GetLatency(target) log.VEventf(rpcCtx.MasterCtx, 1, "connecting to node %s with simulated latency %dms", target, latency) dialer := artificialLatencyDialer{ dialerFunc: dialerFunc, - latencyMS: latency, + latency: latency, + enabled: rpcCtx.Knobs.InjectedLatencyEnabled, } dialerFunc = dialer.dial dialOpts = append(dialOpts, grpc.WithContextDialer(dialerFunc)) @@ -1562,7 +1563,8 @@ type dialerFunc func(context.Context, string) (net.Conn, error) type artificialLatencyDialer struct { dialerFunc dialerFunc - latencyMS int + latency time.Duration + enabled func() bool } func (ald *artificialLatencyDialer) dial(ctx context.Context, addr string) (net.Conn, error) { @@ -1572,21 +1574,23 @@ func (ald *artificialLatencyDialer) dial(ctx context.Context, addr string) (net. 
} return &delayingConn{ Conn: conn, - latency: time.Duration(ald.latencyMS) * time.Millisecond, + latency: ald.latency, + enabled: ald.enabled, readBuf: new(bytes.Buffer), }, nil } type delayingListener struct { net.Listener + enabled func() bool } // NewDelayingListener creates a net.Listener that introduces a set delay on its connections. -func NewDelayingListener(l net.Listener) net.Listener { - return delayingListener{Listener: l} +func NewDelayingListener(l net.Listener, enabled func() bool) net.Listener { + return &delayingListener{Listener: l, enabled: enabled} } -func (d delayingListener) Accept() (net.Conn, error) { +func (d *delayingListener) Accept() (net.Conn, error) { c, err := d.Listener.Accept() if err != nil { return nil, err @@ -1597,6 +1601,7 @@ func (d delayingListener) Accept() (net.Conn, error) { // as packets are exchanged across the delayingConnections. latency: time.Duration(0) * time.Millisecond, readBuf: new(bytes.Buffer), + enabled: d.enabled, }, nil } @@ -1612,12 +1617,24 @@ func (d delayingListener) Accept() (net.Conn, error) { // on both ends with x/2 milliseconds of latency. 
type delayingConn struct { net.Conn + enabled func() bool latency time.Duration lastSendEnd time.Time readBuf *bytes.Buffer } -func (d delayingConn) Write(b []byte) (n int, err error) { +func (d *delayingConn) getLatencyMS() int32 { + if !d.isEnabled() { + return 0 + } + return int32(d.latency / time.Millisecond) +} + +func (d *delayingConn) isEnabled() bool { + return d.enabled == nil || d.enabled() +} + +func (d *delayingConn) Write(b []byte) (n int, err error) { tNow := timeutil.Now() if d.lastSendEnd.Before(tNow) { d.lastSendEnd = tNow @@ -1626,7 +1643,7 @@ func (d delayingConn) Write(b []byte) (n int, err error) { Magic: magic, ReadTime: d.lastSendEnd.Add(d.latency).UnixNano(), Sz: int32(len(b)), - DelayMS: int32(d.latency / time.Millisecond), + DelayMS: d.getLatencyMS(), } if err := binary.Write(d.Conn, binary.BigEndian, hdr); err != nil { return n, err @@ -1665,9 +1682,9 @@ func (d *delayingConn) Read(b []byte) (n int, err error) { if d.latency == 0 && hdr.DelayMS != 0 { d.latency = time.Duration(hdr.DelayMS) * time.Millisecond } - defer func() { - time.Sleep(timeutil.Until(timeutil.Unix(0, hdr.ReadTime))) - }() + if d.isEnabled() { + defer time.Sleep(timeutil.Until(timeutil.Unix(0, hdr.ReadTime))) + } if _, err := io.CopyN(d.readBuf, d.Conn, int64(hdr.Sz)); err != nil { return 0, err } @@ -1756,13 +1773,14 @@ func (rpcCtx *Context) grpcDialRaw( dialer := onlyOnceDialer{} dialerFunc := dialer.dial - if rpcCtx.Knobs.ArtificialLatencyMap != nil { - latency := rpcCtx.Knobs.ArtificialLatencyMap[target] + if rpcCtx.Knobs.InjectedLatencyOracle != nil { + latency := rpcCtx.Knobs.InjectedLatencyOracle.GetLatency(target) log.VEventf(ctx, 1, "connecting with simulated latency %dms", latency) dialer := artificialLatencyDialer{ dialerFunc: dialerFunc, - latencyMS: latency, + latency: latency, + enabled: rpcCtx.Knobs.InjectedLatencyEnabled, } dialerFunc = dialer.dial } diff --git a/pkg/rpc/context_testutils.go b/pkg/rpc/context_testutils.go index 
223a9954ade5..e910b5f2babc 100644 --- a/pkg/rpc/context_testutils.go +++ b/pkg/rpc/context_testutils.go @@ -12,6 +12,7 @@ package rpc import ( "context" + "time" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -35,17 +36,26 @@ type ContextTestingKnobs struct { // internalClientAdapter - i.e. KV RPCs done against the local server. StreamClientInterceptor func(target string, class ConnectionClass) grpc.StreamClientInterceptor - // ArtificialLatencyMap if non-nil contains a map from target address + // InjectedLatencyOracle if non-nil contains a map from target address // (server.RPCServingAddr() of a remote node) to artificial latency in // milliseconds to inject. Setting this will cause the server to pause for - // the given amount of milliseconds on every network write. - ArtificialLatencyMap map[string]int + // the given duration on every network write. + InjectedLatencyOracle InjectedLatencyOracle + + // InjectedLatencyEnabled is used to turn on or off the InjectedLatencyOracle. + InjectedLatencyEnabled func() bool // StorageClusterID initializes the Context's StorageClusterID container to // this value if non-nil at construction time. StorageClusterID *uuid.UUID } +// InjectedLatencyOracle is a testing mechanism used to inject artificial +// latency to an address. +type InjectedLatencyOracle interface { + GetLatency(addr string) time.Duration +} + // NewInsecureTestingContext creates an insecure rpc Context suitable for tests. 
func NewInsecureTestingContext( ctx context.Context, clock *hlc.Clock, stopper *stop.Stopper, diff --git a/pkg/server/start_listen.go b/pkg/server/start_listen.go index a6072668b86a..0c28b7c5901c 100644 --- a/pkg/server/start_listen.go +++ b/pkg/server/start_listen.go @@ -102,8 +102,8 @@ func startListenRPCAndSQL( anyL := m.Match(cmux.Any()) if serverTestKnobs, ok := cfg.TestingKnobs.Server.(*TestingKnobs); ok { - if serverTestKnobs.ContextTestingKnobs.ArtificialLatencyMap != nil { - anyL = rpc.NewDelayingListener(anyL) + if serverTestKnobs.ContextTestingKnobs.InjectedLatencyOracle != nil { + anyL = rpc.NewDelayingListener(anyL, serverTestKnobs.ContextTestingKnobs.InjectedLatencyEnabled) } } diff --git a/pkg/testutils/serverutils/regionlatency/BUILD.bazel b/pkg/testutils/serverutils/regionlatency/BUILD.bazel new file mode 100644 index 000000000000..b2deb36d3eb1 --- /dev/null +++ b/pkg/testutils/serverutils/regionlatency/BUILD.bazel @@ -0,0 +1,18 @@ +load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "regionlatency", + srcs = ["region_latencies.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/testutils/serverutils/regionlatency", + visibility = ["//visibility:public"], + deps = [ + "//pkg/rpc", + "//pkg/server", + "//pkg/sql", + "//pkg/testutils/serverutils", + "@com_github_cockroachdb_errors//:errors", + ], +) + +get_x_data(name = "get_x_data") diff --git a/pkg/testutils/serverutils/regionlatency/region_latencies.go b/pkg/testutils/serverutils/regionlatency/region_latencies.go new file mode 100644 index 000000000000..ae4166a36188 --- /dev/null +++ b/pkg/testutils/serverutils/regionlatency/region_latencies.go @@ -0,0 +1,158 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. 
+// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +// Package regionlatency has testing utilities for injecting artificial latency +// into test clusters. +package regionlatency + +import ( + "time" + + "github.com/cockroachdb/cockroach/pkg/rpc" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/errors" +) + +// LatencyMap contains mapping for the speed-of-light delay between a pair +// of regions. Note that this is not a round-trip, but rather it's one-way. +type LatencyMap struct { + m map[Region]map[Region]OneWayLatency +} + +// Region is the name of a region. +type Region = string + +// Pair is a pair of regions. +type Pair struct{ A, B Region } + +func (p Pair) reversed() Pair { return Pair{A: p.B, B: p.A} } + +// TestCluster is implemented both by testcluster.TestCluster and +// democluster.transientCluster. +type TestCluster interface { + NumServers() int + Server(i int) serverutils.TestServerInterface +} + +// RoundTripLatency is the time to go from a region to another region and back. +type RoundTripLatency = time.Duration + +// OneWayLatency is the time to go from a region to another region. +type OneWayLatency = time.Duration + +// Apply is used to inject the latency pairs into the InjectedLatencyOracle. +// This step must be done after the servers have been created but before they +// have been allowed to issue RPCs. It is intended to be paired with the +// server testing knob PauseAfterGettingRPCAddresses. If there is no +// InjectedLatencyOracle testing knob or if it does not implement AddrMap, an +// error will be returned. 
+func (m LatencyMap) Apply(tc TestCluster) error { + for i, n := 0, tc.NumServers(); i < n; i++ { + serv := tc.Server(i) + serverKnobs, ok := serv.TestingKnobs().Server.(*server.TestingKnobs) + if !ok { + return errors.AssertionFailedf( + "failed to inject latencies: no server testing knobs for server %d", i, + ) + } + latencyMap, ok := serverKnobs.ContextTestingKnobs.InjectedLatencyOracle.(AddrMap) + if !ok { + return errors.AssertionFailedf( + "failed to inject latencies: InjectedLatencyOracle for server %d is %T", + i, serverKnobs.ContextTestingKnobs.InjectedLatencyOracle, + ) + } + cfg := serv.ExecutorConfig().(sql.ExecutorConfig) + srcLocality, ok := cfg.Locality.Find("region") + if !ok { + continue + } + for j := 0; j < n; j++ { + dst := tc.Server(j) + if dst == serv { + continue + } + dstCfg := dst.ExecutorConfig().(sql.ExecutorConfig) + dstLocality, ok := dstCfg.Locality.Find("region") + if !ok { + continue + } + l, ok := m.getLatency(srcLocality, dstLocality) + if !ok { + continue + } + latencyMap.SetLatency(dst.ServingRPCAddr(), l) + } + } + return nil +} + +// ForEachLatencyFrom calls f with every region that has a latency recorded +// from region a, together with the one-way latency from a to that region. +func (m LatencyMap) ForEachLatencyFrom(a Region, f func(b Region, l OneWayLatency)) { + for r, l := range m.m[a] { + f(r, l) + } +} + +func (m LatencyMap) getLatency(a, b Region) (OneWayLatency, bool) { + fromA, ok := m.m[a] + if !ok { + return 0, false + } + toB, ok := fromA[b] + return toB, ok +} + +// RoundTripPairs maps pairs of regions to the round-trip latency between them. +type RoundTripPairs map[Pair]RoundTripLatency + +// ToLatencyMap converts the pairs to a mapping from each pair of regions to +// the respective one-way latency between them.
+func (t RoundTripPairs) ToLatencyMap() LatencyMap { + return makeRegionLatencyMap(t) +} + +func makeRegionLatencyMap(pairs RoundTripPairs) LatencyMap { + m := LatencyMap{ + m: make(map[Region]map[Region]OneWayLatency), + } + for pair, latency := range pairs { + insertPair(m, pair, latency/2) + insertPair(m, pair.reversed(), latency/2) + } + return m +} + +func insertPair(m LatencyMap, pair Pair, latency OneWayLatency) { + regionToLatency, ok := m.m[pair.A] + if !ok { + regionToLatency = make(map[Region]OneWayLatency) + m.m[pair.A] = regionToLatency + } + regionToLatency[pair.B] = latency +} + +// AddrMap is an rpc.InjectedLatencyOracle whose latencies can be set. +type AddrMap interface { + rpc.InjectedLatencyOracle + SetLatency(addr string, l time.Duration) +} + +// MakeAddrMap constructs a new AddrMap for use as the rpc.InjectedLatencyOracle. +func MakeAddrMap() AddrMap { return addrMap{} } + +// addrMap implements AddrMap on top of a plain map. +type addrMap map[string]time.Duration + +func (am addrMap) GetLatency(addr string) time.Duration { return am[addr] } +func (am addrMap) SetLatency(addr string, l time.Duration) { am[addr] = l }