From 5cf0823e2ab4646ed98ebf2709ea79f3541935e5 Mon Sep 17 00:00:00 2001
From: Andrew Baptist
Date: Mon, 23 Jan 2023 17:12:30 -0500
Subject: [PATCH] gossip: Track latency by nodeID rather than addr

Previously the latency to remote nodes was tracked by address rather
than by node ID. This caused a few problems. First, a remote address
could be reused across nodes, which could attribute latency
measurements to the wrong node. Second, consumers of this information
(such as the allocator) had to map a node ID back to an address just to
perform a lookup. Finally, in preparation for dialback on heartbeat
(#84289), the use of the OriginAddr field in the PingRequest is
deprecated. That PR will add back another field that can be used to
look up the correct Locality to use.

Epic: none

Release note: None
---
 .../kvfollowerreadsccl/followerreads_test.go  | 22 +++-----
 pkg/kv/kvclient/kvcoord/replica_slice.go      | 28 ++++------
 pkg/kv/kvclient/kvcoord/replica_slice_test.go | 35 +++++-------
 .../allocator/allocatorimpl/allocator.go      | 11 +---
 .../allocator/allocatorimpl/allocator_test.go | 28 +++++-----
 .../allocator/allocatorimpl/test_helpers.go   |  3 +-
 .../kvserver/allocator/storepool/BUILD.bazel  |  1 -
 .../storepool/override_store_pool.go          |  8 ---
 .../allocator/storepool/store_pool.go         |  9 ---
 pkg/kv/kvserver/asim/state/impl.go            |  2 +-
 pkg/kv/kvserver/store.go                      |  2 +-
 pkg/rpc/clock_offset.go                       | 55 ++++++++++---------
 pkg/rpc/clock_offset_test.go                  | 28 +++++-----
 pkg/rpc/context.go                            |  8 +--
 pkg/rpc/context_test.go                       | 10 ++--
 pkg/rpc/heartbeat.go                          |  2 +-
 pkg/rpc/heartbeat.proto                       |  8 +--
 pkg/rpc/nodedialer/nodedialer.go              |  7 +--
 pkg/server/server.go                          |  1 -
 pkg/server/status/BUILD.bazel                 |  1 -
 pkg/server/status/recorder.go                 | 18 +-----
 pkg/server/status/recorder_test.go            |  4 +-
 pkg/server/tenant.go                          |  2 +-
 .../physicalplan/replicaoracle/oracle_test.go |  5 +-
 24 files changed, 117 insertions(+), 181 deletions(-)

diff --git a/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go b/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go
index 4e388a68083f..5e5a99f57ba6 100644
--- a/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go
+++ b/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go
@@ -44,7 +44,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/stop"
-	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 	"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
 	"github.com/cockroachdb/cockroach/pkg/util/uuid"
 	"github.com/cockroachdb/errors"
@@ -545,17 +544,17 @@ func TestOracle(t *testing.T) {
 	leaseholder := replicas[2]
 
 	rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper)
-	setLatency := func(addr string, latency time.Duration) {
+	setLatency := func(id roachpb.NodeID, latency time.Duration) {
 		// All test cases have to have at least 11 measurement values in order for
 		// the exponentially-weighted moving average to work properly. See the
 		// comment on the WARMUP_SAMPLES const in the ewma package for details.
for i := 0; i < 11; i++ { - rpcContext.RemoteClocks.UpdateOffset(ctx, addr, rpc.RemoteOffset{}, latency) + rpcContext.RemoteClocks.UpdateOffset(ctx, id, rpc.RemoteOffset{}, latency) } } - setLatency("1", 100*time.Millisecond) - setLatency("2", 2*time.Millisecond) - setLatency("3", 80*time.Millisecond) + setLatency(1, 100*time.Millisecond) + setLatency(2, 2*time.Millisecond) + setLatency(3, 80*time.Millisecond) testCases := []struct { name string @@ -700,7 +699,6 @@ func TestFollowerReadsWithStaleDescriptor(t *testing.T) { historicalQuery := `SELECT * FROM test AS OF SYSTEM TIME follower_read_timestamp() WHERE k=2` recCh := make(chan tracingpb.Recording, 1) - var n2Addr, n3Addr syncutil.AtomicString tc := testcluster.StartTestCluster(t, 4, base.TestClusterArgs{ ReplicationMode: base.ReplicationManual, @@ -724,8 +722,8 @@ func TestFollowerReadsWithStaleDescriptor(t *testing.T) { // heartbeated by the time the test wants to use it. Without this // knob, that would cause the transport to reorder replicas. DontConsiderConnHealth: true, - LatencyFunc: func(addr string) (time.Duration, bool) { - if (addr == n2Addr.Get()) || (addr == n3Addr.Get()) { + LatencyFunc: func(id roachpb.NodeID) (time.Duration, bool) { + if (id == 2) || (id == 3) { return time.Millisecond, true } return 100 * time.Millisecond, true @@ -743,8 +741,6 @@ func TestFollowerReadsWithStaleDescriptor(t *testing.T) { }, }) defer tc.Stopper().Stop(ctx) - n2Addr.Set(tc.Servers[1].RPCAddr()) - n3Addr.Set(tc.Servers[2].RPCAddr()) n1 := sqlutils.MakeSQLRunner(tc.Conns[0]) n1.Exec(t, `CREATE DATABASE t`) @@ -887,11 +883,11 @@ func TestSecondaryTenantFollowerReadsRouting(t *testing.T) { // For the variant where no latency information is available, we // expect n2 to serve follower reads as well, but because it // is in the same locality as the client. - LatencyFunc: func(addr string) (time.Duration, bool) { + LatencyFunc: func(id roachpb.NodeID) (time.Duration, bool) { if !validLatencyFunc { return 0, false } - if addr == tc.Server(1).RPCAddr() { + if id == 2 { return time.Millisecond, true } return 100 * time.Millisecond, true diff --git a/pkg/kv/kvclient/kvcoord/replica_slice.go b/pkg/kv/kvclient/kvcoord/replica_slice.go index a6e3a08f3f16..fc956688275e 100644 --- a/pkg/kv/kvclient/kvcoord/replica_slice.go +++ b/pkg/kv/kvclient/kvcoord/replica_slice.go @@ -22,18 +22,10 @@ import ( ) // ReplicaInfo extends the Replica structure with the associated node -// descriptor. +// Locality information. type ReplicaInfo struct { roachpb.ReplicaDescriptor - NodeDesc *roachpb.NodeDescriptor -} - -func (i ReplicaInfo) locality() []roachpb.Tier { - return i.NodeDesc.Locality.Tiers -} - -func (i ReplicaInfo) addr() string { - return i.NodeDesc.Address.String() + Tiers []roachpb.Tier } // A ReplicaSlice is a slice of ReplicaInfo. @@ -131,12 +123,12 @@ func NewReplicaSlice( } rs = append(rs, ReplicaInfo{ ReplicaDescriptor: r, - NodeDesc: nd, + Tiers: nd.Locality.Tiers, }) } if len(rs) == 0 { return nil, newSendError( - fmt.Sprintf("no replica node addresses available via gossip for r%d", desc.RangeID)) + fmt.Sprintf("no replica node information available via gossip for r%d", desc.RangeID)) } return rs, nil } @@ -188,8 +180,8 @@ func localityMatch(a, b []roachpb.Tier) int { } // A LatencyFunc returns the latency from this node to a remote -// address and a bool indicating whether the latency is valid. -type LatencyFunc func(string) (time.Duration, bool) +// node and a bool indicating whether the latency is valid. 
+type LatencyFunc func(roachpb.NodeID) (time.Duration, bool) // OptimizeReplicaOrder sorts the replicas in the order in which // they're to be used for sending RPCs (meaning in the order in which @@ -231,14 +223,14 @@ func (rs ReplicaSlice) OptimizeReplicaOrder( } if latencyFn != nil { - latencyI, okI := latencyFn(rs[i].addr()) - latencyJ, okJ := latencyFn(rs[j].addr()) + latencyI, okI := latencyFn(rs[i].NodeID) + latencyJ, okJ := latencyFn(rs[j].NodeID) if okI && okJ { return latencyI < latencyJ } } - attrMatchI := localityMatch(locality.Tiers, rs[i].locality()) - attrMatchJ := localityMatch(locality.Tiers, rs[j].locality()) + attrMatchI := localityMatch(locality.Tiers, rs[i].Tiers) + attrMatchJ := localityMatch(locality.Tiers, rs[j].Tiers) // Longer locality matches sort first (the assumption is that // they'll have better latencies). return attrMatchI > attrMatchJ diff --git a/pkg/kv/kvclient/kvcoord/replica_slice_test.go b/pkg/kv/kvclient/kvcoord/replica_slice_test.go index ae0f4058a62f..6f4f47a77c4a 100644 --- a/pkg/kv/kvclient/kvcoord/replica_slice_test.go +++ b/pkg/kv/kvclient/kvcoord/replica_slice_test.go @@ -12,7 +12,6 @@ package kvcoord import ( "context" - "fmt" "reflect" "strings" "testing" @@ -167,18 +166,10 @@ func locality(t *testing.T, locStrs []string) roachpb.Locality { return locality } -func nodeDesc(t *testing.T, nid roachpb.NodeID, locStrs []string) *roachpb.NodeDescriptor { - return &roachpb.NodeDescriptor{ - NodeID: nid, - Locality: locality(t, locStrs), - Address: util.MakeUnresolvedAddr("tcp", fmt.Sprintf("%d:26257", nid)), - } -} - func info(t *testing.T, nid roachpb.NodeID, sid roachpb.StoreID, locStrs []string) ReplicaInfo { return ReplicaInfo{ ReplicaDescriptor: desc(nid, sid), - NodeDesc: nodeDesc(t, nid, locStrs), + Tiers: locality(t, locStrs).Tiers, } } @@ -192,7 +183,7 @@ func TestReplicaSliceOptimizeReplicaOrder(t *testing.T) { // locality of the DistSender. locality roachpb.Locality // map from node address (see nodeDesc()) to latency to that node. - latencies map[string]time.Duration + latencies map[roachpb.NodeID]time.Duration slice ReplicaSlice // expOrder is the expected order in which the replicas sort. Replicas are // only identified by their node. 
If multiple replicas are on different @@ -217,10 +208,10 @@ func TestReplicaSliceOptimizeReplicaOrder(t *testing.T) { name: "order by latency", nodeID: 1, locality: locality(t, []string{"country=us", "region=west", "city=la"}), - latencies: map[string]time.Duration{ - "2:26257": time.Hour, - "3:26257": time.Minute, - "4:26257": time.Second, + latencies: map[roachpb.NodeID]time.Duration{ + 2: time.Hour, + 3: time.Minute, + 4: time.Second, }, slice: ReplicaSlice{ info(t, 2, 2, []string{"country=us", "region=west", "city=sf"}), @@ -237,11 +228,11 @@ func TestReplicaSliceOptimizeReplicaOrder(t *testing.T) { name: "local node comes first", nodeID: 1, locality: locality(t, nil), - latencies: map[string]time.Duration{ - "1:26257": 10 * time.Hour, - "2:26257": time.Hour, - "3:26257": time.Minute, - "4:26257": time.Second, + latencies: map[roachpb.NodeID]time.Duration{ + 1: 10 * time.Hour, + 2: time.Hour, + 3: time.Minute, + 4: time.Second, }, slice: ReplicaSlice{ info(t, 1, 1, nil), @@ -257,8 +248,8 @@ func TestReplicaSliceOptimizeReplicaOrder(t *testing.T) { t.Run(test.name, func(t *testing.T) { var latencyFn LatencyFunc if test.latencies != nil { - latencyFn = func(addr string) (time.Duration, bool) { - lat, ok := test.latencies[addr] + latencyFn = func(id roachpb.NodeID) (time.Duration, bool) { + lat, ok := test.latencies[id] return lat, ok } } diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go b/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go index ed48cd9a4f12..40656f9051da 100644 --- a/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go +++ b/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go @@ -560,7 +560,7 @@ type AllocatorMetrics struct { type Allocator struct { st *cluster.Settings deterministic bool - nodeLatencyFn func(addr string) (time.Duration, bool) + nodeLatencyFn func(nodeID roachpb.NodeID) (time.Duration, bool) // TODO(aayush): Let's replace this with a *rand.Rand that has a rand.Source // wrapped inside a mutex, to avoid misuse. 
randGen allocatorRand @@ -599,7 +599,7 @@ func makeAllocatorMetrics() AllocatorMetrics { func MakeAllocator( st *cluster.Settings, deterministic bool, - nodeLatencyFn func(addr string) (time.Duration, bool), + nodeLatencyFn func(nodeID roachpb.NodeID) (time.Duration, bool), knobs *allocator.TestingKnobs, ) Allocator { var randSource rand.Source @@ -2492,12 +2492,7 @@ func (a Allocator) shouldTransferLeaseForAccessLocality( if !ok { continue } - addr, err := storePool.GossipNodeIDAddress(repl.NodeID) - if err != nil { - log.KvDistribution.Errorf(ctx, "missing address for n%d: %+v", repl.NodeID, err) - continue - } - remoteLatency, ok := a.nodeLatencyFn(addr.String()) + remoteLatency, ok := a.nodeLatencyFn(repl.NodeID) if !ok { continue } diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go index 185aef65d313..8199f210f455 100644 --- a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go +++ b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go @@ -2112,7 +2112,7 @@ func TestAllocatorTransferLeaseTargetDraining(t *testing.T) { storepool.TestTimeUntilStoreDeadOff, true, /* deterministic */ func() int { return 10 }, /* nodeCount */ livenesspb.NodeLivenessStatus_LIVE) - a := MakeAllocator(st, true /* deterministic */, func(string) (time.Duration, bool) { + a := MakeAllocator(st, true /* deterministic */, func(id roachpb.NodeID) (time.Duration, bool) { return 0, true }, nil) defer stopper.Stop(ctx) @@ -2501,7 +2501,7 @@ func TestAllocatorShouldTransferLeaseDraining(t *testing.T) { storepool.TestTimeUntilStoreDeadOff, true, /* deterministic */ func() int { return 10 }, /* nodeCount */ livenesspb.NodeLivenessStatus_LIVE) - a := MakeAllocator(st, true /* deterministic */, func(string) (time.Duration, bool) { + a := MakeAllocator(st, true /* deterministic */, func(id roachpb.NodeID) (time.Duration, bool) { return 0, true }, nil) defer stopper.Stop(context.Background()) @@ -2569,7 +2569,7 @@ func TestAllocatorShouldTransferSuspected(t *testing.T) { storepool.TestTimeUntilStoreDeadOff, true, /* deterministic */ func() int { return 10 }, /* nodeCount */ livenesspb.NodeLivenessStatus_LIVE) - a := MakeAllocator(st, true /* deterministic */, func(string) (time.Duration, bool) { + a := MakeAllocator(st, true /* deterministic */, func(id roachpb.NodeID) (time.Duration, bool) { return 0, true }, nil) defer stopper.Stop(context.Background()) @@ -5420,11 +5420,11 @@ func TestAllocatorTransferLeaseTargetLoadBased(t *testing.T) { now = now.Add(MinLeaseTransferStatsDuration) - noLatency := map[string]time.Duration{} - highLatency := map[string]time.Duration{ - stores[0].Node.Address.String(): 50 * time.Millisecond, - stores[1].Node.Address.String(): 50 * time.Millisecond, - stores[2].Node.Address.String(): 50 * time.Millisecond, + noLatency := map[roachpb.NodeID]time.Duration{} + highLatency := map[roachpb.NodeID]time.Duration{ + stores[0].Node.NodeID: 50 * time.Millisecond, + stores[1].Node.NodeID: 50 * time.Millisecond, + stores[2].Node.NodeID: 50 * time.Millisecond, } existing := []roachpb.ReplicaDescriptor{ @@ -5435,7 +5435,7 @@ func TestAllocatorTransferLeaseTargetLoadBased(t *testing.T) { testCases := []struct { leaseholder roachpb.StoreID - latency map[string]time.Duration + latency map[roachpb.NodeID]time.Duration stats *replicastats.ReplicaStats excludeLeaseRepl bool expected roachpb.StoreID @@ -5501,8 +5501,8 @@ func TestAllocatorTransferLeaseTargetLoadBased(t *testing.T) { for _, c := range testCases { t.Run("", 
func(t *testing.T) { - a := MakeAllocator(st, true /* deterministic */, func(addr string) (time.Duration, bool) { - return c.latency[addr], true + a := MakeAllocator(st, true /* deterministic */, func(id roachpb.NodeID) (time.Duration, bool) { + return c.latency[id], true }, nil) localitySummary := c.stats.SnapshotRatedSummary(now) usage := allocator.RangeUsageInfo{} @@ -7486,7 +7486,7 @@ func TestAllocatorComputeActionDynamicNumReplicas(t *testing.T) { storepool.TestTimeUntilStoreDeadOff, false, /* deterministic */ func() int { return numNodes }, livenesspb.NodeLivenessStatus_LIVE) - a := MakeAllocator(st, false /* deterministic */, func(string) (time.Duration, bool) { + a := MakeAllocator(st, false /* deterministic */, func(id roachpb.NodeID) (time.Duration, bool) { return 0, true }, nil) @@ -8237,7 +8237,7 @@ func TestAllocatorFullDisks(t *testing.T) { mockNodeLiveness.NodeLivenessFunc, false, /* deterministic */ ) - alloc := MakeAllocator(st, false /* deterministic */, func(string) (time.Duration, bool) { + alloc := MakeAllocator(st, false /* deterministic */, func(id roachpb.NodeID) (time.Duration, bool) { return 0, false }, nil) @@ -8689,7 +8689,7 @@ func exampleRebalancing( storepool.NewMockNodeLiveness(livenesspb.NodeLivenessStatus_LIVE).NodeLivenessFunc, /* deterministic */ true, ) - alloc := MakeAllocator(st, true /* deterministic */, func(string) (time.Duration, bool) { + alloc := MakeAllocator(st, true /* deterministic */, func(id roachpb.NodeID) (time.Duration, bool) { return 0, false }, nil) diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/test_helpers.go b/pkg/kv/kvserver/allocator/allocatorimpl/test_helpers.go index da694c987ada..9532689d045f 100644 --- a/pkg/kv/kvserver/allocator/allocatorimpl/test_helpers.go +++ b/pkg/kv/kvserver/allocator/allocatorimpl/test_helpers.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -42,7 +43,7 @@ func CreateTestAllocatorWithKnobs( storepool.TestTimeUntilStoreDeadOff, deterministic, func() int { return numNodes }, livenesspb.NodeLivenessStatus_LIVE) - a := MakeAllocator(st, deterministic, func(string) (time.Duration, bool) { + a := MakeAllocator(st, deterministic, func(id roachpb.NodeID) (time.Duration, bool) { return 0, true }, knobs) return stopper, g, storePool, a, manual diff --git a/pkg/kv/kvserver/allocator/storepool/BUILD.bazel b/pkg/kv/kvserver/allocator/storepool/BUILD.bazel index 0868601ecac7..95ba551323e2 100644 --- a/pkg/kv/kvserver/allocator/storepool/BUILD.bazel +++ b/pkg/kv/kvserver/allocator/storepool/BUILD.bazel @@ -22,7 +22,6 @@ go_library( "//pkg/rpc", "//pkg/settings", "//pkg/settings/cluster", - "//pkg/util", "//pkg/util/hlc", "//pkg/util/humanizeutil", "//pkg/util/log", diff --git a/pkg/kv/kvserver/allocator/storepool/override_store_pool.go b/pkg/kv/kvserver/allocator/storepool/override_store_pool.go index 4cd59b90a008..fcb0cdad0f38 100644 --- a/pkg/kv/kvserver/allocator/storepool/override_store_pool.go +++ b/pkg/kv/kvserver/allocator/storepool/override_store_pool.go @@ -17,7 +17,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" 
"github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/hlc" ) @@ -172,13 +171,6 @@ func (o *OverrideStorePool) GetStoreDescriptor( return o.sp.GetStoreDescriptor(storeID) } -// GossipNodeIDAddress implements the AllocatorStorePool interface. -func (o *OverrideStorePool) GossipNodeIDAddress( - nodeID roachpb.NodeID, -) (*util.UnresolvedAddr, error) { - return o.sp.GossipNodeIDAddress(nodeID) -} - // UpdateLocalStoreAfterRebalance implements the AllocatorStorePool interface. // This override method is a no-op, as // StorePool.UpdateLocalStoreAfterRebalance(..) is not a read-only method and diff --git a/pkg/kv/kvserver/allocator/storepool/store_pool.go b/pkg/kv/kvserver/allocator/storepool/store_pool.go index 9dbd4978884e..55beabc549ff 100644 --- a/pkg/kv/kvserver/allocator/storepool/store_pool.go +++ b/pkg/kv/kvserver/allocator/storepool/store_pool.go @@ -25,7 +25,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" - "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -394,9 +393,6 @@ type AllocatorStorePool interface { filter StoreFilter, ) (StoreList, int, ThrottledStoreReasons) - // GossipNodeIDAddress looks up the RPC address for the given node via gossip. - GossipNodeIDAddress(nodeID roachpb.NodeID) (*util.UnresolvedAddr, error) - // LiveAndDeadReplicas divides the provided repls slice into two slices: the // first for live replicas, and the second for dead replicas. // See comment on StorePool.LiveAndDeadReplicas(..). @@ -1242,11 +1238,6 @@ func (sp *StorePool) GetLocalitiesByNode( return localities } -// GossipNodeIDAddress looks up the RPC address for the given node via gossip. -func (sp *StorePool) GossipNodeIDAddress(nodeID roachpb.NodeID) (*util.UnresolvedAddr, error) { - return sp.gossip.GetNodeIDAddress(nodeID) -} - // GetNodeLocalityString returns the locality information for the given node // in its string format. 
func (sp *StorePool) GetNodeLocalityString(nodeID roachpb.NodeID) string { diff --git a/pkg/kv/kvserver/asim/state/impl.go b/pkg/kv/kvserver/asim/state/impl.go index d306afc8965a..4651327b8714 100644 --- a/pkg/kv/kvserver/asim/state/impl.go +++ b/pkg/kv/kvserver/asim/state/impl.go @@ -781,7 +781,7 @@ func (s *state) MakeAllocator(storeID StoreID) allocatorimpl.Allocator { return allocatorimpl.MakeAllocator( s.stores[storeID].settings, s.stores[storeID].storepool.IsDeterministic(), - func(addr string) (time.Duration, bool) { return 0, true }, + func(id roachpb.NodeID) (time.Duration, bool) { return 0, true }, &allocator.TestingKnobs{ AllowLeaseTransfersToReplicasNeedingSnapshots: true, }, diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 19a8cc4262c5..0958e9adb6f3 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -1209,7 +1209,7 @@ func NewStore( s.allocator = allocatorimpl.MakeAllocator( cfg.Settings, storePoolIsDeterministic, - func(string) (time.Duration, bool) { + func(id roachpb.NodeID) (time.Duration, bool) { return 0, false }, cfg.TestingKnobs.AllocatorKnobs, ) diff --git a/pkg/rpc/clock_offset.go b/pkg/rpc/clock_offset.go index 178002366ab7..9393cea483fe 100644 --- a/pkg/rpc/clock_offset.go +++ b/pkg/rpc/clock_offset.go @@ -16,6 +16,7 @@ import ( "time" "github.com/VividCortex/ewma" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" @@ -95,8 +96,8 @@ type RemoteClockMonitor struct { mu struct { syncutil.RWMutex - offsets map[string]RemoteOffset - latencyInfos map[string]*latencyInfo + offsets map[roachpb.NodeID]RemoteOffset + latencyInfos map[roachpb.NodeID]*latencyInfo } metrics RemoteClockMetrics @@ -125,8 +126,8 @@ func newRemoteClockMonitor( maxOffset: maxOffset, offsetTTL: offsetTTL, } - r.mu.offsets = make(map[string]RemoteOffset) - r.mu.latencyInfos = make(map[string]*latencyInfo) + r.mu.offsets = make(map[roachpb.NodeID]RemoteOffset) + r.mu.latencyInfos = make(map[roachpb.NodeID]*latencyInfo) if histogramWindowInterval == 0 { histogramWindowInterval = time.Duration(math.MaxInt64) } @@ -147,25 +148,25 @@ func (r *RemoteClockMonitor) Metrics() *RemoteClockMetrics { } // Latency returns the exponentially weighted moving average latency to the -// given node address. Returns true if the measurement is valid, or false if +// given node id. Returns true if the measurement is valid, or false if // we don't have enough samples to compute a reliable average. -func (r *RemoteClockMonitor) Latency(addr string) (time.Duration, bool) { +func (r *RemoteClockMonitor) Latency(id roachpb.NodeID) (time.Duration, bool) { r.mu.RLock() defer r.mu.RUnlock() - if info, ok := r.mu.latencyInfos[addr]; ok && info.avgNanos.Value() != 0.0 { + if info, ok := r.mu.latencyInfos[id]; ok && info.avgNanos.Value() != 0.0 { return time.Duration(int64(info.avgNanos.Value())), true } return 0, false } // AllLatencies returns a map of all currently valid latency measurements. 
-func (r *RemoteClockMonitor) AllLatencies() map[string]time.Duration { +func (r *RemoteClockMonitor) AllLatencies() map[roachpb.NodeID]time.Duration { r.mu.RLock() defer r.mu.RUnlock() - result := make(map[string]time.Duration) - for addr, info := range r.mu.latencyInfos { + result := make(map[roachpb.NodeID]time.Duration) + for id, info := range r.mu.latencyInfos { if info.avgNanos.Value() != 0.0 { - result[addr] = time.Duration(int64(info.avgNanos.Value())) + result[id] = time.Duration(int64(info.avgNanos.Value())) } } return result @@ -174,50 +175,54 @@ func (r *RemoteClockMonitor) AllLatencies() map[string]time.Duration { // UpdateOffset is a thread-safe way to update the remote clock and latency // measurements. // -// It only updates the offset for addr if one of the following cases holds: -// 1. There is no prior offset for that address. -// 2. The old offset for addr was measured long enough ago to be considered +// It only updates the offset for node if one of the following cases holds: +// 1. There is no prior offset for that node. +// 2. The old offset for node was measured long enough ago to be considered // stale. // 3. The new offset's error is smaller than the old offset's error. // // Pass a roundTripLatency of 0 or less to avoid recording the latency. func (r *RemoteClockMonitor) UpdateOffset( - ctx context.Context, addr string, offset RemoteOffset, roundTripLatency time.Duration, + ctx context.Context, id roachpb.NodeID, offset RemoteOffset, roundTripLatency time.Duration, ) { emptyOffset := offset == RemoteOffset{} + // At startup the remote node's id may not be set. Skip recording latency. + if id == 0 { + return + } r.mu.Lock() defer r.mu.Unlock() - if oldOffset, ok := r.mu.offsets[addr]; !ok { + if oldOffset, ok := r.mu.offsets[id]; !ok { // We don't have a measurement - if the incoming measurement is not empty, // set it. if !emptyOffset { - r.mu.offsets[addr] = offset + r.mu.offsets[id] = offset } } else if oldOffset.isStale(r.offsetTTL, r.clock.Now()) { // We have a measurement but it's old - if the incoming measurement is not empty, // set it, otherwise delete the old measurement. if !emptyOffset { - r.mu.offsets[addr] = offset + r.mu.offsets[id] = offset } else { - delete(r.mu.offsets, addr) + delete(r.mu.offsets, id) } } else if offset.Uncertainty < oldOffset.Uncertainty { // We have a measurement but its uncertainty is greater than that of the // incoming measurement - if the incoming measurement is not empty, set it. if !emptyOffset { - r.mu.offsets[addr] = offset + r.mu.offsets[id] = offset } } if roundTripLatency > 0 { - info, ok := r.mu.latencyInfos[addr] + info, ok := r.mu.latencyInfos[id] if !ok { info = &latencyInfo{ avgNanos: ewma.NewMovingAverage(avgLatencyMeasurementAge), } - r.mu.latencyInfos[addr] = info + r.mu.latencyInfos[id] = info } newLatencyf := float64(roundTripLatency.Nanoseconds()) @@ -237,7 +242,7 @@ func (r *RemoteClockMonitor) UpdateOffset( } if log.V(2) { - log.Dev.Infof(ctx, "update offset: %s %v", addr, r.mu.offsets[addr]) + log.Dev.Infof(ctx, "update offset: n%d %v", id, r.mu.offsets[id]) } } @@ -261,9 +266,9 @@ func (r *RemoteClockMonitor) VerifyClockOffset(ctx context.Context) error { r.mu.Lock() // Each measurement is recorded as its minimum and maximum value. 
offsets := make(stats.Float64Data, 0, 2*len(r.mu.offsets)) - for addr, offset := range r.mu.offsets { + for id, offset := range r.mu.offsets { if offset.isStale(r.offsetTTL, now) { - delete(r.mu.offsets, addr) + delete(r.mu.offsets, id) continue } offsets = append(offsets, float64(offset.Offset+offset.Uncertainty)) diff --git a/pkg/rpc/clock_offset_test.go b/pkg/rpc/clock_offset_test.go index a7d2022f69f0..2bc452029f7b 100644 --- a/pkg/rpc/clock_offset_test.go +++ b/pkg/rpc/clock_offset_test.go @@ -12,12 +12,11 @@ package rpc import ( "context" - "fmt" "math" - "strconv" "testing" "time" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/stop" @@ -35,7 +34,7 @@ func TestUpdateOffset(t *testing.T) { maxOffset := time.Nanosecond monitor := newRemoteClockMonitor(clock, maxOffset, time.Hour, 0) - const key = "addr" + const key = 2 const latency = 10 * time.Millisecond // Case 1: There is no prior offset for the address. @@ -47,7 +46,7 @@ func TestUpdateOffset(t *testing.T) { monitor.UpdateOffset(context.Background(), key, offset1, latency) monitor.mu.Lock() if o, ok := monitor.mu.offsets[key]; !ok { - t.Errorf("expected key %s to be set in %v, but it was not", key, monitor.mu.offsets) + t.Errorf("expected key %d to be set in %v, but it was not", key, monitor.mu.offsets) } else if o != offset1 { t.Errorf("expected offset %v, instead %v", offset1, o) } @@ -62,7 +61,7 @@ func TestUpdateOffset(t *testing.T) { monitor.UpdateOffset(context.Background(), key, offset2, latency) monitor.mu.Lock() if o, ok := monitor.mu.offsets[key]; !ok { - t.Errorf("expected key %s to be set in %v, but it was not", key, monitor.mu.offsets) + t.Errorf("expected key %d to be set in %v, but it was not", key, monitor.mu.offsets) } else if o != offset2 { t.Errorf("expected offset %v, instead %v", offset2, o) } @@ -77,7 +76,7 @@ func TestUpdateOffset(t *testing.T) { monitor.UpdateOffset(context.Background(), key, offset3, latency) monitor.mu.Lock() if o, ok := monitor.mu.offsets[key]; !ok { - t.Errorf("expected key %s to be set in %v, but it was not", key, monitor.mu.offsets) + t.Errorf("expected key %d to be set in %v, but it was not", key, monitor.mu.offsets) } else if o != offset3 { t.Errorf("expected offset %v, instead %v", offset3, o) } @@ -87,7 +86,7 @@ func TestUpdateOffset(t *testing.T) { monitor.UpdateOffset(context.Background(), key, offset2, latency) monitor.mu.Lock() if o, ok := monitor.mu.offsets[key]; !ok { - t.Errorf("expected key %s to be set in %v, but it was not", key, monitor.mu.offsets) + t.Errorf("expected key %d to be set in %v, but it was not", key, monitor.mu.offsets) } else if o != offset3 { t.Errorf("expected offset %v, instead %v", offset3, o) } @@ -112,9 +111,9 @@ func TestVerifyClockOffset(t *testing.T) { // error when less than a majority of offsets are under the maximum tolerated offset. 
{[]RemoteOffset{{Offset: 20, Uncertainty: 10}, {Offset: 58, Uncertainty: 20}, {Offset: 85, Uncertainty: 25}, {Offset: 91, Uncertainty: 31}}, true}, } { - monitor.mu.offsets = make(map[string]RemoteOffset) + monitor.mu.offsets = make(map[roachpb.NodeID]RemoteOffset) for i, offset := range tc.offsets { - monitor.mu.offsets[strconv.Itoa(i)] = offset + monitor.mu.offsets[roachpb.NodeID(i)] = offset } if tc.expectedError { @@ -166,8 +165,8 @@ func TestClockOffsetMetrics(t *testing.T) { clock := timeutil.NewManualTime(timeutil.Unix(0, 123)) maxOffset := 20 * time.Nanosecond monitor := newRemoteClockMonitor(clock, maxOffset, time.Hour, 0) - monitor.mu.offsets = map[string]RemoteOffset{ - "0": { + monitor.mu.offsets = map[roachpb.NodeID]RemoteOffset{ + 0: { Offset: 13, Uncertainty: 7, MeasuredAt: 6, @@ -197,7 +196,7 @@ func TestLatencies(t *testing.T) { // All test cases have to have at least 11 measurement values in order for // the exponentially-weighted moving average to work properly. See the // comment on the WARMUP_SAMPLES const in the ewma package for details. - const emptyKey = "no measurements" + const emptyKey = 1 for i := 0; i < 11; i++ { monitor.UpdateOffset(context.Background(), emptyKey, RemoteOffset{}, 0) } @@ -219,8 +218,9 @@ func TestLatencies(t *testing.T) { {[]time.Duration{10, 10, 10, 10, 10, 99, 99, 99, 99, 99, 99}, 58}, {[]time.Duration{99, 99, 99, 99, 99, 10, 10, 10, 10, 10, 10}, 50}, } - for _, tc := range testCases { - key := fmt.Sprintf("%v", tc.measurements) + for i, tc := range testCases { + // Start counting from node 1 since a 0 node id is special cased. + key := roachpb.NodeID(i + 1) for _, measurement := range tc.measurements { monitor.UpdateOffset(context.Background(), key, RemoteOffset{}, measurement) } diff --git a/pkg/rpc/context.go b/pkg/rpc/context.go index 12bc8157d14e..0ec7507a3f6b 100644 --- a/pkg/rpc/context.go +++ b/pkg/rpc/context.go @@ -2238,15 +2238,11 @@ func (rpcCtx *Context) runHeartbeat( // Start heartbeat loop. - maxOffset := rpcCtx.MaxOffset - maxOffsetNanos := maxOffset.Nanoseconds() - // The request object. Note that we keep the same object from // heartbeat to heartbeat: we compute a new .Offset at the end of // the current heartbeat as input to the next one. 
request := &PingRequest{ - OriginAddr: rpcCtx.Config.Addr, - OriginMaxOffsetNanos: maxOffsetNanos, + DeprecatedOriginAddr: rpcCtx.Config.Addr, TargetNodeID: conn.remoteNodeID, ServerVersion: rpcCtx.Settings.Version.BinaryVersion(), } @@ -2357,7 +2353,7 @@ func (rpcCtx *Context) runHeartbeat( remoteTimeNow := timeutil.Unix(0, response.ServerTime).Add(pingDuration / 2) request.Offset.Offset = remoteTimeNow.Sub(receiveTime).Nanoseconds() } - rpcCtx.RemoteClocks.UpdateOffset(ctx, target, request.Offset, pingDuration) + rpcCtx.RemoteClocks.UpdateOffset(ctx, conn.remoteNodeID, request.Offset, pingDuration) } if cb := rpcCtx.HeartbeatCB; cb != nil { diff --git a/pkg/rpc/context_test.go b/pkg/rpc/context_test.go index a2df9ef90bfb..f54070d95796 100644 --- a/pkg/rpc/context_test.go +++ b/pkg/rpc/context_test.go @@ -1331,8 +1331,8 @@ func TestOffsetMeasurement(t *testing.T) { clientCtx.RemoteClocks.mu.Lock() defer clientCtx.RemoteClocks.mu.Unlock() - if o, ok := clientCtx.RemoteClocks.mu.offsets[remoteAddr]; !ok { - return errors.Errorf("expected offset of %s to be initialized, but it was not", remoteAddr) + if o, ok := clientCtx.RemoteClocks.mu.offsets[serverNodeID]; !ok { + return errors.Errorf("expected offset of %d to be initialized, but it was not", serverNodeID) } else if o != expectedOffset { return errors.Errorf("expected:\n%v\nactual:\n%v", expectedOffset, o) } @@ -1348,7 +1348,7 @@ func TestOffsetMeasurement(t *testing.T) { clientCtx.RemoteClocks.mu.Lock() defer clientCtx.RemoteClocks.mu.Unlock() - if o, ok := clientCtx.RemoteClocks.mu.offsets[remoteAddr]; ok { + if o, ok := clientCtx.RemoteClocks.mu.offsets[serverNodeID]; ok { return errors.Errorf("expected offset to have been cleared, but found %s", o) } return nil @@ -1404,7 +1404,7 @@ func TestFailedOffsetMeasurement(t *testing.T) { clientCtx.RemoteClocks.mu.Lock() defer clientCtx.RemoteClocks.mu.Unlock() - if _, ok := clientCtx.RemoteClocks.mu.offsets[remoteAddr]; !ok { + if _, ok := clientCtx.RemoteClocks.mu.offsets[serverNodeID]; !ok { return errors.Errorf("expected offset of %s to be initialized, but it was not", remoteAddr) } return nil @@ -1414,7 +1414,7 @@ func TestFailedOffsetMeasurement(t *testing.T) { serverCtx.RemoteClocks.mu.Lock() defer serverCtx.RemoteClocks.mu.Unlock() - if o, ok := serverCtx.RemoteClocks.mu.offsets[remoteAddr]; ok { + if o, ok := serverCtx.RemoteClocks.mu.offsets[serverNodeID]; ok { return errors.Errorf("expected offset of %s to not be initialized, but it was: %v", remoteAddr, o) } return nil diff --git a/pkg/rpc/heartbeat.go b/pkg/rpc/heartbeat.go index 6a1f4d00cc68..d7445634193c 100644 --- a/pkg/rpc/heartbeat.go +++ b/pkg/rpc/heartbeat.go @@ -165,7 +165,7 @@ func (hs *HeartbeatService) Ping(ctx context.Context, args *PingRequest) (*PingR serverOffset := args.Offset // The server offset should be the opposite of the client offset. serverOffset.Offset = -serverOffset.Offset - hs.remoteClockMonitor.UpdateOffset(ctx, args.OriginAddr, serverOffset, 0 /* roundTripLatency */) + hs.remoteClockMonitor.UpdateOffset(ctx, args.OriginNodeID, serverOffset, 0 /* roundTripLatency */) return &PingResponse{ Pong: args.Ping, ServerTime: hs.clock.Now().UnixNano(), diff --git a/pkg/rpc/heartbeat.proto b/pkg/rpc/heartbeat.proto index 218f54188a88..be21a62564da 100644 --- a/pkg/rpc/heartbeat.proto +++ b/pkg/rpc/heartbeat.proto @@ -42,10 +42,8 @@ message PingRequest { // The last offset the client measured with the server. optional RemoteOffset offset = 2 [(gogoproto.nullable) = false]; // The address of the client. 
- optional string origin_addr = 3 [(gogoproto.nullable) = false]; - // The configured maximum clock offset (in nanoseconds) on the server. - // TODO(nvanbenschoten): remove this field in v23.1. It is no longer read. - optional int64 origin_max_offset_nanos = 4 [(gogoproto.nullable) = false]; + // TODO(baptist): Remove this field in v23.2. It is no longer read. + optional string deprecated_origin_addr = 3 [(gogoproto.nullable) = false]; // Cluster ID to prevent connections between nodes in different clusters. optional bytes origin_cluster_id = 5 [ (gogoproto.customname) = "ClusterID", @@ -62,6 +60,8 @@ message PingRequest { (gogoproto.nullable) = false, (gogoproto.customname) = "OriginNodeID", (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/roachpb.NodeID"]; + + reserved 4; } // A PingResponse contains the echoed ping request string. diff --git a/pkg/rpc/nodedialer/nodedialer.go b/pkg/rpc/nodedialer/nodedialer.go index 9dad600aa7e9..3fc01ac84e7e 100644 --- a/pkg/rpc/nodedialer/nodedialer.go +++ b/pkg/rpc/nodedialer/nodedialer.go @@ -291,12 +291,7 @@ func (n *Dialer) Latency(nodeID roachpb.NodeID) (time.Duration, error) { if n.rpcContext.RemoteClocks == nil { return 0, errors.AssertionFailedf("can't call Latency in a client command") } - addr, err := n.resolver(nodeID) - if err != nil { - // Don't trip the breaker. - return 0, err - } - latency, ok := n.rpcContext.RemoteClocks.Latency(addr.String()) + latency, ok := n.rpcContext.RemoteClocks.Latency(nodeID) if !ok { latency = 0 } diff --git a/pkg/server/server.go b/pkg/server/server.go index 21ef9eed23dd..f5d7e15f9276 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -728,7 +728,6 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { clock, nodeLiveness, rpcContext, - g, st, systemTenantNameContainer, ) diff --git a/pkg/server/status/BUILD.bazel b/pkg/server/status/BUILD.bazel index eda39b39c5f9..1ba9954cb2a9 100644 --- a/pkg/server/status/BUILD.bazel +++ b/pkg/server/status/BUILD.bazel @@ -42,7 +42,6 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/build", - "//pkg/gossip", "//pkg/keys", "//pkg/kv", "//pkg/kv/kvserver/liveness", diff --git a/pkg/server/status/recorder.go b/pkg/server/status/recorder.go index 1777b4fb3ab9..920303cfcf82 100644 --- a/pkg/server/status/recorder.go +++ b/pkg/server/status/recorder.go @@ -24,7 +24,6 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/build" - "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" @@ -101,7 +100,6 @@ var ChildMetricsEnabled = settings.RegisterBoolSetting( // recorded, and they are thus kept separate. 
type MetricsRecorder struct { *HealthChecker - gossip *gossip.Gossip nodeLiveness *liveness.NodeLiveness rpcContext *rpc.Context settings *cluster.Settings @@ -157,7 +155,6 @@ func NewMetricsRecorder( clock *hlc.Clock, nodeLiveness *liveness.NodeLiveness, rpcContext *rpc.Context, - gossip *gossip.Gossip, settings *cluster.Settings, tenantNameContainer *roachpb.TenantNameContainer, ) *MetricsRecorder { @@ -165,7 +162,6 @@ func NewMetricsRecorder( HealthChecker: NewHealthChecker(trackedMetrics), nodeLiveness: nodeLiveness, rpcContext: rpcContext, - gossip: gossip, settings: settings, } mr.mu.storeRegistries = make(map[roachpb.StoreID]*metric.Registry) @@ -380,25 +376,17 @@ func (mr *MetricsRecorder) getNetworkActivity( ctx context.Context, ) map[roachpb.NodeID]statuspb.NodeStatus_NetworkActivity { activity := make(map[roachpb.NodeID]statuspb.NodeStatus_NetworkActivity) - if mr.nodeLiveness != nil && mr.gossip != nil { + if mr.nodeLiveness != nil { isLiveMap := mr.nodeLiveness.GetIsLiveMap() - var currentAverages map[string]time.Duration + var currentAverages map[roachpb.NodeID]time.Duration if mr.rpcContext.RemoteClocks != nil { currentAverages = mr.rpcContext.RemoteClocks.AllLatencies() } for nodeID, entry := range isLiveMap { - address, err := mr.gossip.GetNodeIDAddress(nodeID) - if err != nil { - if entry.IsLive { - log.Warningf(ctx, "%v", err) - } - continue - } na := statuspb.NodeStatus_NetworkActivity{} - key := address.String() if entry.IsLive { - if latency, ok := currentAverages[key]; ok { + if latency, ok := currentAverages[nodeID]; ok { na.Latency = latency.Nanoseconds() } } diff --git a/pkg/server/status/recorder_test.go b/pkg/server/status/recorder_test.go index 5823237f234c..82016bd8902c 100644 --- a/pkg/server/status/recorder_test.go +++ b/pkg/server/status/recorder_test.go @@ -118,7 +118,6 @@ func TestMetricsRecorderTenants(t *testing.T) { hlc.NewClock(manual, time.Nanosecond), nil, rpcCtx, - nil, st, roachpb.NewTenantNameContainer(catconstants.SystemTenantName), ) @@ -142,7 +141,6 @@ func TestMetricsRecorderTenants(t *testing.T) { hlc.NewClock(manual, time.Nanosecond), nil, rpcCtxTenant, - nil, stTenant, appNameContainer, ) @@ -244,7 +242,7 @@ func TestMetricsRecorder(t *testing.T) { }, } - recorder := NewMetricsRecorder(hlc.NewClock(manual, time.Nanosecond), nil, rpcCtx, nil, st, roachpb.NewTenantNameContainer("")) + recorder := NewMetricsRecorder(hlc.NewClock(manual, time.Nanosecond), nil, rpcCtx, st, roachpb.NewTenantNameContainer("")) recorder.AddStore(store1) recorder.AddStore(store2) recorder.AddNode(reg1, nodeDesc, 50, "foo:26257", "foo:26258", "foo:5432") diff --git a/pkg/server/tenant.go b/pkg/server/tenant.go index f8b49ad04631..fda678ac53ab 100644 --- a/pkg/server/tenant.go +++ b/pkg/server/tenant.go @@ -940,7 +940,7 @@ func makeTenantSQLServerArgs( registry.AddMetricStruct(pp.Metrics()) protectedTSProvider = pp - recorder := status.NewMetricsRecorder(clock, nil, rpcContext, nil, st, tenantNameContainer) + recorder := status.NewMetricsRecorder(clock, nil, rpcContext, st, tenantNameContainer) // Note: If the tenant is in-process, we attach this tenant's metric // recorder to the parentRecorder held by the system tenant. 
This // ensures that generated Prometheus metrics from the system tenant diff --git a/pkg/sql/physicalplan/replicaoracle/oracle_test.go b/pkg/sql/physicalplan/replicaoracle/oracle_test.go index 1cce2c93e815..7f44251550e3 100644 --- a/pkg/sql/physicalplan/replicaoracle/oracle_test.go +++ b/pkg/sql/physicalplan/replicaoracle/oracle_test.go @@ -14,7 +14,6 @@ import ( "context" "fmt" "math/rand" - "strings" "testing" "time" @@ -50,8 +49,8 @@ func TestClosest(t *testing.T) { NodeID: 1, Locality: nd2.Locality, // pretend node 2 is closest. }) - o.(*closestOracle).latencyFunc = func(s string) (time.Duration, bool) { - if strings.HasSuffix(s, "2") { + o.(*closestOracle).latencyFunc = func(id roachpb.NodeID) (time.Duration, bool) { + if id == 2 { return time.Nanosecond, validLatencyFunc } return time.Millisecond, validLatencyFunc
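
To make the shape of the new API concrete, here is a minimal,
self-contained Go sketch of a latency function keyed by node ID driving
replica ordering, mirroring the LatencyFunc signature this patch
introduces in pkg/kv/kvclient/kvcoord/replica_slice.go. The local
NodeID type and the sample measurements are hypothetical stand-ins for
roachpb.NodeID and real gossip-fed data, not CockroachDB code.

package main

import (
	"fmt"
	"sort"
	"time"
)

// NodeID stands in for roachpb.NodeID (an int32) in this sketch.
type NodeID int32

// LatencyFunc mirrors the patched signature in replica_slice.go: it
// returns the latency to a remote node ID and whether the measurement
// is valid.
type LatencyFunc func(NodeID) (time.Duration, bool)

func main() {
	// Hypothetical measurements, keyed by node ID rather than by
	// address, so a reused address can no longer mix up two nodes.
	measured := map[NodeID]time.Duration{
		2: time.Hour,
		3: time.Minute,
		4: time.Second,
	}
	latencyFn := LatencyFunc(func(id NodeID) (time.Duration, bool) {
		d, ok := measured[id]
		return d, ok
	})

	// Sort candidates by latency when both measurements are valid,
	// the same comparison OptimizeReplicaOrder applies to replicas.
	nodes := []NodeID{2, 3, 4}
	sort.Slice(nodes, func(i, j int) bool {
		li, okI := latencyFn(nodes[i])
		lj, okJ := latencyFn(nodes[j])
		return okI && okJ && li < lj
	})
	fmt.Println(nodes) // prints [4 3 2]
}

Because callers now hand the allocator and the DistSender a node ID
directly, the GossipNodeIDAddress hop that existed only to build a map
key disappears, which is exactly what the store_pool.go and
allocator.go hunks above delete.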
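The clock_offset.go hunks above also gate recording on a nonzero node
ID, since at startup the remote node may not have been assigned an ID
yet. The sketch below condenses the three documented UpdateOffset cases
(no prior offset, stale prior offset, lower uncertainty) under
simplified, hypothetical types; it omits the empty-offset deletion
bookkeeping and the EWMA latency tracking of the real method.

package main

import (
	"fmt"
	"time"
)

// NodeID stands in for roachpb.NodeID in this sketch.
type NodeID int32

// RemoteOffset is trimmed to the fields the update rules consult.
type RemoteOffset struct {
	Offset      int64 // estimated clock offset, in nanoseconds
	Uncertainty int64 // error bound on Offset, in nanoseconds
	MeasuredAt  int64 // unix nanos when the offset was measured
}

// updateOffset keeps the more trustworthy of the stored and incoming
// offsets for a node. A zero node ID is skipped, matching the guard
// added in RemoteClockMonitor.UpdateOffset.
func updateOffset(
	offsets map[NodeID]RemoteOffset, id NodeID, offset RemoteOffset,
	ttl time.Duration, now int64,
) {
	if id == 0 {
		return
	}
	old, ok := offsets[id]
	switch {
	case !ok: // case 1: no prior measurement for this node
		offsets[id] = offset
	case now-old.MeasuredAt > ttl.Nanoseconds(): // case 2: stale
		offsets[id] = offset
	case offset.Uncertainty < old.Uncertainty: // case 3: more certain
		offsets[id] = offset
	}
}

func main() {
	offsets := make(map[NodeID]RemoteOffset)
	now := time.Now().UnixNano()
	updateOffset(offsets, 2,
		RemoteOffset{Offset: 50, Uncertainty: 20, MeasuredAt: now},
		time.Minute, now)
	// A lower-uncertainty reading replaces the stored one (case 3).
	updateOffset(offsets, 2,
		RemoteOffset{Offset: 40, Uncertainty: 10, MeasuredAt: now},
		time.Minute, now)
	fmt.Printf("n2 offset: %+v\n", offsets[2])
}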