gossip: Track latency by nodeID rather than addr
Previously, the latency to remote nodes was tracked by address rather
than by the node's ID. This caused a few problems. First, a remote
address could be reused across nodes, so measurements taken against an
old node could be attributed to a new one, yielding incorrect
information. Second, consumers of this information (such as the
allocator) had to needlessly map a node ID to an address just to do a
lookup.
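
To illustrate the address-reuse hazard (a hypothetical, self-contained
Go sketch; the addresses and map names are invented for this example):

    package main

    import (
        "fmt"
        "time"
    )

    // NodeID stands in for roachpb.NodeID in this standalone sketch.
    type NodeID int32

    func main() {
        // Keyed by address: if n2 restarts on n1's old address, it
        // silently inherits n1's latency samples.
        byAddr := map[string]time.Duration{
            "10.0.0.1:26257": 100 * time.Millisecond, // measured against n1
        }
        fmt.Println(byAddr["10.0.0.1:26257"]) // n2 now answers here: stale 100ms

        // Keyed by node ID: samples stay attached to the node itself.
        byID := map[NodeID]time.Duration{1: 100 * time.Millisecond}
        if _, ok := byID[2]; !ok {
            fmt.Println("no sample for n2 yet") // forces a fresh measurement
        }
    }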

Finally, in preparation for dialback on heartbeat (#84289), the use of
the OriginAddr field in the PingRequest is deprecated. That PR will add
back a field that can be used to look up the correct Locality to use.
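
The shape of the change, condensed from the hunks below (the
LatencyFunc type is from this commit; the wrapper function is
illustrative only):

    import (
        "time"

        "github.com/cockroachdb/cockroach/pkg/roachpb"
    )

    // A LatencyFunc returns the latency from this node to a remote node
    // and a bool indicating whether the latency is valid. It is now keyed
    // by node ID; previously it took a remote address string.
    type LatencyFunc func(roachpb.NodeID) (time.Duration, bool)

    func remoteLatency(latencyFn LatencyFunc, nodeID roachpb.NodeID) (time.Duration, bool) {
        // Before: resolve the gossip address first, then look up by address:
        //   addr, err := storePool.GossipNodeIDAddress(nodeID)
        //   remoteLatency, ok := latencyFn(addr.String())
        // After: the node ID is passed straight through.
        return latencyFn(nodeID)
    }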

Epic: none
Release note: None
andrewbaptist committed Jan 25, 2023
1 parent b21379b commit 5cf0823
Showing 24 changed files with 117 additions and 181 deletions.
22 changes: 9 additions & 13 deletions pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go
@@ -44,7 +44,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
@@ -545,17 +544,17 @@ func TestOracle(t *testing.T) {
leaseholder := replicas[2]

rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper)
-setLatency := func(addr string, latency time.Duration) {
+setLatency := func(id roachpb.NodeID, latency time.Duration) {
// All test cases have to have at least 11 measurement values in order for
// the exponentially-weighted moving average to work properly. See the
// comment on the WARMUP_SAMPLES const in the ewma package for details.
for i := 0; i < 11; i++ {
-rpcContext.RemoteClocks.UpdateOffset(ctx, addr, rpc.RemoteOffset{}, latency)
+rpcContext.RemoteClocks.UpdateOffset(ctx, id, rpc.RemoteOffset{}, latency)
}
}
setLatency("1", 100*time.Millisecond)
setLatency("2", 2*time.Millisecond)
setLatency("3", 80*time.Millisecond)
setLatency(1, 100*time.Millisecond)
setLatency(2, 2*time.Millisecond)
setLatency(3, 80*time.Millisecond)

testCases := []struct {
name string
Expand Down Expand Up @@ -700,7 +699,6 @@ func TestFollowerReadsWithStaleDescriptor(t *testing.T) {
historicalQuery := `SELECT * FROM test AS OF SYSTEM TIME follower_read_timestamp() WHERE k=2`
recCh := make(chan tracingpb.Recording, 1)

-var n2Addr, n3Addr syncutil.AtomicString
tc := testcluster.StartTestCluster(t, 4,
base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
@@ -724,8 +722,8 @@ func TestFollowerReadsWithStaleDescriptor(t *testing.T) {
// heartbeated by the time the test wants to use it. Without this
// knob, that would cause the transport to reorder replicas.
DontConsiderConnHealth: true,
-LatencyFunc: func(addr string) (time.Duration, bool) {
-if (addr == n2Addr.Get()) || (addr == n3Addr.Get()) {
+LatencyFunc: func(id roachpb.NodeID) (time.Duration, bool) {
+if (id == 2) || (id == 3) {
return time.Millisecond, true
}
return 100 * time.Millisecond, true
@@ -743,8 +741,6 @@ func TestFollowerReadsWithStaleDescriptor(t *testing.T) {
},
})
defer tc.Stopper().Stop(ctx)
-n2Addr.Set(tc.Servers[1].RPCAddr())
-n3Addr.Set(tc.Servers[2].RPCAddr())

n1 := sqlutils.MakeSQLRunner(tc.Conns[0])
n1.Exec(t, `CREATE DATABASE t`)
Expand Down Expand Up @@ -887,11 +883,11 @@ func TestSecondaryTenantFollowerReadsRouting(t *testing.T) {
// For the variant where no latency information is available, we
// expect n2 to serve follower reads as well, but because it
// is in the same locality as the client.
-LatencyFunc: func(addr string) (time.Duration, bool) {
+LatencyFunc: func(id roachpb.NodeID) (time.Duration, bool) {
if !validLatencyFunc {
return 0, false
}
-if addr == tc.Server(1).RPCAddr() {
+if id == 2 {
return time.Millisecond, true
}
return 100 * time.Millisecond, true
28 changes: 10 additions & 18 deletions pkg/kv/kvclient/kvcoord/replica_slice.go
@@ -22,18 +22,10 @@ import (
)

// ReplicaInfo extends the Replica structure with the associated node
-// descriptor.
+// Locality information.
type ReplicaInfo struct {
roachpb.ReplicaDescriptor
-NodeDesc *roachpb.NodeDescriptor
-}
-
-func (i ReplicaInfo) locality() []roachpb.Tier {
-return i.NodeDesc.Locality.Tiers
-}
-
-func (i ReplicaInfo) addr() string {
-return i.NodeDesc.Address.String()
+Tiers []roachpb.Tier
}

// A ReplicaSlice is a slice of ReplicaInfo.
@@ -131,12 +123,12 @@ func NewReplicaSlice(
}
rs = append(rs, ReplicaInfo{
ReplicaDescriptor: r,
-NodeDesc: nd,
+Tiers: nd.Locality.Tiers,
})
}
if len(rs) == 0 {
return nil, newSendError(
fmt.Sprintf("no replica node addresses available via gossip for r%d", desc.RangeID))
fmt.Sprintf("no replica node information available via gossip for r%d", desc.RangeID))
}
return rs, nil
}
@@ -188,8 +180,8 @@ func localityMatch(a, b []roachpb.Tier) int {
}

// A LatencyFunc returns the latency from this node to a remote
-// address and a bool indicating whether the latency is valid.
-type LatencyFunc func(string) (time.Duration, bool)
+// node and a bool indicating whether the latency is valid.
+type LatencyFunc func(roachpb.NodeID) (time.Duration, bool)

// OptimizeReplicaOrder sorts the replicas in the order in which
// they're to be used for sending RPCs (meaning in the order in which
@@ -231,14 +223,14 @@ func (rs ReplicaSlice) OptimizeReplicaOrder(
}

if latencyFn != nil {
-latencyI, okI := latencyFn(rs[i].addr())
-latencyJ, okJ := latencyFn(rs[j].addr())
+latencyI, okI := latencyFn(rs[i].NodeID)
+latencyJ, okJ := latencyFn(rs[j].NodeID)
if okI && okJ {
return latencyI < latencyJ
}
}
-attrMatchI := localityMatch(locality.Tiers, rs[i].locality())
-attrMatchJ := localityMatch(locality.Tiers, rs[j].locality())
+attrMatchI := localityMatch(locality.Tiers, rs[i].Tiers)
+attrMatchJ := localityMatch(locality.Tiers, rs[j].Tiers)
// Longer locality matches sort first (the assumption is that
// they'll have better latencies).
return attrMatchI > attrMatchJ
35 changes: 13 additions & 22 deletions pkg/kv/kvclient/kvcoord/replica_slice_test.go
@@ -12,7 +12,6 @@ package kvcoord

import (
"context"
"fmt"
"reflect"
"strings"
"testing"
@@ -167,18 +166,10 @@ func locality(t *testing.T, locStrs []string) roachpb.Locality {
return locality
}

-func nodeDesc(t *testing.T, nid roachpb.NodeID, locStrs []string) *roachpb.NodeDescriptor {
-return &roachpb.NodeDescriptor{
-NodeID: nid,
-Locality: locality(t, locStrs),
-Address: util.MakeUnresolvedAddr("tcp", fmt.Sprintf("%d:26257", nid)),
-}
-}

func info(t *testing.T, nid roachpb.NodeID, sid roachpb.StoreID, locStrs []string) ReplicaInfo {
return ReplicaInfo{
ReplicaDescriptor: desc(nid, sid),
-NodeDesc: nodeDesc(t, nid, locStrs),
+Tiers: locality(t, locStrs).Tiers,
}
}

@@ -192,7 +183,7 @@ func TestReplicaSliceOptimizeReplicaOrder(t *testing.T) {
// locality of the DistSender.
locality roachpb.Locality
// map from node address (see nodeDesc()) to latency to that node.
-latencies map[string]time.Duration
+latencies map[roachpb.NodeID]time.Duration
slice ReplicaSlice
// expOrder is the expected order in which the replicas sort. Replicas are
// only identified by their node. If multiple replicas are on different
@@ -217,10 +208,10 @@ func TestReplicaSliceOptimizeReplicaOrder(t *testing.T) {
name: "order by latency",
nodeID: 1,
locality: locality(t, []string{"country=us", "region=west", "city=la"}),
-latencies: map[string]time.Duration{
-"2:26257": time.Hour,
-"3:26257": time.Minute,
-"4:26257": time.Second,
+latencies: map[roachpb.NodeID]time.Duration{
+2: time.Hour,
+3: time.Minute,
+4: time.Second,
},
slice: ReplicaSlice{
info(t, 2, 2, []string{"country=us", "region=west", "city=sf"}),
@@ -237,11 +228,11 @@ func TestReplicaSliceOptimizeReplicaOrder(t *testing.T) {
name: "local node comes first",
nodeID: 1,
locality: locality(t, nil),
-latencies: map[string]time.Duration{
-"1:26257": 10 * time.Hour,
-"2:26257": time.Hour,
-"3:26257": time.Minute,
-"4:26257": time.Second,
+latencies: map[roachpb.NodeID]time.Duration{
+1: 10 * time.Hour,
+2: time.Hour,
+3: time.Minute,
+4: time.Second,
},
slice: ReplicaSlice{
info(t, 1, 1, nil),
@@ -257,8 +248,8 @@ func TestReplicaSliceOptimizeReplicaOrder(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
var latencyFn LatencyFunc
if test.latencies != nil {
-latencyFn = func(addr string) (time.Duration, bool) {
-lat, ok := test.latencies[addr]
+latencyFn = func(id roachpb.NodeID) (time.Duration, bool) {
+lat, ok := test.latencies[id]
return lat, ok
}
}
11 changes: 3 additions & 8 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
@@ -560,7 +560,7 @@ type AllocatorMetrics struct {
type Allocator struct {
st *cluster.Settings
deterministic bool
-nodeLatencyFn func(addr string) (time.Duration, bool)
+nodeLatencyFn func(nodeID roachpb.NodeID) (time.Duration, bool)
// TODO(aayush): Let's replace this with a *rand.Rand that has a rand.Source
// wrapped inside a mutex, to avoid misuse.
randGen allocatorRand
Expand Down Expand Up @@ -599,7 +599,7 @@ func makeAllocatorMetrics() AllocatorMetrics {
func MakeAllocator(
st *cluster.Settings,
deterministic bool,
-nodeLatencyFn func(addr string) (time.Duration, bool),
+nodeLatencyFn func(nodeID roachpb.NodeID) (time.Duration, bool),
knobs *allocator.TestingKnobs,
) Allocator {
var randSource rand.Source
Expand Down Expand Up @@ -2492,12 +2492,7 @@ func (a Allocator) shouldTransferLeaseForAccessLocality(
if !ok {
continue
}
-addr, err := storePool.GossipNodeIDAddress(repl.NodeID)
-if err != nil {
-log.KvDistribution.Errorf(ctx, "missing address for n%d: %+v", repl.NodeID, err)
-continue
-}
-remoteLatency, ok := a.nodeLatencyFn(addr.String())
+remoteLatency, ok := a.nodeLatencyFn(repl.NodeID)
if !ok {
continue
}
28 changes: 14 additions & 14 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go
@@ -2112,7 +2112,7 @@ func TestAllocatorTransferLeaseTargetDraining(t *testing.T) {
storepool.TestTimeUntilStoreDeadOff, true, /* deterministic */
func() int { return 10 }, /* nodeCount */
livenesspb.NodeLivenessStatus_LIVE)
-a := MakeAllocator(st, true /* deterministic */, func(string) (time.Duration, bool) {
+a := MakeAllocator(st, true /* deterministic */, func(id roachpb.NodeID) (time.Duration, bool) {
return 0, true
}, nil)
defer stopper.Stop(ctx)
@@ -2501,7 +2501,7 @@ func TestAllocatorShouldTransferLeaseDraining(t *testing.T) {
storepool.TestTimeUntilStoreDeadOff, true, /* deterministic */
func() int { return 10 }, /* nodeCount */
livenesspb.NodeLivenessStatus_LIVE)
-a := MakeAllocator(st, true /* deterministic */, func(string) (time.Duration, bool) {
+a := MakeAllocator(st, true /* deterministic */, func(id roachpb.NodeID) (time.Duration, bool) {
return 0, true
}, nil)
defer stopper.Stop(context.Background())
@@ -2569,7 +2569,7 @@ func TestAllocatorShouldTransferSuspected(t *testing.T) {
storepool.TestTimeUntilStoreDeadOff, true, /* deterministic */
func() int { return 10 }, /* nodeCount */
livenesspb.NodeLivenessStatus_LIVE)
-a := MakeAllocator(st, true /* deterministic */, func(string) (time.Duration, bool) {
+a := MakeAllocator(st, true /* deterministic */, func(id roachpb.NodeID) (time.Duration, bool) {
return 0, true
}, nil)
defer stopper.Stop(context.Background())
@@ -5420,11 +5420,11 @@ func TestAllocatorTransferLeaseTargetLoadBased(t *testing.T) {

now = now.Add(MinLeaseTransferStatsDuration)

-noLatency := map[string]time.Duration{}
-highLatency := map[string]time.Duration{
-stores[0].Node.Address.String(): 50 * time.Millisecond,
-stores[1].Node.Address.String(): 50 * time.Millisecond,
-stores[2].Node.Address.String(): 50 * time.Millisecond,
+noLatency := map[roachpb.NodeID]time.Duration{}
+highLatency := map[roachpb.NodeID]time.Duration{
+stores[0].Node.NodeID: 50 * time.Millisecond,
+stores[1].Node.NodeID: 50 * time.Millisecond,
+stores[2].Node.NodeID: 50 * time.Millisecond,
}

existing := []roachpb.ReplicaDescriptor{
@@ -5435,7 +5435,7 @@ func TestAllocatorTransferLeaseTargetLoadBased(t *testing.T) {

testCases := []struct {
leaseholder roachpb.StoreID
-latency map[string]time.Duration
+latency map[roachpb.NodeID]time.Duration
stats *replicastats.ReplicaStats
excludeLeaseRepl bool
expected roachpb.StoreID
@@ -5501,8 +5501,8 @@

for _, c := range testCases {
t.Run("", func(t *testing.T) {
-a := MakeAllocator(st, true /* deterministic */, func(addr string) (time.Duration, bool) {
-return c.latency[addr], true
+a := MakeAllocator(st, true /* deterministic */, func(id roachpb.NodeID) (time.Duration, bool) {
+return c.latency[id], true
}, nil)
localitySummary := c.stats.SnapshotRatedSummary(now)
usage := allocator.RangeUsageInfo{}
@@ -7486,7 +7486,7 @@ func TestAllocatorComputeActionDynamicNumReplicas(t *testing.T) {
storepool.TestTimeUntilStoreDeadOff, false, /* deterministic */
func() int { return numNodes },
livenesspb.NodeLivenessStatus_LIVE)
-a := MakeAllocator(st, false /* deterministic */, func(string) (time.Duration, bool) {
+a := MakeAllocator(st, false /* deterministic */, func(id roachpb.NodeID) (time.Duration, bool) {
return 0, true
}, nil)

@@ -8237,7 +8237,7 @@ func TestAllocatorFullDisks(t *testing.T) {
mockNodeLiveness.NodeLivenessFunc,
false, /* deterministic */
)
-alloc := MakeAllocator(st, false /* deterministic */, func(string) (time.Duration, bool) {
+alloc := MakeAllocator(st, false /* deterministic */, func(id roachpb.NodeID) (time.Duration, bool) {
return 0, false
}, nil)

@@ -8689,7 +8689,7 @@ func exampleRebalancing(
storepool.NewMockNodeLiveness(livenesspb.NodeLivenessStatus_LIVE).NodeLivenessFunc,
/* deterministic */ true,
)
-alloc := MakeAllocator(st, true /* deterministic */, func(string) (time.Duration, bool) {
+alloc := MakeAllocator(st, true /* deterministic */, func(id roachpb.NodeID) (time.Duration, bool) {
return 0, false
}, nil)

3 changes: 2 additions & 1 deletion pkg/kv/kvserver/allocator/allocatorimpl/test_helpers.go
@@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
@@ -42,7 +43,7 @@ func CreateTestAllocatorWithKnobs(
storepool.TestTimeUntilStoreDeadOff, deterministic,
func() int { return numNodes },
livenesspb.NodeLivenessStatus_LIVE)
-a := MakeAllocator(st, deterministic, func(string) (time.Duration, bool) {
+a := MakeAllocator(st, deterministic, func(id roachpb.NodeID) (time.Duration, bool) {
return 0, true
}, knobs)
return stopper, g, storePool, a, manual
1 change: 0 additions & 1 deletion pkg/kv/kvserver/allocator/storepool/BUILD.bazel
@@ -22,7 +22,6 @@ go_library(
"//pkg/rpc",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/util",
"//pkg/util/hlc",
"//pkg/util/humanizeutil",
"//pkg/util/log",
8 changes: 0 additions & 8 deletions pkg/kv/kvserver/allocator/storepool/override_store_pool.go
@@ -17,7 +17,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

@@ -172,13 +171,6 @@ func (o *OverrideStorePool) GetStoreDescriptor(
return o.sp.GetStoreDescriptor(storeID)
}

-// GossipNodeIDAddress implements the AllocatorStorePool interface.
-func (o *OverrideStorePool) GossipNodeIDAddress(
-nodeID roachpb.NodeID,
-) (*util.UnresolvedAddr, error) {
-return o.sp.GossipNodeIDAddress(nodeID)
-}
-
// UpdateLocalStoreAfterRebalance implements the AllocatorStorePool interface.
// This override method is a no-op, as
// StorePool.UpdateLocalStoreAfterRebalance(..) is not a read-only method and