kvclient: stop reads on followers
Stop follower reads on draining, decommissioning or unhealthy nodes.

Epic: none
Fixes: cockroachdb#112351

Release note (performance improvement): This change prevents failed
requests from being issued to followers that are draining,
decommissioning, or unhealthy, which prevents latency spikes if those
nodes later go offline.
andrewbaptist committed Nov 1, 2023
1 parent fb63e6c commit db3a04d
Showing 11 changed files with 254 additions and 10 deletions.
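At a high level, the change threads a node-health predicate (a HealthFunc) from node liveness into the DistSender's replica ordering, so that draining, decommissioning, or otherwise unhealthy followers sort behind healthy ones and stop receiving follower reads. Below is a minimal, self-contained Go sketch of that ordering idea; the types and names are simplified for illustration and are not the exact CockroachDB API.

package main

import (
	"fmt"
	"sort"
)

// NodeID and ReplicaInfo are pared-down stand-ins for the descriptors the
// DistSender actually orders.
type NodeID int

type ReplicaInfo struct {
	NodeID  NodeID
	Latency int // simulated latency in milliseconds
}

// HealthFunc reports whether a node should be considered alive (not
// draining, decommissioning, or otherwise unhealthy).
type HealthFunc func(NodeID) bool

// optimizeOrder sorts healthy replicas before unhealthy ones and, within
// each group, by ascending latency. This mirrors the comparison added to
// ReplicaSlice.OptimizeReplicaOrder, where health is checked before the
// local-node and latency/locality tie-breakers.
func optimizeOrder(replicas []ReplicaInfo, healthy HealthFunc) {
	sort.SliceStable(replicas, func(i, j int) bool {
		hi, hj := healthy(replicas[i].NodeID), healthy(replicas[j].NodeID)
		if hi != hj {
			return hi // healthy sorts first
		}
		return replicas[i].Latency < replicas[j].Latency
	})
}

func main() {
	replicas := []ReplicaInfo{
		{NodeID: 3, Latency: 1},  // nearest follower, but draining
		{NodeID: 2, Latency: 50},
		{NodeID: 1, Latency: 60},
	}
	draining := map[NodeID]bool{3: true}
	optimizeOrder(replicas, func(id NodeID) bool { return !draining[id] })
	fmt.Println(replicas) // n3 now sorts last despite having the lowest latency
}

Comparing health before the local-node and latency criteria means even the nearest follower is skipped once its node is known to be draining, which is the behavior the new TestDrainStopsFollowerReads below exercises.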
2 changes: 2 additions & 0 deletions pkg/ccl/kvccl/kvfollowerreadsccl/BUILD.bazel
@@ -57,11 +57,13 @@ go_test(
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/closedts",
"//pkg/kv/kvserver/concurrency/lock",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/roachpb",
"//pkg/rpc",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/server/serverpb",
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/physicalplan/replicaoracle",
157 changes: 157 additions & 0 deletions pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go
@@ -28,8 +28,11 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/physicalplan/replicaoracle"
@@ -675,6 +678,7 @@ func TestOracle(t *testing.T) {
Settings: st,
RPCContext: rpcContext,
Clock: clock,
HealthFunc: func(roachpb.NodeID) bool { return true },
})

res, _, err := o.ChoosePreferredReplica(ctx, c.txn, desc, c.lh, c.ctPolicy, replicaoracle.QueryState{})
@@ -1113,3 +1117,156 @@ func TestSecondaryTenantFollowerReadsRouting(t *testing.T) {
})
}
}

// Test that draining a node stops any follower reads to that node. This is
// important because a drained node is about to shut down, and a follower read
// issued just before the shutdown may need to wait for a gRPC timeout.
func TestDrainStopsFollowerReads(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
defer utilccl.TestingEnableEnterprise()()
ctx := context.Background()
settings := cluster.MakeTestingClusterSettings()
sv := &settings.SV

// Turn down these durations to allow follower reads to happen faster.
closeTime := 10 * time.Millisecond
closedts.TargetDuration.Override(ctx, sv, closeTime)
closedts.SideTransportCloseInterval.Override(ctx, sv, closeTime)
ClosedTimestampPropagationSlack.Override(ctx, sv, closeTime)

// Configure localities so n3 and n4 are in the same locality.
// SQL runs on n4 (west).
// Drain n3 (west).
numNodes := 4
locality := func(region string) roachpb.Locality {
return roachpb.Locality{
Tiers: []roachpb.Tier{
{Key: "region", Value: region},
},
}
}
localities := []roachpb.Locality{
locality("us-east"),
locality("us-east"),
locality("us-west"),
locality("us-west"),
}
manualClock := hlc.NewHybridManualClock()

// Record which store processed the read request for our key.
var lastReader atomic.Int32
recordDestStore := func(args kvserverbase.FilterArgs) *kvpb.Error {
getArg, ok := args.Req.(*kvpb.GetRequest)
if !ok || !keys.ScratchRangeMin.Equal(getArg.Key) {
return nil
}
lastReader.Store(int32(args.Sid))
return nil
}

// Set up the nodes in different localities and use the LatencyFunc to
// simulate latency.
serverArgs := make(map[int]base.TestServerArgs)
for i := 0; i < numNodes; i++ {
i := i
serverArgs[i] = base.TestServerArgs{
Settings: settings,
Locality: localities[i],
DefaultTestTenant: base.TestIsSpecificToStorageLayerAndNeedsASystemTenant,
DisableSQLServer: true,
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
EvalKnobs: kvserverbase.BatchEvalTestingKnobs{
TestingEvalFilter: recordDestStore,
},
},
// Currently we use latency as the "primary" signal and use
// locality only if the latency is unavailable. Simulate
// latency based on whether the nodes are in the same locality.
// TODO(baptist): Remove this if we sort replicas by region (#112993).
KVClient: &kvcoord.ClientTestingKnobs{
LatencyFunc: func(id roachpb.NodeID) (time.Duration, bool) {
if localities[id-1].Equal(localities[i]) {
return time.Millisecond, true
}
return 100 * time.Millisecond, true
},
},
},
}
}

// Set ReplicationManual as we don't want any leases to move around and affect
// the results of this test.
tc := testcluster.StartTestCluster(t, numNodes,
base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
DefaultTestTenant: base.TestIsSpecificToStorageLayerAndNeedsASystemTenant,
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
WallClock: manualClock,
},
},
},
ServerArgsPerNode: serverArgs,
})
defer tc.Stopper().Stop(ctx)

// Put the scratch range on nodes 1, 2, 3 and leave the lease on 1.
// We want all follower read requests to come from node 4 and go to node 3 due to
// the way latency and localities are set up.
scratchKey := tc.ScratchRange(t)
tc.AddVotersOrFatal(t, scratchKey, tc.Targets(1, 2)...)
server := tc.Server(3).ApplicationLayer()
db := server.DB()

// Use a fixed read time in the recent past. Once the
// readTime is closed, we expect all future reads to go to n3.
readTime := server.Clock().Now()

testutils.SucceedsSoon(t, func() error {
sendFollowerRead(t, db, scratchKey, readTime)
reader := lastReader.Load()
if reader != 3 {
return errors.Newf("expected read to n3 not n%d", reader)
}
return nil
})

// Send a drain request to n3 and wait until the drain is completed. Other
// nodes find out about the drain asynchronously through gossip.
req := serverpb.DrainRequest{Shutdown: false, DoDrain: true, NodeId: "3"}
drainStream, err := tc.Server(0).GetAdminClient(t).Drain(ctx, &req)
require.NoError(t, err)
// When we get a response the drain is complete.
drainResp, err := drainStream.Recv()
require.NoError(t, err)
require.True(t, drainResp.IsDraining)

// Follower reads should stop going to n3 once other nodes notice it
// draining.
testutils.SucceedsSoon(t, func() error {
sendFollowerRead(t, db, scratchKey, readTime)
reader := lastReader.Load()
if reader == 3 {
return errors.New("expected to not read from n3")
}
return nil
})
}

func sendFollowerRead(t *testing.T, db *kv.DB, scratchKey roachpb.Key, readTime hlc.Timestamp) {
// Manually construct the BatchRequest to set the Timestamp.
b := db.NewBatch()
b.Get(scratchKey)
br := kvpb.BatchRequest{
Header: b.Header,
Requests: b.Requests(),
}
br.Header.Timestamp = readTime

_, kvErr := db.NonTransactionalSender().Send(context.Background(), &br)
require.Nil(t, kvErr)
}
27 changes: 25 additions & 2 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -542,6 +542,9 @@ type DistSender struct {
// LatencyFunc is used to estimate the latency to other nodes.
latencyFunc LatencyFunc

// HealthFunc returns true if the node is alive and not draining.
healthFunc atomic.Pointer[HealthFunc]

onRangeSpanningNonTxnalBatch func(ba *kvpb.BatchRequest) *kvpb.Error

// locality is the description of the topography of the server on which the
@@ -717,14 +720,34 @@ func NewDistSender(cfg DistSenderConfig) *DistSender {
ds.onRangeSpanningNonTxnalBatch = cfg.TestingKnobs.OnRangeSpanningNonTxnalBatch
}

// Placeholder function until the real health function is injected using
// SetHealthFunc.
// TODO(baptist): Restructure the code to allow injecting the correct
// HealthFunc at construction time.
healthFunc := HealthFunc(func(id roachpb.NodeID) bool {
return true
})
ds.healthFunc.Store(&healthFunc)

return ds
}

// SetHealthFunc is called after construction due to the circular dependency
// between DistSender and NodeLiveness.
func (ds *DistSender) SetHealthFunc(healthFn HealthFunc) {
ds.healthFunc.Store(&healthFn)
}

// LatencyFunc returns the LatencyFunc of the DistSender.
func (ds *DistSender) LatencyFunc() LatencyFunc {
return ds.latencyFunc
}

// HealthFunc returns the HealthFunc of the DistSender.
func (ds *DistSender) HealthFunc() HealthFunc {
return *ds.healthFunc.Load()
}

// DisableFirstRangeUpdates disables updates of the first range via
// gossip. Used by tests which want finer control of the contents of the range
// cache.
@@ -2240,7 +2263,7 @@ func (ds *DistSender) sendToReplicas(
// First order by latency, then move the leaseholder to the front of the
// list, if it is known.
if !ds.dontReorderReplicas {
replicas.OptimizeReplicaOrder(ds.nodeIDGetter(), ds.latencyFunc, ds.locality)
replicas.OptimizeReplicaOrder(ds.nodeIDGetter(), ds.HealthFunc(), ds.latencyFunc, ds.locality)
}

idx := -1
@@ -2259,7 +2282,7 @@
case kvpb.RoutingPolicy_NEAREST:
// Order by latency.
log.VEvent(ctx, 2, "routing to nearest replica; leaseholder not required")
replicas.OptimizeReplicaOrder(ds.nodeIDGetter(), ds.latencyFunc, ds.locality)
replicas.OptimizeReplicaOrder(ds.nodeIDGetter(), ds.HealthFunc(), ds.latencyFunc, ds.locality)

default:
log.Fatalf(ctx, "unknown routing policy: %s", ba.RoutingPolicy)
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
@@ -720,7 +720,7 @@ func newTransportForRange(
if err != nil {
return nil, err
}
replicas.OptimizeReplicaOrder(ds.nodeIDGetter(), latencyFn, ds.locality)
replicas.OptimizeReplicaOrder(ds.nodeIDGetter(), ds.HealthFunc(), latencyFn, ds.locality)
opts := SendOptions{class: connectionClass(&ds.st.SV)}
return ds.transportFactory(opts, ds.nodeDialer, replicas)
}
16 changes: 15 additions & 1 deletion pkg/kv/kvclient/kvcoord/replica_slice.go
@@ -189,6 +189,10 @@ func localityMatch(a, b []roachpb.Tier) int {
// node and a bool indicating whether the latency is valid.
type LatencyFunc func(roachpb.NodeID) (time.Duration, bool)

// HealthFunc returns true if the node should be considered alive. Unhealthy
// nodes are sorted behind healthy nodes.
type HealthFunc func(roachpb.NodeID) bool

// OptimizeReplicaOrder sorts the replicas in the order in which
// they're to be used for sending RPCs (meaning in the order in which
// they'll be probed for the lease). Lower latency and "closer"
@@ -205,7 +209,7 @@ type LatencyFunc func(roachpb.NodeID) (time.Duration, bool)
// leaseholder is known by the caller, the caller will move it to the
// front if appropriate.
func (rs ReplicaSlice) OptimizeReplicaOrder(
nodeID roachpb.NodeID, latencyFn LatencyFunc, locality roachpb.Locality,
nodeID roachpb.NodeID, healthFn HealthFunc, latencyFn LatencyFunc, locality roachpb.Locality,
) {
// If we don't know which node we're on or its locality, and we don't have
// latency information to other nodes, send the RPCs randomly.
@@ -220,6 +224,16 @@ func (rs ReplicaSlice) OptimizeReplicaOrder(
if rs[i].NodeID == rs[j].NodeID {
return false // i == j
}

// Sort healthy nodes before unhealthy nodes.
// NB: This is done before checking if we are on the local node because
// if we are unhealthy, then we prefer to choose a different follower.
healthI := healthFn(rs[i].NodeID)
healthJ := healthFn(rs[j].NodeID)
if healthI != healthJ {
return healthI
}

// Replicas on the local node sort first.
if rs[i].NodeID == nodeID {
return true // i < j
29 changes: 28 additions & 1 deletion pkg/kv/kvclient/kvcoord/replica_slice_test.go
@@ -197,6 +197,8 @@ func TestReplicaSliceOptimizeReplicaOrder(t *testing.T) {
locality roachpb.Locality
// map from node address (see nodeDesc()) to latency to that node.
latencies map[roachpb.NodeID]time.Duration
// map of unhealthy nodes
unhealthy map[roachpb.NodeID]struct{}
slice ReplicaSlice
// expOrder is the expected order in which the replicas sort. Replicas are
// only identified by their node. If multiple replicas are on different
@@ -217,6 +219,24 @@
},
expOrdered: []roachpb.NodeID{1, 2, 4, 3},
},
{
// Same test as above, but mark nodes 1 and 4 as unhealthy.
name: "order by health",
nodeID: 1,
locality: locality(t, []string{"country=us", "region=west", "city=la"}),
slice: ReplicaSlice{
info(t, 1, 1, []string{"country=us", "region=west", "city=la"}),
info(t, 2, 2, []string{"country=us", "region=west", "city=sf"}),
info(t, 3, 3, []string{"country=uk", "city=london"}),
info(t, 3, 33, []string{"country=uk", "city=london"}),
info(t, 4, 4, []string{"country=us", "region=east", "city=ny"}),
},
unhealthy: map[roachpb.NodeID]struct{}{
1: {},
4: {},
},
expOrdered: []roachpb.NodeID{2, 3, 1, 4},
},
{
name: "order by latency",
nodeID: 1,
@@ -266,9 +286,16 @@
return lat, ok
}
}
healthFn := func(id roachpb.NodeID) bool {
if test.unhealthy == nil {
return true
}
_, ok := test.unhealthy[id]
return !ok
}
// Randomize the input order, as it's not supposed to matter.
shuffle.Shuffle(test.slice)
test.slice.OptimizeReplicaOrder(test.nodeID, latencyFn, test.locality)
test.slice.OptimizeReplicaOrder(test.nodeID, healthFn, latencyFn, test.locality)
var sortedNodes []roachpb.NodeID
sortedNodes = append(sortedNodes, test.slice[0].NodeID)
for i := 1; i < len(test.slice); i++ {
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/liveness/livenesspb/liveness.go
@@ -211,6 +211,7 @@
NetworkMap
LossOfQuorum
ReplicaGCQueue
DistSender
)

func (nv NodeVitality) IsLive(usage VitalityUsage) bool {
@@ -270,6 +271,8 @@ func (nv NodeVitality) IsLive(usage VitalityUsage) bool {
return nv.isAlive()
case ReplicaGCQueue:
return nv.isAlive()
case DistSender:
return nv.isAvailableNotDraining()
}

// TODO(baptist): Should be an assertion that we don't know this usage.
11 changes: 11 additions & 0 deletions pkg/server/server.go
@@ -52,6 +52,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptprovider"
@@ -539,6 +540,16 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf

nodeRegistry.AddMetricStruct(nodeLiveness.Metrics())

// TODO(baptist): Refactor this to change the dependency between liveness and
// the dist sender. Today the persistence of liveness requires the distsender
// to read and write the liveness records, but the cache only needs the gossip
// struct. We could construct the liveness cache separately from the rest of
// liveness and use that to compute this rather than the entire liveness
// struct.
distSender.SetHealthFunc(func(id roachpb.NodeID) bool {
return nodeLiveness.GetNodeVitalityFromCache(id).IsLive(livenesspb.DistSender)
})

nodeLivenessFn := storepool.MakeStorePoolNodeLivenessFunc(nodeLiveness)
if nodeLivenessKnobs, ok := cfg.TestingKnobs.NodeLiveness.(kvserver.NodeLivenessTestingKnobs); ok {
if nodeLivenessKnobs.StorePoolNodeLivenessFn != nil {