diff --git a/pkg/ccl/kvccl/kvfollowerreadsccl/BUILD.bazel b/pkg/ccl/kvccl/kvfollowerreadsccl/BUILD.bazel
index 5b39b39ca13c..dcfd6c37957a 100644
--- a/pkg/ccl/kvccl/kvfollowerreadsccl/BUILD.bazel
+++ b/pkg/ccl/kvccl/kvfollowerreadsccl/BUILD.bazel
@@ -57,11 +57,13 @@ go_test(
         "//pkg/kv/kvserver",
         "//pkg/kv/kvserver/closedts",
         "//pkg/kv/kvserver/concurrency/lock",
+        "//pkg/kv/kvserver/kvserverbase",
         "//pkg/roachpb",
         "//pkg/rpc",
         "//pkg/security/securityassets",
         "//pkg/security/securitytest",
         "//pkg/server",
+        "//pkg/server/serverpb",
         "//pkg/settings/cluster",
         "//pkg/sql",
         "//pkg/sql/physicalplan/replicaoracle",
diff --git a/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go b/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go
index e57aa01346cb..f3cc029adbb4 100644
--- a/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go
+++ b/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go
@@ -28,8 +28,11 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/rpc"
+	"github.com/cockroachdb/cockroach/pkg/server"
+	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/sql"
 	"github.com/cockroachdb/cockroach/pkg/sql/physicalplan/replicaoracle"
@@ -675,6 +678,7 @@ func TestOracle(t *testing.T) {
 			Settings:   st,
 			RPCContext: rpcContext,
 			Clock:      clock,
+			HealthFunc: func(roachpb.NodeID) bool { return true },
 		})
 		res, _, err := o.ChoosePreferredReplica(ctx, c.txn, desc, c.lh, c.ctPolicy, replicaoracle.QueryState{})
@@ -1113,3 +1117,156 @@ func TestSecondaryTenantFollowerReadsRouting(t *testing.T) {
 		})
 	}
 }
+
+// Test that draining a node stops any follower reads to that node. This is
+// important because a drained node is about to shut down, and a follower read
+// sent prior to the shutdown may need to wait for a gRPC timeout.
+func TestDrainStopsFollowerReads(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+	defer utilccl.TestingEnableEnterprise()()
+	ctx := context.Background()
+	settings := cluster.MakeTestingClusterSettings()
+	sv := &settings.SV
+
+	// Turn down these durations to allow follower reads to happen faster.
+	closeTime := 10 * time.Millisecond
+	closedts.TargetDuration.Override(ctx, sv, closeTime)
+	closedts.SideTransportCloseInterval.Override(ctx, sv, closeTime)
+	ClosedTimestampPropagationSlack.Override(ctx, sv, closeTime)
+
+	// Configure localities so n3 and n4 are in the same locality.
+	// SQL runs on n4 (west).
+	// Drain n3 (west).
+	numNodes := 4
+	locality := func(region string) roachpb.Locality {
+		return roachpb.Locality{
+			Tiers: []roachpb.Tier{
+				{Key: "region", Value: region},
+			},
+		}
+	}
+	localities := []roachpb.Locality{
+		locality("us-east"),
+		locality("us-east"),
+		locality("us-west"),
+		locality("us-west"),
+	}
+	manualClock := hlc.NewHybridManualClock()
+
+	// Record which store processed the read request for our key.
+	var lastReader atomic.Int32
+	recordDestStore := func(args kvserverbase.FilterArgs) *kvpb.Error {
+		getArg, ok := args.Req.(*kvpb.GetRequest)
+		if !ok || !keys.ScratchRangeMin.Equal(getArg.Key) {
+			return nil
+		}
+		lastReader.Store(int32(args.Sid))
+		return nil
+	}
+
+	// Set up the nodes in different localities and use the LatencyFunc to
+	// simulate latency.
+	serverArgs := make(map[int]base.TestServerArgs)
+	for i := 0; i < numNodes; i++ {
+		i := i
+		serverArgs[i] = base.TestServerArgs{
+			Settings:          settings,
+			Locality:          localities[i],
+			DefaultTestTenant: base.TestIsSpecificToStorageLayerAndNeedsASystemTenant,
+			DisableSQLServer:  true,
+			Knobs: base.TestingKnobs{
+				Store: &kvserver.StoreTestingKnobs{
+					EvalKnobs: kvserverbase.BatchEvalTestingKnobs{
+						TestingEvalFilter: recordDestStore,
+					},
+				},
+				// Currently we use latency as the "primary" signal and use
+				// locality only if the latency is unavailable. Simulate
+				// latency based on whether the nodes are in the same locality.
+				// TODO(baptist): Remove this if we sort replicas by region (#112993).
+				KVClient: &kvcoord.ClientTestingKnobs{
+					LatencyFunc: func(id roachpb.NodeID) (time.Duration, bool) {
+						if localities[id-1].Equal(localities[i]) {
+							return time.Millisecond, true
+						}
+						return 100 * time.Millisecond, true
+					},
+				},
+			},
+		}
+	}
+
+	// Set ReplicationManual as we don't want any leases to move around and affect
+	// the results of this test.
+	tc := testcluster.StartTestCluster(t, numNodes,
+		base.TestClusterArgs{
+			ReplicationMode: base.ReplicationManual,
+			ServerArgs: base.TestServerArgs{
+				DefaultTestTenant: base.TestIsSpecificToStorageLayerAndNeedsASystemTenant,
+				Knobs: base.TestingKnobs{
+					Server: &server.TestingKnobs{
+						WallClock: manualClock,
+					},
+				},
+			},
+			ServerArgsPerNode: serverArgs,
+		})
+	defer tc.Stopper().Stop(ctx)
+
+	// Put the scratch range on nodes 1, 2, 3 and leave the lease on 1.
+	// We want all follower read requests to come from n4 and go to n3 due to
+	// the way latency and localities are set up.
+	scratchKey := tc.ScratchRange(t)
+	tc.AddVotersOrFatal(t, scratchKey, tc.Targets(1, 2)...)
+	server := tc.Server(3).ApplicationLayer()
+	db := server.DB()
+
+	// Keep the read time fixed at a timestamp that will soon be in the recent
+	// past. Once the readTime is closed, we expect all future reads to go to n3.
+	readTime := server.Clock().Now()
+
+	testutils.SucceedsSoon(t, func() error {
+		sendFollowerRead(t, db, scratchKey, readTime)
+		reader := lastReader.Load()
+		if reader != 3 {
+			return errors.Newf("expected read to n3 not n%d", reader)
+		}
+		return nil
+	})
+
+	// Send a drain request to n3 and wait until the drain is completed. Other
+	// nodes find out about the drain asynchronously through gossip.
+	req := serverpb.DrainRequest{Shutdown: false, DoDrain: true, NodeId: "3"}
+	drainStream, err := tc.Server(0).GetAdminClient(t).Drain(ctx, &req)
+	require.NoError(t, err)
+	// When we get a response, the drain is complete.
+	drainResp, err := drainStream.Recv()
+	require.NoError(t, err)
+	require.True(t, drainResp.IsDraining)
+
+	// Follower reads should stop going to n3 once other nodes notice it
+	// draining.
+	testutils.SucceedsSoon(t, func() error {
+		sendFollowerRead(t, db, scratchKey, readTime)
+		reader := lastReader.Load()
+		if reader == 3 {
+			return errors.New("expected to not read from n3")
+		}
+		return nil
+	})
+}
+
+func sendFollowerRead(t *testing.T, db *kv.DB, scratchKey roachpb.Key, readTime hlc.Timestamp) {
+	// Manually construct the BatchRequest to set the Timestamp.
+	b := db.NewBatch()
+	b.Get(scratchKey)
+	br := kvpb.BatchRequest{
+		Header:   b.Header,
+		Requests: b.Requests(),
+	}
+	br.Header.Timestamp = readTime
+
+	_, kvErr := db.NonTransactionalSender().Send(context.Background(), &br)
+	require.Nil(t, kvErr)
+}
diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go
index 392b917482f6..e95b366fbf8c 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -542,6 +542,9 @@ type DistSender struct {
 	// LatencyFunc is used to estimate the latency to other nodes.
 	latencyFunc LatencyFunc
 
+	// HealthFunc returns true if the node is alive and not draining.
+	healthFunc atomic.Pointer[HealthFunc]
+
 	onRangeSpanningNonTxnalBatch func(ba *kvpb.BatchRequest) *kvpb.Error
 
 	// locality is the description of the topography of the server on which the
@@ -717,14 +720,34 @@ func NewDistSender(cfg DistSenderConfig) *DistSender {
 		ds.onRangeSpanningNonTxnalBatch = cfg.TestingKnobs.OnRangeSpanningNonTxnalBatch
 	}
 
+	// Placeholder function until the real health function is injected via
+	// SetHealthFunc.
+	// TODO(baptist): Restructure the code to allow injecting the correct
+	// HealthFunc at construction time.
+	healthFunc := HealthFunc(func(id roachpb.NodeID) bool {
+		return true
+	})
+	ds.healthFunc.Store(&healthFunc)
+
 	return ds
 }
 
+// SetHealthFunc is called after construction due to the circular dependency
+// between DistSender and NodeLiveness.
+func (ds *DistSender) SetHealthFunc(healthFn HealthFunc) {
+	ds.healthFunc.Store(&healthFn)
+}
+
 // LatencyFunc returns the LatencyFunc of the DistSender.
 func (ds *DistSender) LatencyFunc() LatencyFunc {
 	return ds.latencyFunc
 }
 
+// HealthFunc returns the HealthFunc of the DistSender.
+func (ds *DistSender) HealthFunc() HealthFunc {
+	return *ds.healthFunc.Load()
+}
+
 // DisableFirstRangeUpdates disables updates of the first range via
 // gossip. Used by tests which want finer control of the contents of the range
 // cache.
@@ -2240,7 +2263,7 @@ func (ds *DistSender) sendToReplicas(
 	// First order by latency, then move the leaseholder to the front of the
 	// list, if it is known.
 	if !ds.dontReorderReplicas {
-		replicas.OptimizeReplicaOrder(ds.nodeIDGetter(), ds.latencyFunc, ds.locality)
+		replicas.OptimizeReplicaOrder(ds.nodeIDGetter(), ds.HealthFunc(), ds.latencyFunc, ds.locality)
 	}
 	idx := -1
 
@@ -2259,7 +2282,7 @@ func (ds *DistSender) sendToReplicas(
 	case kvpb.RoutingPolicy_NEAREST:
 		// Order by latency.
 		log.VEvent(ctx, 2, "routing to nearest replica; leaseholder not required")
-		replicas.OptimizeReplicaOrder(ds.nodeIDGetter(), ds.latencyFunc, ds.locality)
+		replicas.OptimizeReplicaOrder(ds.nodeIDGetter(), ds.HealthFunc(), ds.latencyFunc, ds.locality)
 
 	default:
 		log.Fatalf(ctx, "unknown routing policy: %s", ba.RoutingPolicy)
diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
index 97ee1c85e7a9..818b7d9e3a1c 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
@@ -720,7 +720,7 @@ func newTransportForRange(
 	if err != nil {
 		return nil, err
 	}
-	replicas.OptimizeReplicaOrder(ds.nodeIDGetter(), latencyFn, ds.locality)
+	replicas.OptimizeReplicaOrder(ds.nodeIDGetter(), ds.HealthFunc(), latencyFn, ds.locality)
 	opts := SendOptions{class: connectionClass(&ds.st.SV)}
 	return ds.transportFactory(opts, ds.nodeDialer, replicas)
 }
diff --git a/pkg/kv/kvclient/kvcoord/replica_slice.go b/pkg/kv/kvclient/kvcoord/replica_slice.go
index 01264a4bab13..783be89f04ae 100644
--- a/pkg/kv/kvclient/kvcoord/replica_slice.go
+++ b/pkg/kv/kvclient/kvcoord/replica_slice.go
@@ -189,6 +189,10 @@ func localityMatch(a, b []roachpb.Tier) int {
 // node and a bool indicating whether the latency is valid.
 type LatencyFunc func(roachpb.NodeID) (time.Duration, bool)
 
+// HealthFunc returns true if the node should be considered alive. Unhealthy
+// nodes are sorted behind healthy nodes.
+type HealthFunc func(roachpb.NodeID) bool
+
 // OptimizeReplicaOrder sorts the replicas in the order in which
 // they're to be used for sending RPCs (meaning in the order in which
 // they'll be probed for the lease). Lower latency and "closer"
@@ -205,7 +209,7 @@ type LatencyFunc func(roachpb.NodeID) (time.Duration, bool)
 // leaseholder is known by the caller, the caller will move it to the
 // front if appropriate.
 func (rs ReplicaSlice) OptimizeReplicaOrder(
-	nodeID roachpb.NodeID, latencyFn LatencyFunc, locality roachpb.Locality,
+	nodeID roachpb.NodeID, healthFn HealthFunc, latencyFn LatencyFunc, locality roachpb.Locality,
 ) {
 	// If we don't know which node we're on or its locality, and we don't have
 	// latency information to other nodes, send the RPCs randomly.
@@ -220,6 +224,16 @@ func (rs ReplicaSlice) OptimizeReplicaOrder(
 		if rs[i].NodeID == rs[j].NodeID {
 			return false // i == j
 		}
+
+		// Sort healthy nodes before unhealthy nodes.
+		// NB: This is done before checking if we are on the local node because
+		// if we are unhealthy, then we prefer to choose a different follower.
+		healthI := healthFn(rs[i].NodeID)
+		healthJ := healthFn(rs[j].NodeID)
+		if healthI != healthJ {
+			return healthI
+		}
+
 		// Replicas on the local node sort first.
 		if rs[i].NodeID == nodeID {
 			return true // i < j
diff --git a/pkg/kv/kvclient/kvcoord/replica_slice_test.go b/pkg/kv/kvclient/kvcoord/replica_slice_test.go
index 1c3329871ce9..014f63b36c7e 100644
--- a/pkg/kv/kvclient/kvcoord/replica_slice_test.go
+++ b/pkg/kv/kvclient/kvcoord/replica_slice_test.go
@@ -197,6 +197,8 @@ func TestReplicaSliceOptimizeReplicaOrder(t *testing.T) {
 		locality roachpb.Locality
 		// map from node address (see nodeDesc()) to latency to that node.
 		latencies map[roachpb.NodeID]time.Duration
+		// map of unhealthy nodes.
+		unhealthy map[roachpb.NodeID]struct{}
 		slice     ReplicaSlice
 		// expOrder is the expected order in which the replicas sort. Replicas are
 		// only identified by their node. If multiple replicas are on different
@@ -217,6 +219,24 @@
 		},
 		expOrdered: []roachpb.NodeID{1, 2, 4, 3},
 	},
+	{
+		// Same test as above, but mark nodes 1 and 4 as unhealthy.
+		name:     "order by health",
+		nodeID:   1,
+		locality: locality(t, []string{"country=us", "region=west", "city=la"}),
+		slice: ReplicaSlice{
+			info(t, 1, 1, []string{"country=us", "region=west", "city=la"}),
+			info(t, 2, 2, []string{"country=us", "region=west", "city=sf"}),
+			info(t, 3, 3, []string{"country=uk", "city=london"}),
+			info(t, 3, 33, []string{"country=uk", "city=london"}),
+			info(t, 4, 4, []string{"country=us", "region=east", "city=ny"}),
+		},
+		unhealthy: map[roachpb.NodeID]struct{}{
+			1: {},
+			4: {},
+		},
+		expOrdered: []roachpb.NodeID{2, 3, 1, 4},
+	},
 	{
 		name:     "order by latency",
 		nodeID:   1,
@@ -266,9 +286,16 @@ func TestReplicaSliceOptimizeReplicaOrder(t *testing.T) {
 				return lat, ok
 			}
 		}
+		healthFn := func(id roachpb.NodeID) bool {
+			if test.unhealthy == nil {
+				return true
+			}
+			_, ok := test.unhealthy[id]
+			return !ok
+		}
 		// Randomize the input order, as it's not supposed to matter.
 		shuffle.Shuffle(test.slice)
-		test.slice.OptimizeReplicaOrder(test.nodeID, latencyFn, test.locality)
+		test.slice.OptimizeReplicaOrder(test.nodeID, healthFn, latencyFn, test.locality)
 		var sortedNodes []roachpb.NodeID
 		sortedNodes = append(sortedNodes, test.slice[0].NodeID)
 		for i := 1; i < len(test.slice); i++ {
diff --git a/pkg/kv/kvserver/liveness/livenesspb/liveness.go b/pkg/kv/kvserver/liveness/livenesspb/liveness.go
index b07f39b13868..dc3bcf892ec6 100644
--- a/pkg/kv/kvserver/liveness/livenesspb/liveness.go
+++ b/pkg/kv/kvserver/liveness/livenesspb/liveness.go
@@ -211,6 +211,7 @@ const (
 	NetworkMap
 	LossOfQuorum
 	ReplicaGCQueue
+	DistSender
 )
 
 func (nv NodeVitality) IsLive(usage VitalityUsage) bool {
@@ -270,6 +271,8 @@ func (nv NodeVitality) IsLive(usage VitalityUsage) bool {
 		return nv.isAlive()
 	case ReplicaGCQueue:
 		return nv.isAlive()
+	case DistSender:
+		return nv.isAvailableNotDraining()
 	}
 
 	// TODO(baptist): Should be an assertion that we don't know this uasge.
diff --git a/pkg/server/server.go b/pkg/server/server.go
index 8549bdf8b7e2..207b73c96dad 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -52,6 +52,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptprovider"
@@ -539,6 +540,16 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf
 
 	nodeRegistry.AddMetricStruct(nodeLiveness.Metrics())
 
+	// TODO(baptist): Refactor this to change the dependency between liveness and
+	// the dist sender. Today the persistence of liveness requires the distsender
+	// to read and write the liveness records, but the cache only needs the gossip
+	// struct. We could construct the liveness cache separately from the rest of
+	// liveness and use that to compute this rather than the entire liveness
+	// struct.
+	distSender.SetHealthFunc(func(id roachpb.NodeID) bool {
+		return nodeLiveness.GetNodeVitalityFromCache(id).IsLive(livenesspb.DistSender)
+	})
+
 	nodeLivenessFn := storepool.MakeStorePoolNodeLivenessFunc(nodeLiveness)
 	if nodeLivenessKnobs, ok := cfg.TestingKnobs.NodeLiveness.(kvserver.NodeLivenessTestingKnobs); ok {
 		if nodeLivenessKnobs.StorePoolNodeLivenessFn != nil {
diff --git a/pkg/sql/physicalplan/replicaoracle/oracle.go b/pkg/sql/physicalplan/replicaoracle/oracle.go
index 1ce79c504288..ff4070923f4b 100644
--- a/pkg/sql/physicalplan/replicaoracle/oracle.go
+++ b/pkg/sql/physicalplan/replicaoracle/oracle.go
@@ -52,6 +52,7 @@ type Config struct {
 	Clock       *hlc.Clock
 	RPCContext  *rpc.Context
 	LatencyFunc kvcoord.LatencyFunc
+	HealthFunc  kvcoord.HealthFunc
 }
 
 // Oracle is used to choose the lease holder for ranges. This
@@ -172,6 +173,7 @@ type closestOracle struct {
 	// inside the same process.
 	nodeID      roachpb.NodeID
 	locality    roachpb.Locality
+	healthFunc  kvcoord.HealthFunc
 	latencyFunc kvcoord.LatencyFunc
 }
 
@@ -185,6 +187,7 @@ func newClosestOracle(cfg Config) Oracle {
 		nodeID:      cfg.NodeID,
 		locality:    cfg.Locality,
 		latencyFunc: latencyFn,
+		healthFunc:  cfg.HealthFunc,
 	}
 }
 
@@ -202,7 +205,7 @@ func (o *closestOracle) ChoosePreferredReplica(
 	if err != nil {
 		return roachpb.ReplicaDescriptor{}, false, err
 	}
-	replicas.OptimizeReplicaOrder(o.nodeID, o.latencyFunc, o.locality)
+	replicas.OptimizeReplicaOrder(o.nodeID, o.healthFunc, o.latencyFunc, o.locality)
 	repl := replicas[0].ReplicaDescriptor
 	// There are no "misplanned" ranges if we know the leaseholder, and we're
 	// deliberately choosing non-leaseholder.
@@ -237,6 +240,7 @@ type binPackingOracle struct {
 	nodeID      roachpb.NodeID
 	locality    roachpb.Locality
 	latencyFunc kvcoord.LatencyFunc
+	healthFunc  kvcoord.HealthFunc
 }
 
 func newBinPackingOracle(cfg Config) Oracle {
@@ -246,6 +250,7 @@ func newBinPackingOracle(cfg Config) Oracle {
 		nodeID:      cfg.NodeID,
 		locality:    cfg.Locality,
 		latencyFunc: latencyFunc(cfg.RPCContext),
+		healthFunc:  cfg.HealthFunc,
 	}
 }
 
@@ -266,7 +271,7 @@ func (o *binPackingOracle) ChoosePreferredReplica(
 	if err != nil {
 		return roachpb.ReplicaDescriptor{}, false, err
 	}
-	replicas.OptimizeReplicaOrder(o.nodeID, o.latencyFunc, o.locality)
+	replicas.OptimizeReplicaOrder(o.nodeID, o.healthFunc, o.latencyFunc, o.locality)
 
 	// Look for a replica that has been assigned some ranges, but it's not yet full.
 	minLoad := int(math.MaxInt32)
diff --git a/pkg/sql/physicalplan/replicaoracle/oracle_test.go b/pkg/sql/physicalplan/replicaoracle/oracle_test.go
index a461669fc371..4333a4b959cb 100644
--- a/pkg/sql/physicalplan/replicaoracle/oracle_test.go
+++ b/pkg/sql/physicalplan/replicaoracle/oracle_test.go
@@ -43,9 +43,10 @@ func TestClosest(t *testing.T) {
 	nd2, err := g.GetNodeDescriptor(2)
 	require.NoError(t, err)
 	o := NewOracle(ClosestChoice, Config{
-		NodeDescs: g,
-		NodeID:    1,
-		Locality:  nd2.Locality, // pretend node 2 is closest.
+		NodeDescs:  g,
+		NodeID:     1,
+		Locality:   nd2.Locality, // pretend node 2 is closest.
+		HealthFunc: func(_ roachpb.NodeID) bool { return true },
 	})
 	o.(*closestOracle).latencyFunc = func(id roachpb.NodeID) (time.Duration, bool) {
 		if id == 2 {
diff --git a/pkg/sql/physicalplan/span_resolver.go b/pkg/sql/physicalplan/span_resolver.go
index 4d1208372c84..dfecb4a74916 100644
--- a/pkg/sql/physicalplan/span_resolver.go
+++ b/pkg/sql/physicalplan/span_resolver.go
@@ -154,6 +154,7 @@ func NewSpanResolver(
 			Clock:       clock,
 			RPCContext:  rpcCtx,
 			LatencyFunc: distSender.LatencyFunc(),
+			HealthFunc:  distSender.HealthFunc(),
 		}),
 		distSender: distSender,
 	}
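The ordering rule this patch threads through OptimizeReplicaOrder is small but easy to get wrong, so here is a standalone sketch of it (not part of the patch above): health is consulted before the local-node and latency tie-breakers, so a draining follower sorts last even when it is the closest replica. The types and helper names below are simplified stand-ins for the kvcoord ones, not the real API.

package main

import (
	"fmt"
	"sort"
	"time"
)

type NodeID int

// HealthFunc mirrors the new kvcoord.HealthFunc: true means the node is
// alive and not draining.
type HealthFunc func(NodeID) bool

// LatencyFunc mirrors kvcoord.LatencyFunc.
type LatencyFunc func(NodeID) (time.Duration, bool)

type replicaInfo struct{ NodeID NodeID }

// optimizeOrder applies the same precedence as the patched
// OptimizeReplicaOrder: healthy before unhealthy, then the local node, then
// lower measured latency.
func optimizeOrder(rs []replicaInfo, local NodeID, healthFn HealthFunc, latencyFn LatencyFunc) {
	sort.Slice(rs, func(i, j int) bool {
		if rs[i].NodeID == rs[j].NodeID {
			return false
		}
		// Health is checked before the local-node rule: a draining local
		// node should not be preferred over a healthy remote follower.
		healthI, healthJ := healthFn(rs[i].NodeID), healthFn(rs[j].NodeID)
		if healthI != healthJ {
			return healthI
		}
		if rs[i].NodeID == local {
			return true
		}
		if rs[j].NodeID == local {
			return false
		}
		latI, okI := latencyFn(rs[i].NodeID)
		latJ, okJ := latencyFn(rs[j].NodeID)
		if okI && okJ {
			return latI < latJ
		}
		return false
	})
}

func main() {
	replicas := []replicaInfo{{NodeID: 3}, {NodeID: 2}, {NodeID: 1}}
	draining := map[NodeID]bool{3: true} // pretend n3 is draining, as in the new test
	latencies := map[NodeID]time.Duration{
		1: 3 * time.Millisecond,
		2: 2 * time.Millisecond,
		3: 1 * time.Millisecond, // n3 is the lowest-latency replica
	}
	healthFn := func(id NodeID) bool { return !draining[id] }
	latencyFn := func(id NodeID) (time.Duration, bool) {
		d, ok := latencies[id]
		return d, ok
	}
	// The caller (node 4 here) holds no replica, so latency decides among the
	// healthy nodes, and the draining n3 sorts last despite its low latency.
	optimizeOrder(replicas, 4, healthFn, latencyFn)
	fmt.Println(replicas) // [{2} {1} {3}]
}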