Skip to content

Commit

Permalink
kvclient: make a test more robust
Browse files Browse the repository at this point in the history
TestFollowerReadsWithStaleDescriptor was fragile: it wants to control
precisely which follower a particular query will be routed to and it
went through some knobs to do so, but one more knob is needed. This
patch introduces the knob for inhibiting the GRPCTransport's check of
connection health when deciding the replica ordering - it normally tries
to deprioritize nodes to which there isn't a healthy conn. A connection
starts off as unhealthy until it's heartbeated for the first time so,
depending on whether a connection was established or not to the desired
node sufficiently before the query of interest, that check is a problem.

In particular, the transport's default behavior becomes a problem with
the following commits which remove the old closed timestamp mechanism
that came with early establishment of some network connections. Without
it, this test becomes unhappy.

Release note: None
  • Loading branch information
andreimatei committed Aug 19, 2021
1 parent 65457f9 commit f092ad9
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 5 deletions.
7 changes: 7 additions & 0 deletions pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,13 @@ func TestFollowerReadsWithStaleDescriptor(t *testing.T) {
UseDatabase: "t",
Knobs: base.TestingKnobs{
KVClient: &kvcoord.ClientTestingKnobs{
// Inhibit the checking of connection health done by the
// GRPCTransport. This test wants to control what replica (which
// follower) a request is sent to and, depending on timing, the
// connection from n4 to the respective follower might not be
// heartbeated by the time the test wants to use it. Without this
// knob, that would cause the transport to reorder replicas.
DontConsiderConnHealth: true,
LatencyFunc: func(addr string) (time.Duration, bool) {
if (addr == n2Addr.Get()) || (addr == n3Addr.Get()) {
return time.Millisecond, true
Expand Down
11 changes: 9 additions & 2 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,11 @@ type DistSender struct {
// the descriptor, instead of trying to reorder them by latency. The knob
// only applies to requests sent with the LEASEHOLDER routing policy.
dontReorderReplicas bool
// dontConsiderConnectionHealth, if set, makes the GRPCTransport not take into
// consideration the connection health when deciding the ordering for
// replicas. When not set, replicas on nodes with unhealthy connections are
// deprioritized.
dontConsiderConnHealth bool
}

var _ kv.Sender = &DistSender{}
Expand Down Expand Up @@ -379,6 +384,7 @@ func NewDistSender(cfg DistSenderConfig) *DistSender {
ds.transportFactory = GRPCTransportFactory
}
ds.dontReorderReplicas = cfg.TestingKnobs.DontReorderReplicas
ds.dontConsiderConnHealth = cfg.TestingKnobs.DontConsiderConnHealth
ds.rpcRetryOptions = base.DefaultRetryOptions()
if cfg.RPCRetryOptions != nil {
ds.rpcRetryOptions = *cfg.RPCRetryOptions
Expand Down Expand Up @@ -1851,8 +1857,9 @@ func (ds *DistSender) sendToReplicas(
}

opts := SendOptions{
class: rpc.ConnectionClassForKey(desc.RSpan().Key),
metrics: &ds.metrics,
class: rpc.ConnectionClassForKey(desc.RSpan().Key),
metrics: &ds.metrics,
dontConsiderConnectionHealth: ds.dontConsiderConnHealth,
}
transport, err := ds.transportFactory(opts, ds.nodeDialer, replicas)
if err != nil {
Expand Down
6 changes: 6 additions & 0 deletions pkg/kv/kvclient/kvcoord/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ type ClientTestingKnobs struct {
// testing purposes.
TransportFactory TransportFactory

// DontConsiderConnectionHealth, if set, makes the GRPCTransport not take into
// consideration the connection health when deciding the ordering for
// replicas. When not set, replicas on nodes with unhealthy connections are
// deprioritized.
DontConsiderConnHealth bool

// The maximum number of times a txn will attempt to refresh its
// spans for a single transactional batch.
// 0 means use a default. -1 means disable refresh.
Expand Down
14 changes: 11 additions & 3 deletions pkg/kv/kvclient/kvcoord/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ import (
type SendOptions struct {
class rpc.ConnectionClass
metrics *DistSenderMetrics
// dontConsiderConnectionHealth, if set, makes the transport not take into
// consideration the connection health when deciding the ordering for
// replicas. When not set, replicas on nodes with unhealthy connections are
// deprioritized.
dontConsiderConnectionHealth bool
}

// TransportFactory encapsulates all interaction with the RPC
Expand Down Expand Up @@ -109,6 +114,7 @@ func grpcTransportFactoryImpl(
} else {
replicas = replicas[:len(rs)]
}

// We'll map the index of the replica descriptor in its slice to its health.
var health util.FastIntMap
for i, r := range rs {
Expand All @@ -121,9 +127,11 @@ func grpcTransportFactoryImpl(
}
}

// Put known-healthy clients first, while otherwise respecting the existing
// ordering of the replicas.
splitHealthy(replicas, health)
if !opts.dontConsiderConnectionHealth {
// Put known-healthy clients first, while otherwise respecting the existing
// ordering of the replicas.
splitHealthy(replicas, health)
}

*transport = grpcTransport{
opts: opts,
Expand Down

0 comments on commit f092ad9

Please sign in to comment.