diff --git a/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go b/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go index 4e388a68083f..5e5a99f57ba6 100644 --- a/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go +++ b/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go @@ -44,7 +44,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" - "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" @@ -545,17 +544,17 @@ func TestOracle(t *testing.T) { leaseholder := replicas[2] rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper) - setLatency := func(addr string, latency time.Duration) { + setLatency := func(id roachpb.NodeID, latency time.Duration) { // All test cases have to have at least 11 measurement values in order for // the exponentially-weighted moving average to work properly. See the // comment on the WARMUP_SAMPLES const in the ewma package for details. for i := 0; i < 11; i++ { - rpcContext.RemoteClocks.UpdateOffset(ctx, addr, rpc.RemoteOffset{}, latency) + rpcContext.RemoteClocks.UpdateOffset(ctx, id, rpc.RemoteOffset{}, latency) } } - setLatency("1", 100*time.Millisecond) - setLatency("2", 2*time.Millisecond) - setLatency("3", 80*time.Millisecond) + setLatency(1, 100*time.Millisecond) + setLatency(2, 2*time.Millisecond) + setLatency(3, 80*time.Millisecond) testCases := []struct { name string @@ -700,7 +699,6 @@ func TestFollowerReadsWithStaleDescriptor(t *testing.T) { historicalQuery := `SELECT * FROM test AS OF SYSTEM TIME follower_read_timestamp() WHERE k=2` recCh := make(chan tracingpb.Recording, 1) - var n2Addr, n3Addr syncutil.AtomicString tc := testcluster.StartTestCluster(t, 4, base.TestClusterArgs{ ReplicationMode: base.ReplicationManual, @@ -724,8 +722,8 @@ func TestFollowerReadsWithStaleDescriptor(t *testing.T) { // heartbeated by the time the test wants to use it. Without this // knob, that would cause the transport to reorder replicas. DontConsiderConnHealth: true, - LatencyFunc: func(addr string) (time.Duration, bool) { - if (addr == n2Addr.Get()) || (addr == n3Addr.Get()) { + LatencyFunc: func(id roachpb.NodeID) (time.Duration, bool) { + if (id == 2) || (id == 3) { return time.Millisecond, true } return 100 * time.Millisecond, true @@ -743,8 +741,6 @@ func TestFollowerReadsWithStaleDescriptor(t *testing.T) { }, }) defer tc.Stopper().Stop(ctx) - n2Addr.Set(tc.Servers[1].RPCAddr()) - n3Addr.Set(tc.Servers[2].RPCAddr()) n1 := sqlutils.MakeSQLRunner(tc.Conns[0]) n1.Exec(t, `CREATE DATABASE t`) @@ -887,11 +883,11 @@ func TestSecondaryTenantFollowerReadsRouting(t *testing.T) { // For the variant where no latency information is available, we // expect n2 to serve follower reads as well, but because it // is in the same locality as the client. - LatencyFunc: func(addr string) (time.Duration, bool) { + LatencyFunc: func(id roachpb.NodeID) (time.Duration, bool) { if !validLatencyFunc { return 0, false } - if addr == tc.Server(1).RPCAddr() { + if id == 2 { return time.Millisecond, true } return 100 * time.Millisecond, true diff --git a/pkg/kv/kvclient/kvcoord/replica_slice.go b/pkg/kv/kvclient/kvcoord/replica_slice.go index a6e3a08f3f16..fc956688275e 100644 --- a/pkg/kv/kvclient/kvcoord/replica_slice.go +++ b/pkg/kv/kvclient/kvcoord/replica_slice.go @@ -22,18 +22,10 @@ import ( ) // ReplicaInfo extends the Replica structure with the associated node -// descriptor. +// Locality information. type ReplicaInfo struct { roachpb.ReplicaDescriptor - NodeDesc *roachpb.NodeDescriptor -} - -func (i ReplicaInfo) locality() []roachpb.Tier { - return i.NodeDesc.Locality.Tiers -} - -func (i ReplicaInfo) addr() string { - return i.NodeDesc.Address.String() + Tiers []roachpb.Tier } // A ReplicaSlice is a slice of ReplicaInfo. @@ -131,12 +123,12 @@ func NewReplicaSlice( } rs = append(rs, ReplicaInfo{ ReplicaDescriptor: r, - NodeDesc: nd, + Tiers: nd.Locality.Tiers, }) } if len(rs) == 0 { return nil, newSendError( - fmt.Sprintf("no replica node addresses available via gossip for r%d", desc.RangeID)) + fmt.Sprintf("no replica node information available via gossip for r%d", desc.RangeID)) } return rs, nil } @@ -188,8 +180,8 @@ func localityMatch(a, b []roachpb.Tier) int { } // A LatencyFunc returns the latency from this node to a remote -// address and a bool indicating whether the latency is valid. -type LatencyFunc func(string) (time.Duration, bool) +// node and a bool indicating whether the latency is valid. +type LatencyFunc func(roachpb.NodeID) (time.Duration, bool) // OptimizeReplicaOrder sorts the replicas in the order in which // they're to be used for sending RPCs (meaning in the order in which @@ -231,14 +223,14 @@ func (rs ReplicaSlice) OptimizeReplicaOrder( } if latencyFn != nil { - latencyI, okI := latencyFn(rs[i].addr()) - latencyJ, okJ := latencyFn(rs[j].addr()) + latencyI, okI := latencyFn(rs[i].NodeID) + latencyJ, okJ := latencyFn(rs[j].NodeID) if okI && okJ { return latencyI < latencyJ } } - attrMatchI := localityMatch(locality.Tiers, rs[i].locality()) - attrMatchJ := localityMatch(locality.Tiers, rs[j].locality()) + attrMatchI := localityMatch(locality.Tiers, rs[i].Tiers) + attrMatchJ := localityMatch(locality.Tiers, rs[j].Tiers) // Longer locality matches sort first (the assumption is that // they'll have better latencies). return attrMatchI > attrMatchJ diff --git a/pkg/kv/kvclient/kvcoord/replica_slice_test.go b/pkg/kv/kvclient/kvcoord/replica_slice_test.go index ae0f4058a62f..6f4f47a77c4a 100644 --- a/pkg/kv/kvclient/kvcoord/replica_slice_test.go +++ b/pkg/kv/kvclient/kvcoord/replica_slice_test.go @@ -12,7 +12,6 @@ package kvcoord import ( "context" - "fmt" "reflect" "strings" "testing" @@ -167,18 +166,10 @@ func locality(t *testing.T, locStrs []string) roachpb.Locality { return locality } -func nodeDesc(t *testing.T, nid roachpb.NodeID, locStrs []string) *roachpb.NodeDescriptor { - return &roachpb.NodeDescriptor{ - NodeID: nid, - Locality: locality(t, locStrs), - Address: util.MakeUnresolvedAddr("tcp", fmt.Sprintf("%d:26257", nid)), - } -} - func info(t *testing.T, nid roachpb.NodeID, sid roachpb.StoreID, locStrs []string) ReplicaInfo { return ReplicaInfo{ ReplicaDescriptor: desc(nid, sid), - NodeDesc: nodeDesc(t, nid, locStrs), + Tiers: locality(t, locStrs).Tiers, } } @@ -192,7 +183,7 @@ func TestReplicaSliceOptimizeReplicaOrder(t *testing.T) { // locality of the DistSender. locality roachpb.Locality // map from node address (see nodeDesc()) to latency to that node. - latencies map[string]time.Duration + latencies map[roachpb.NodeID]time.Duration slice ReplicaSlice // expOrder is the expected order in which the replicas sort. Replicas are // only identified by their node. If multiple replicas are on different @@ -217,10 +208,10 @@ func TestReplicaSliceOptimizeReplicaOrder(t *testing.T) { name: "order by latency", nodeID: 1, locality: locality(t, []string{"country=us", "region=west", "city=la"}), - latencies: map[string]time.Duration{ - "2:26257": time.Hour, - "3:26257": time.Minute, - "4:26257": time.Second, + latencies: map[roachpb.NodeID]time.Duration{ + 2: time.Hour, + 3: time.Minute, + 4: time.Second, }, slice: ReplicaSlice{ info(t, 2, 2, []string{"country=us", "region=west", "city=sf"}), @@ -237,11 +228,11 @@ func TestReplicaSliceOptimizeReplicaOrder(t *testing.T) { name: "local node comes first", nodeID: 1, locality: locality(t, nil), - latencies: map[string]time.Duration{ - "1:26257": 10 * time.Hour, - "2:26257": time.Hour, - "3:26257": time.Minute, - "4:26257": time.Second, + latencies: map[roachpb.NodeID]time.Duration{ + 1: 10 * time.Hour, + 2: time.Hour, + 3: time.Minute, + 4: time.Second, }, slice: ReplicaSlice{ info(t, 1, 1, nil), @@ -257,8 +248,8 @@ func TestReplicaSliceOptimizeReplicaOrder(t *testing.T) { t.Run(test.name, func(t *testing.T) { var latencyFn LatencyFunc if test.latencies != nil { - latencyFn = func(addr string) (time.Duration, bool) { - lat, ok := test.latencies[addr] + latencyFn = func(id roachpb.NodeID) (time.Duration, bool) { + lat, ok := test.latencies[id] return lat, ok } } diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go b/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go index ed48cd9a4f12..40656f9051da 100644 --- a/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go +++ b/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go @@ -560,7 +560,7 @@ type AllocatorMetrics struct { type Allocator struct { st *cluster.Settings deterministic bool - nodeLatencyFn func(addr string) (time.Duration, bool) + nodeLatencyFn func(nodeID roachpb.NodeID) (time.Duration, bool) // TODO(aayush): Let's replace this with a *rand.Rand that has a rand.Source // wrapped inside a mutex, to avoid misuse. randGen allocatorRand @@ -599,7 +599,7 @@ func makeAllocatorMetrics() AllocatorMetrics { func MakeAllocator( st *cluster.Settings, deterministic bool, - nodeLatencyFn func(addr string) (time.Duration, bool), + nodeLatencyFn func(nodeID roachpb.NodeID) (time.Duration, bool), knobs *allocator.TestingKnobs, ) Allocator { var randSource rand.Source @@ -2492,12 +2492,7 @@ func (a Allocator) shouldTransferLeaseForAccessLocality( if !ok { continue } - addr, err := storePool.GossipNodeIDAddress(repl.NodeID) - if err != nil { - log.KvDistribution.Errorf(ctx, "missing address for n%d: %+v", repl.NodeID, err) - continue - } - remoteLatency, ok := a.nodeLatencyFn(addr.String()) + remoteLatency, ok := a.nodeLatencyFn(repl.NodeID) if !ok { continue } diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go index 185aef65d313..8199f210f455 100644 --- a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go +++ b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go @@ -2112,7 +2112,7 @@ func TestAllocatorTransferLeaseTargetDraining(t *testing.T) { storepool.TestTimeUntilStoreDeadOff, true, /* deterministic */ func() int { return 10 }, /* nodeCount */ livenesspb.NodeLivenessStatus_LIVE) - a := MakeAllocator(st, true /* deterministic */, func(string) (time.Duration, bool) { + a := MakeAllocator(st, true /* deterministic */, func(id roachpb.NodeID) (time.Duration, bool) { return 0, true }, nil) defer stopper.Stop(ctx) @@ -2501,7 +2501,7 @@ func TestAllocatorShouldTransferLeaseDraining(t *testing.T) { storepool.TestTimeUntilStoreDeadOff, true, /* deterministic */ func() int { return 10 }, /* nodeCount */ livenesspb.NodeLivenessStatus_LIVE) - a := MakeAllocator(st, true /* deterministic */, func(string) (time.Duration, bool) { + a := MakeAllocator(st, true /* deterministic */, func(id roachpb.NodeID) (time.Duration, bool) { return 0, true }, nil) defer stopper.Stop(context.Background()) @@ -2569,7 +2569,7 @@ func TestAllocatorShouldTransferSuspected(t *testing.T) { storepool.TestTimeUntilStoreDeadOff, true, /* deterministic */ func() int { return 10 }, /* nodeCount */ livenesspb.NodeLivenessStatus_LIVE) - a := MakeAllocator(st, true /* deterministic */, func(string) (time.Duration, bool) { + a := MakeAllocator(st, true /* deterministic */, func(id roachpb.NodeID) (time.Duration, bool) { return 0, true }, nil) defer stopper.Stop(context.Background()) @@ -5420,11 +5420,11 @@ func TestAllocatorTransferLeaseTargetLoadBased(t *testing.T) { now = now.Add(MinLeaseTransferStatsDuration) - noLatency := map[string]time.Duration{} - highLatency := map[string]time.Duration{ - stores[0].Node.Address.String(): 50 * time.Millisecond, - stores[1].Node.Address.String(): 50 * time.Millisecond, - stores[2].Node.Address.String(): 50 * time.Millisecond, + noLatency := map[roachpb.NodeID]time.Duration{} + highLatency := map[roachpb.NodeID]time.Duration{ + stores[0].Node.NodeID: 50 * time.Millisecond, + stores[1].Node.NodeID: 50 * time.Millisecond, + stores[2].Node.NodeID: 50 * time.Millisecond, } existing := []roachpb.ReplicaDescriptor{ @@ -5435,7 +5435,7 @@ func TestAllocatorTransferLeaseTargetLoadBased(t *testing.T) { testCases := []struct { leaseholder roachpb.StoreID - latency map[string]time.Duration + latency map[roachpb.NodeID]time.Duration stats *replicastats.ReplicaStats excludeLeaseRepl bool expected roachpb.StoreID @@ -5501,8 +5501,8 @@ func TestAllocatorTransferLeaseTargetLoadBased(t *testing.T) { for _, c := range testCases { t.Run("", func(t *testing.T) { - a := MakeAllocator(st, true /* deterministic */, func(addr string) (time.Duration, bool) { - return c.latency[addr], true + a := MakeAllocator(st, true /* deterministic */, func(id roachpb.NodeID) (time.Duration, bool) { + return c.latency[id], true }, nil) localitySummary := c.stats.SnapshotRatedSummary(now) usage := allocator.RangeUsageInfo{} @@ -7486,7 +7486,7 @@ func TestAllocatorComputeActionDynamicNumReplicas(t *testing.T) { storepool.TestTimeUntilStoreDeadOff, false, /* deterministic */ func() int { return numNodes }, livenesspb.NodeLivenessStatus_LIVE) - a := MakeAllocator(st, false /* deterministic */, func(string) (time.Duration, bool) { + a := MakeAllocator(st, false /* deterministic */, func(id roachpb.NodeID) (time.Duration, bool) { return 0, true }, nil) @@ -8237,7 +8237,7 @@ func TestAllocatorFullDisks(t *testing.T) { mockNodeLiveness.NodeLivenessFunc, false, /* deterministic */ ) - alloc := MakeAllocator(st, false /* deterministic */, func(string) (time.Duration, bool) { + alloc := MakeAllocator(st, false /* deterministic */, func(id roachpb.NodeID) (time.Duration, bool) { return 0, false }, nil) @@ -8689,7 +8689,7 @@ func exampleRebalancing( storepool.NewMockNodeLiveness(livenesspb.NodeLivenessStatus_LIVE).NodeLivenessFunc, /* deterministic */ true, ) - alloc := MakeAllocator(st, true /* deterministic */, func(string) (time.Duration, bool) { + alloc := MakeAllocator(st, true /* deterministic */, func(id roachpb.NodeID) (time.Duration, bool) { return 0, false }, nil) diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/test_helpers.go b/pkg/kv/kvserver/allocator/allocatorimpl/test_helpers.go index da694c987ada..9532689d045f 100644 --- a/pkg/kv/kvserver/allocator/allocatorimpl/test_helpers.go +++ b/pkg/kv/kvserver/allocator/allocatorimpl/test_helpers.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -42,7 +43,7 @@ func CreateTestAllocatorWithKnobs( storepool.TestTimeUntilStoreDeadOff, deterministic, func() int { return numNodes }, livenesspb.NodeLivenessStatus_LIVE) - a := MakeAllocator(st, deterministic, func(string) (time.Duration, bool) { + a := MakeAllocator(st, deterministic, func(id roachpb.NodeID) (time.Duration, bool) { return 0, true }, knobs) return stopper, g, storePool, a, manual diff --git a/pkg/kv/kvserver/allocator/storepool/BUILD.bazel b/pkg/kv/kvserver/allocator/storepool/BUILD.bazel index 0868601ecac7..95ba551323e2 100644 --- a/pkg/kv/kvserver/allocator/storepool/BUILD.bazel +++ b/pkg/kv/kvserver/allocator/storepool/BUILD.bazel @@ -22,7 +22,6 @@ go_library( "//pkg/rpc", "//pkg/settings", "//pkg/settings/cluster", - "//pkg/util", "//pkg/util/hlc", "//pkg/util/humanizeutil", "//pkg/util/log", diff --git a/pkg/kv/kvserver/allocator/storepool/override_store_pool.go b/pkg/kv/kvserver/allocator/storepool/override_store_pool.go index 4cd59b90a008..fcb0cdad0f38 100644 --- a/pkg/kv/kvserver/allocator/storepool/override_store_pool.go +++ b/pkg/kv/kvserver/allocator/storepool/override_store_pool.go @@ -17,7 +17,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/hlc" ) @@ -172,13 +171,6 @@ func (o *OverrideStorePool) GetStoreDescriptor( return o.sp.GetStoreDescriptor(storeID) } -// GossipNodeIDAddress implements the AllocatorStorePool interface. -func (o *OverrideStorePool) GossipNodeIDAddress( - nodeID roachpb.NodeID, -) (*util.UnresolvedAddr, error) { - return o.sp.GossipNodeIDAddress(nodeID) -} - // UpdateLocalStoreAfterRebalance implements the AllocatorStorePool interface. // This override method is a no-op, as // StorePool.UpdateLocalStoreAfterRebalance(..) is not a read-only method and diff --git a/pkg/kv/kvserver/allocator/storepool/store_pool.go b/pkg/kv/kvserver/allocator/storepool/store_pool.go index 9dbd4978884e..55beabc549ff 100644 --- a/pkg/kv/kvserver/allocator/storepool/store_pool.go +++ b/pkg/kv/kvserver/allocator/storepool/store_pool.go @@ -25,7 +25,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" - "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -394,9 +393,6 @@ type AllocatorStorePool interface { filter StoreFilter, ) (StoreList, int, ThrottledStoreReasons) - // GossipNodeIDAddress looks up the RPC address for the given node via gossip. - GossipNodeIDAddress(nodeID roachpb.NodeID) (*util.UnresolvedAddr, error) - // LiveAndDeadReplicas divides the provided repls slice into two slices: the // first for live replicas, and the second for dead replicas. // See comment on StorePool.LiveAndDeadReplicas(..). @@ -1242,11 +1238,6 @@ func (sp *StorePool) GetLocalitiesByNode( return localities } -// GossipNodeIDAddress looks up the RPC address for the given node via gossip. -func (sp *StorePool) GossipNodeIDAddress(nodeID roachpb.NodeID) (*util.UnresolvedAddr, error) { - return sp.gossip.GetNodeIDAddress(nodeID) -} - // GetNodeLocalityString returns the locality information for the given node // in its string format. func (sp *StorePool) GetNodeLocalityString(nodeID roachpb.NodeID) string { diff --git a/pkg/kv/kvserver/asim/state/impl.go b/pkg/kv/kvserver/asim/state/impl.go index d306afc8965a..4651327b8714 100644 --- a/pkg/kv/kvserver/asim/state/impl.go +++ b/pkg/kv/kvserver/asim/state/impl.go @@ -781,7 +781,7 @@ func (s *state) MakeAllocator(storeID StoreID) allocatorimpl.Allocator { return allocatorimpl.MakeAllocator( s.stores[storeID].settings, s.stores[storeID].storepool.IsDeterministic(), - func(addr string) (time.Duration, bool) { return 0, true }, + func(id roachpb.NodeID) (time.Duration, bool) { return 0, true }, &allocator.TestingKnobs{ AllowLeaseTransfersToReplicasNeedingSnapshots: true, }, diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 19a8cc4262c5..0958e9adb6f3 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -1209,7 +1209,7 @@ func NewStore( s.allocator = allocatorimpl.MakeAllocator( cfg.Settings, storePoolIsDeterministic, - func(string) (time.Duration, bool) { + func(id roachpb.NodeID) (time.Duration, bool) { return 0, false }, cfg.TestingKnobs.AllocatorKnobs, ) diff --git a/pkg/rpc/clock_offset.go b/pkg/rpc/clock_offset.go index 178002366ab7..cfb74c2a6c50 100644 --- a/pkg/rpc/clock_offset.go +++ b/pkg/rpc/clock_offset.go @@ -16,6 +16,7 @@ import ( "time" "github.com/VividCortex/ewma" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" @@ -95,8 +96,8 @@ type RemoteClockMonitor struct { mu struct { syncutil.RWMutex - offsets map[string]RemoteOffset - latencyInfos map[string]*latencyInfo + offsets map[roachpb.NodeID]RemoteOffset + latencyInfos map[roachpb.NodeID]*latencyInfo } metrics RemoteClockMetrics @@ -125,8 +126,8 @@ func newRemoteClockMonitor( maxOffset: maxOffset, offsetTTL: offsetTTL, } - r.mu.offsets = make(map[string]RemoteOffset) - r.mu.latencyInfos = make(map[string]*latencyInfo) + r.mu.offsets = make(map[roachpb.NodeID]RemoteOffset) + r.mu.latencyInfos = make(map[roachpb.NodeID]*latencyInfo) if histogramWindowInterval == 0 { histogramWindowInterval = time.Duration(math.MaxInt64) } @@ -147,25 +148,25 @@ func (r *RemoteClockMonitor) Metrics() *RemoteClockMetrics { } // Latency returns the exponentially weighted moving average latency to the -// given node address. Returns true if the measurement is valid, or false if +// given node id. Returns true if the measurement is valid, or false if // we don't have enough samples to compute a reliable average. -func (r *RemoteClockMonitor) Latency(addr string) (time.Duration, bool) { +func (r *RemoteClockMonitor) Latency(id roachpb.NodeID) (time.Duration, bool) { r.mu.RLock() defer r.mu.RUnlock() - if info, ok := r.mu.latencyInfos[addr]; ok && info.avgNanos.Value() != 0.0 { + if info, ok := r.mu.latencyInfos[id]; ok && info.avgNanos.Value() != 0.0 { return time.Duration(int64(info.avgNanos.Value())), true } return 0, false } // AllLatencies returns a map of all currently valid latency measurements. -func (r *RemoteClockMonitor) AllLatencies() map[string]time.Duration { +func (r *RemoteClockMonitor) AllLatencies() map[roachpb.NodeID]time.Duration { r.mu.RLock() defer r.mu.RUnlock() - result := make(map[string]time.Duration) - for addr, info := range r.mu.latencyInfos { + result := make(map[roachpb.NodeID]time.Duration) + for id, info := range r.mu.latencyInfos { if info.avgNanos.Value() != 0.0 { - result[addr] = time.Duration(int64(info.avgNanos.Value())) + result[id] = time.Duration(int64(info.avgNanos.Value())) } } return result @@ -174,50 +175,54 @@ func (r *RemoteClockMonitor) AllLatencies() map[string]time.Duration { // UpdateOffset is a thread-safe way to update the remote clock and latency // measurements. // -// It only updates the offset for addr if one of the following cases holds: -// 1. There is no prior offset for that address. -// 2. The old offset for addr was measured long enough ago to be considered +// It only updates the offset for the node if one of the following cases holds: +// 1. There is no prior offset for that node. +// 2. The old offset for the node was measured long enough ago to be considered // stale. // 3. The new offset's error is smaller than the old offset's error. // // Pass a roundTripLatency of 0 or less to avoid recording the latency. func (r *RemoteClockMonitor) UpdateOffset( - ctx context.Context, addr string, offset RemoteOffset, roundTripLatency time.Duration, + ctx context.Context, id roachpb.NodeID, offset RemoteOffset, roundTripLatency time.Duration, ) { emptyOffset := offset == RemoteOffset{} + // At startup the remote node's id may not be set. Skip recording latency. + if id == 0 { + return + } r.mu.Lock() defer r.mu.Unlock() - if oldOffset, ok := r.mu.offsets[addr]; !ok { + if oldOffset, ok := r.mu.offsets[id]; !ok { // We don't have a measurement - if the incoming measurement is not empty, // set it. if !emptyOffset { - r.mu.offsets[addr] = offset + r.mu.offsets[id] = offset } } else if oldOffset.isStale(r.offsetTTL, r.clock.Now()) { // We have a measurement but it's old - if the incoming measurement is not empty, // set it, otherwise delete the old measurement. if !emptyOffset { - r.mu.offsets[addr] = offset + r.mu.offsets[id] = offset } else { - delete(r.mu.offsets, addr) + delete(r.mu.offsets, id) } } else if offset.Uncertainty < oldOffset.Uncertainty { // We have a measurement but its uncertainty is greater than that of the // incoming measurement - if the incoming measurement is not empty, set it. if !emptyOffset { - r.mu.offsets[addr] = offset + r.mu.offsets[id] = offset } } if roundTripLatency > 0 { - info, ok := r.mu.latencyInfos[addr] + info, ok := r.mu.latencyInfos[id] if !ok { info = &latencyInfo{ avgNanos: ewma.NewMovingAverage(avgLatencyMeasurementAge), } - r.mu.latencyInfos[addr] = info + r.mu.latencyInfos[id] = info } newLatencyf := float64(roundTripLatency.Nanoseconds()) @@ -237,7 +242,7 @@ func (r *RemoteClockMonitor) UpdateOffset( } if log.V(2) { - log.Dev.Infof(ctx, "update offset: %s %v", addr, r.mu.offsets[addr]) + log.Dev.Infof(ctx, "update offset: n%d %v", id, r.mu.offsets[id]) } } @@ -261,9 +266,9 @@ func (r *RemoteClockMonitor) VerifyClockOffset(ctx context.Context) error { r.mu.Lock() // Each measurement is recorded as its minimum and maximum value. offsets := make(stats.Float64Data, 0, 2*len(r.mu.offsets)) - for addr, offset := range r.mu.offsets { + for id, offset := range r.mu.offsets { if offset.isStale(r.offsetTTL, now) { - delete(r.mu.offsets, addr) + delete(r.mu.offsets, id) continue } offsets = append(offsets, float64(offset.Offset+offset.Uncertainty)) diff --git a/pkg/rpc/clock_offset_test.go b/pkg/rpc/clock_offset_test.go index a7d2022f69f0..2bc452029f7b 100644 --- a/pkg/rpc/clock_offset_test.go +++ b/pkg/rpc/clock_offset_test.go @@ -12,12 +12,11 @@ package rpc import ( "context" - "fmt" "math" - "strconv" "testing" "time" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/stop" @@ -35,7 +34,7 @@ func TestUpdateOffset(t *testing.T) { maxOffset := time.Nanosecond monitor := newRemoteClockMonitor(clock, maxOffset, time.Hour, 0) - const key = "addr" + const key = 2 const latency = 10 * time.Millisecond // Case 1: There is no prior offset for the address. @@ -47,7 +46,7 @@ func TestUpdateOffset(t *testing.T) { monitor.UpdateOffset(context.Background(), key, offset1, latency) monitor.mu.Lock() if o, ok := monitor.mu.offsets[key]; !ok { - t.Errorf("expected key %s to be set in %v, but it was not", key, monitor.mu.offsets) + t.Errorf("expected key %d to be set in %v, but it was not", key, monitor.mu.offsets) } else if o != offset1 { t.Errorf("expected offset %v, instead %v", offset1, o) } @@ -62,7 +61,7 @@ func TestUpdateOffset(t *testing.T) { monitor.UpdateOffset(context.Background(), key, offset2, latency) monitor.mu.Lock() if o, ok := monitor.mu.offsets[key]; !ok { - t.Errorf("expected key %s to be set in %v, but it was not", key, monitor.mu.offsets) + t.Errorf("expected key %d to be set in %v, but it was not", key, monitor.mu.offsets) } else if o != offset2 { t.Errorf("expected offset %v, instead %v", offset2, o) } @@ -77,7 +76,7 @@ func TestUpdateOffset(t *testing.T) { monitor.UpdateOffset(context.Background(), key, offset3, latency) monitor.mu.Lock() if o, ok := monitor.mu.offsets[key]; !ok { - t.Errorf("expected key %s to be set in %v, but it was not", key, monitor.mu.offsets) + t.Errorf("expected key %d to be set in %v, but it was not", key, monitor.mu.offsets) } else if o != offset3 { t.Errorf("expected offset %v, instead %v", offset3, o) } @@ -87,7 +86,7 @@ func TestUpdateOffset(t *testing.T) { monitor.UpdateOffset(context.Background(), key, offset2, latency) monitor.mu.Lock() if o, ok := monitor.mu.offsets[key]; !ok { - t.Errorf("expected key %s to be set in %v, but it was not", key, monitor.mu.offsets) + t.Errorf("expected key %d to be set in %v, but it was not", key, monitor.mu.offsets) } else if o != offset3 { t.Errorf("expected offset %v, instead %v", offset3, o) } @@ -112,9 +111,9 @@ func TestVerifyClockOffset(t *testing.T) { // error when less than a majority of offsets are under the maximum tolerated offset. {[]RemoteOffset{{Offset: 20, Uncertainty: 10}, {Offset: 58, Uncertainty: 20}, {Offset: 85, Uncertainty: 25}, {Offset: 91, Uncertainty: 31}}, true}, } { - monitor.mu.offsets = make(map[string]RemoteOffset) + monitor.mu.offsets = make(map[roachpb.NodeID]RemoteOffset) for i, offset := range tc.offsets { - monitor.mu.offsets[strconv.Itoa(i)] = offset + monitor.mu.offsets[roachpb.NodeID(i)] = offset } if tc.expectedError { @@ -166,8 +165,8 @@ func TestClockOffsetMetrics(t *testing.T) { clock := timeutil.NewManualTime(timeutil.Unix(0, 123)) maxOffset := 20 * time.Nanosecond monitor := newRemoteClockMonitor(clock, maxOffset, time.Hour, 0) - monitor.mu.offsets = map[string]RemoteOffset{ - "0": { + monitor.mu.offsets = map[roachpb.NodeID]RemoteOffset{ + 0: { Offset: 13, Uncertainty: 7, MeasuredAt: 6, @@ -197,7 +196,7 @@ func TestLatencies(t *testing.T) { // All test cases have to have at least 11 measurement values in order for // the exponentially-weighted moving average to work properly. See the // comment on the WARMUP_SAMPLES const in the ewma package for details. - const emptyKey = "no measurements" + const emptyKey = 1 for i := 0; i < 11; i++ { monitor.UpdateOffset(context.Background(), emptyKey, RemoteOffset{}, 0) } @@ -219,8 +218,9 @@ func TestLatencies(t *testing.T) { {[]time.Duration{10, 10, 10, 10, 10, 99, 99, 99, 99, 99, 99}, 58}, {[]time.Duration{99, 99, 99, 99, 99, 10, 10, 10, 10, 10, 10}, 50}, } - for _, tc := range testCases { - key := fmt.Sprintf("%v", tc.measurements) + for i, tc := range testCases { + // Start counting from node 1 since a 0 node id is special cased. + key := roachpb.NodeID(i + 1) for _, measurement := range tc.measurements { monitor.UpdateOffset(context.Background(), key, RemoteOffset{}, measurement) } diff --git a/pkg/rpc/context.go b/pkg/rpc/context.go index 12bc8157d14e..0ec7507a3f6b 100644 --- a/pkg/rpc/context.go +++ b/pkg/rpc/context.go @@ -2238,15 +2238,11 @@ func (rpcCtx *Context) runHeartbeat( // Start heartbeat loop. - maxOffset := rpcCtx.MaxOffset - maxOffsetNanos := maxOffset.Nanoseconds() - // The request object. Note that we keep the same object from // heartbeat to heartbeat: we compute a new .Offset at the end of // the current heartbeat as input to the next one. request := &PingRequest{ - OriginAddr: rpcCtx.Config.Addr, - OriginMaxOffsetNanos: maxOffsetNanos, + DeprecatedOriginAddr: rpcCtx.Config.Addr, TargetNodeID: conn.remoteNodeID, ServerVersion: rpcCtx.Settings.Version.BinaryVersion(), } @@ -2357,7 +2353,7 @@ func (rpcCtx *Context) runHeartbeat( remoteTimeNow := timeutil.Unix(0, response.ServerTime).Add(pingDuration / 2) request.Offset.Offset = remoteTimeNow.Sub(receiveTime).Nanoseconds() } - rpcCtx.RemoteClocks.UpdateOffset(ctx, target, request.Offset, pingDuration) + rpcCtx.RemoteClocks.UpdateOffset(ctx, conn.remoteNodeID, request.Offset, pingDuration) } if cb := rpcCtx.HeartbeatCB; cb != nil { diff --git a/pkg/rpc/context_test.go b/pkg/rpc/context_test.go index a2df9ef90bfb..f54070d95796 100644 --- a/pkg/rpc/context_test.go +++ b/pkg/rpc/context_test.go @@ -1331,8 +1331,8 @@ func TestOffsetMeasurement(t *testing.T) { clientCtx.RemoteClocks.mu.Lock() defer clientCtx.RemoteClocks.mu.Unlock() - if o, ok := clientCtx.RemoteClocks.mu.offsets[remoteAddr]; !ok { - return errors.Errorf("expected offset of %s to be initialized, but it was not", remoteAddr) + if o, ok := clientCtx.RemoteClocks.mu.offsets[serverNodeID]; !ok { + return errors.Errorf("expected offset of %d to be initialized, but it was not", serverNodeID) } else if o != expectedOffset { return errors.Errorf("expected:\n%v\nactual:\n%v", expectedOffset, o) } @@ -1348,7 +1348,7 @@ func TestOffsetMeasurement(t *testing.T) { clientCtx.RemoteClocks.mu.Lock() defer clientCtx.RemoteClocks.mu.Unlock() - if o, ok := clientCtx.RemoteClocks.mu.offsets[remoteAddr]; ok { + if o, ok := clientCtx.RemoteClocks.mu.offsets[serverNodeID]; ok { return errors.Errorf("expected offset to have been cleared, but found %s", o) } return nil @@ -1404,7 +1404,7 @@ func TestFailedOffsetMeasurement(t *testing.T) { clientCtx.RemoteClocks.mu.Lock() defer clientCtx.RemoteClocks.mu.Unlock() - if _, ok := clientCtx.RemoteClocks.mu.offsets[remoteAddr]; !ok { + if _, ok := clientCtx.RemoteClocks.mu.offsets[serverNodeID]; !ok { return errors.Errorf("expected offset of %s to be initialized, but it was not", remoteAddr) } return nil @@ -1414,7 +1414,7 @@ func TestFailedOffsetMeasurement(t *testing.T) { serverCtx.RemoteClocks.mu.Lock() defer serverCtx.RemoteClocks.mu.Unlock() - if o, ok := serverCtx.RemoteClocks.mu.offsets[remoteAddr]; ok { + if o, ok := serverCtx.RemoteClocks.mu.offsets[serverNodeID]; ok { return errors.Errorf("expected offset of %s to not be initialized, but it was: %v", remoteAddr, o) } return nil diff --git a/pkg/rpc/heartbeat.go b/pkg/rpc/heartbeat.go index 6a1f4d00cc68..d7445634193c 100644 --- a/pkg/rpc/heartbeat.go +++ b/pkg/rpc/heartbeat.go @@ -165,7 +165,7 @@ func (hs *HeartbeatService) Ping(ctx context.Context, args *PingRequest) (*PingR serverOffset := args.Offset // The server offset should be the opposite of the client offset. serverOffset.Offset = -serverOffset.Offset - hs.remoteClockMonitor.UpdateOffset(ctx, args.OriginAddr, serverOffset, 0 /* roundTripLatency */) + hs.remoteClockMonitor.UpdateOffset(ctx, args.OriginNodeID, serverOffset, 0 /* roundTripLatency */) return &PingResponse{ Pong: args.Ping, ServerTime: hs.clock.Now().UnixNano(), diff --git a/pkg/rpc/heartbeat.proto b/pkg/rpc/heartbeat.proto index 218f54188a88..be21a62564da 100644 --- a/pkg/rpc/heartbeat.proto +++ b/pkg/rpc/heartbeat.proto @@ -42,10 +42,8 @@ message PingRequest { // The last offset the client measured with the server. optional RemoteOffset offset = 2 [(gogoproto.nullable) = false]; // The address of the client. - optional string origin_addr = 3 [(gogoproto.nullable) = false]; - // The configured maximum clock offset (in nanoseconds) on the server. - // TODO(nvanbenschoten): remove this field in v23.1. It is no longer read. - optional int64 origin_max_offset_nanos = 4 [(gogoproto.nullable) = false]; + // TODO(baptist): Remove this field in v23.2. It is no longer read. + optional string deprecated_origin_addr = 3 [(gogoproto.nullable) = false]; // Cluster ID to prevent connections between nodes in different clusters. optional bytes origin_cluster_id = 5 [ (gogoproto.customname) = "ClusterID", @@ -62,6 +60,8 @@ message PingRequest { (gogoproto.nullable) = false, (gogoproto.customname) = "OriginNodeID", (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/roachpb.NodeID"]; + + reserved 4; } // A PingResponse contains the echoed ping request string. diff --git a/pkg/rpc/nodedialer/nodedialer.go b/pkg/rpc/nodedialer/nodedialer.go index 9dad600aa7e9..3fc01ac84e7e 100644 --- a/pkg/rpc/nodedialer/nodedialer.go +++ b/pkg/rpc/nodedialer/nodedialer.go @@ -291,12 +291,7 @@ func (n *Dialer) Latency(nodeID roachpb.NodeID) (time.Duration, error) { if n.rpcContext.RemoteClocks == nil { return 0, errors.AssertionFailedf("can't call Latency in a client command") } - addr, err := n.resolver(nodeID) - if err != nil { - // Don't trip the breaker. - return 0, err - } - latency, ok := n.rpcContext.RemoteClocks.Latency(addr.String()) + latency, ok := n.rpcContext.RemoteClocks.Latency(nodeID) if !ok { latency = 0 } diff --git a/pkg/server/server.go b/pkg/server/server.go index 21ef9eed23dd..f5d7e15f9276 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -728,7 +728,6 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { clock, nodeLiveness, rpcContext, - g, st, systemTenantNameContainer, ) diff --git a/pkg/server/status/BUILD.bazel b/pkg/server/status/BUILD.bazel index eda39b39c5f9..1ba9954cb2a9 100644 --- a/pkg/server/status/BUILD.bazel +++ b/pkg/server/status/BUILD.bazel @@ -42,7 +42,6 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/build", - "//pkg/gossip", "//pkg/keys", "//pkg/kv", "//pkg/kv/kvserver/liveness", diff --git a/pkg/server/status/recorder.go b/pkg/server/status/recorder.go index 1777b4fb3ab9..920303cfcf82 100644 --- a/pkg/server/status/recorder.go +++ b/pkg/server/status/recorder.go @@ -24,7 +24,6 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/build" - "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" @@ -101,7 +100,6 @@ var ChildMetricsEnabled = settings.RegisterBoolSetting( // recorded, and they are thus kept separate. type MetricsRecorder struct { *HealthChecker - gossip *gossip.Gossip nodeLiveness *liveness.NodeLiveness rpcContext *rpc.Context settings *cluster.Settings @@ -157,7 +155,6 @@ func NewMetricsRecorder( clock *hlc.Clock, nodeLiveness *liveness.NodeLiveness, rpcContext *rpc.Context, - gossip *gossip.Gossip, settings *cluster.Settings, tenantNameContainer *roachpb.TenantNameContainer, ) *MetricsRecorder { @@ -165,7 +162,6 @@ func NewMetricsRecorder( HealthChecker: NewHealthChecker(trackedMetrics), nodeLiveness: nodeLiveness, rpcContext: rpcContext, - gossip: gossip, settings: settings, } mr.mu.storeRegistries = make(map[roachpb.StoreID]*metric.Registry) @@ -380,25 +376,17 @@ func (mr *MetricsRecorder) getNetworkActivity( ctx context.Context, ) map[roachpb.NodeID]statuspb.NodeStatus_NetworkActivity { activity := make(map[roachpb.NodeID]statuspb.NodeStatus_NetworkActivity) - if mr.nodeLiveness != nil && mr.gossip != nil { + if mr.nodeLiveness != nil { isLiveMap := mr.nodeLiveness.GetIsLiveMap() - var currentAverages map[string]time.Duration + var currentAverages map[roachpb.NodeID]time.Duration if mr.rpcContext.RemoteClocks != nil { currentAverages = mr.rpcContext.RemoteClocks.AllLatencies() } for nodeID, entry := range isLiveMap { - address, err := mr.gossip.GetNodeIDAddress(nodeID) - if err != nil { - if entry.IsLive { - log.Warningf(ctx, "%v", err) - } - continue - } na := statuspb.NodeStatus_NetworkActivity{} - key := address.String() if entry.IsLive { - if latency, ok := currentAverages[key]; ok { + if latency, ok := currentAverages[nodeID]; ok { na.Latency = latency.Nanoseconds() } } diff --git a/pkg/server/status/recorder_test.go b/pkg/server/status/recorder_test.go index 5823237f234c..82016bd8902c 100644 --- a/pkg/server/status/recorder_test.go +++ b/pkg/server/status/recorder_test.go @@ -118,7 +118,6 @@ func TestMetricsRecorderTenants(t *testing.T) { hlc.NewClock(manual, time.Nanosecond), nil, rpcCtx, - nil, st, roachpb.NewTenantNameContainer(catconstants.SystemTenantName), ) @@ -142,7 +141,6 @@ func TestMetricsRecorderTenants(t *testing.T) { hlc.NewClock(manual, time.Nanosecond), nil, rpcCtxTenant, - nil, stTenant, appNameContainer, ) @@ -244,7 +242,7 @@ func TestMetricsRecorder(t *testing.T) { }, } - recorder := NewMetricsRecorder(hlc.NewClock(manual, time.Nanosecond), nil, rpcCtx, nil, st, roachpb.NewTenantNameContainer("")) + recorder := NewMetricsRecorder(hlc.NewClock(manual, time.Nanosecond), nil, rpcCtx, st, roachpb.NewTenantNameContainer("")) recorder.AddStore(store1) recorder.AddStore(store2) recorder.AddNode(reg1, nodeDesc, 50, "foo:26257", "foo:26258", "foo:5432") diff --git a/pkg/server/tenant.go b/pkg/server/tenant.go index f8b49ad04631..fda678ac53ab 100644 --- a/pkg/server/tenant.go +++ b/pkg/server/tenant.go @@ -940,7 +940,7 @@ func makeTenantSQLServerArgs( registry.AddMetricStruct(pp.Metrics()) protectedTSProvider = pp - recorder := status.NewMetricsRecorder(clock, nil, rpcContext, nil, st, tenantNameContainer) + recorder := status.NewMetricsRecorder(clock, nil, rpcContext, st, tenantNameContainer) // Note: If the tenant is in-process, we attach this tenant's metric // recorder to the parentRecorder held by the system tenant. This // ensures that generated Prometheus metrics from the system tenant diff --git a/pkg/sql/physicalplan/replicaoracle/oracle_test.go b/pkg/sql/physicalplan/replicaoracle/oracle_test.go index 1cce2c93e815..7f44251550e3 100644 --- a/pkg/sql/physicalplan/replicaoracle/oracle_test.go +++ b/pkg/sql/physicalplan/replicaoracle/oracle_test.go @@ -14,7 +14,6 @@ import ( "context" "fmt" "math/rand" - "strings" "testing" "time" @@ -50,8 +49,8 @@ func TestClosest(t *testing.T) { NodeID: 1, Locality: nd2.Locality, // pretend node 2 is closest. }) - o.(*closestOracle).latencyFunc = func(s string) (time.Duration, bool) { - if strings.HasSuffix(s, "2") { + o.(*closestOracle).latencyFunc = func(id roachpb.NodeID) (time.Duration, bool) { + if id == 2 { return time.Nanosecond, validLatencyFunc } return time.Millisecond, validLatencyFunc