diff --git a/pkg/kv/dist_sender.go b/pkg/kv/dist_sender.go index 9d5979faee72..0de0bebe8d40 100644 --- a/pkg/kv/dist_sender.go +++ b/pkg/kv/dist_sender.go @@ -372,7 +372,11 @@ func (ds *DistSender) getNodeDescriptor() *roachpb.NodeDescriptor { // The replicas are assumed to be ordered by preference, with closer // ones (i.e. expected lowest latency) first. func (ds *DistSender) sendRPC( - ctx context.Context, rangeID roachpb.RangeID, replicas ReplicaSlice, ba roachpb.BatchRequest, + ctx context.Context, + rangeID roachpb.RangeID, + replicas ReplicaSlice, + ba roachpb.BatchRequest, + leaseHolder leaseHolderInfo, ) (*roachpb.BatchResponse, error) { if len(replicas) == 0 { return nil, roachpb.NewSendError( @@ -384,7 +388,7 @@ func (ds *DistSender) sendRPC( tracing.AnnotateTrace() defer tracing.AnnotateTrace() - return ds.sendToReplicas(ctx, SendOptions{metrics: &ds.metrics}, rangeID, replicas, ba, ds.nodeDialer) + return ds.sendToReplicas(ctx, SendOptions{metrics: &ds.metrics}, rangeID, replicas, ba, ds.nodeDialer, leaseHolder) } // CountRanges returns the number of ranges that encompass the given key span. @@ -426,6 +430,13 @@ func (ds *DistSender) getDescriptor( return desc, returnToken, nil } +type leaseHolderInfo struct { + // needsLeaseHolder is set when the corresponding batch needs to be served by the leaseholder. + needsLeaseHolder bool + // info is the ReplicaInfo belonging to the cached leaseholder, if any. + info ReplicaInfo +} + // sendSingleRange gathers and rearranges the replicas, and makes an RPC call. func (ds *DistSender) sendSingleRange( ctx context.Context, ba roachpb.BatchRequest, desc *roachpb.RangeDescriptor, @@ -435,16 +446,17 @@ func (ds *DistSender) sendSingleRange( // If this request needs to go to a lease holder and we know who that is, move // it to the front. - var knowLeaseholder bool + var leaseHolder leaseHolderInfo if !ba.IsReadOnly() || ba.ReadConsistency.RequiresReadLease() { + leaseHolder.needsLeaseHolder = true if storeID, ok := ds.leaseHolderCache.Lookup(ctx, desc.RangeID); ok { if i := replicas.FindReplica(storeID); i >= 0 { replicas.MoveToFront(i) - knowLeaseholder = true + leaseHolder.info = replicas[0] } } } - if !knowLeaseholder { + if (leaseHolder.info == ReplicaInfo{}) { // Rearrange the replicas so that they're ordered in expectation of // request latency. var latencyFn LatencyFunc @@ -454,7 +466,7 @@ func (ds *DistSender) sendSingleRange( replicas.OptimizeReplicaOrder(ds.getNodeDescriptor(), latencyFn) } - br, err := ds.sendRPC(ctx, desc.RangeID, replicas, ba) + br, err := ds.sendRPC(ctx, desc.RangeID, replicas, ba, leaseHolder) if err != nil { log.VErrEvent(ctx, 2, err.Error()) return nil, roachpb.NewError(err) @@ -1286,6 +1298,7 @@ func (ds *DistSender) sendToReplicas( replicas ReplicaSlice, ba roachpb.BatchRequest, nodeDialer *nodedialer.Dialer, + leaseHolder leaseHolderInfo, ) (*roachpb.BatchResponse, error) { var ambiguousError error var haveCommit bool @@ -1353,6 +1366,14 @@ func (ds *DistSender) sendToReplicas( propagateError := false switch tErr := br.Error.GetDetail().(type) { case nil: + // When a request that we know could only succeed on the leaseholder comes + // back as successful, make sure the leaseholder cache reflects this + // replica. In steady state, this should almost never be the case, and so we + // gate the update on whether the response comes from a node that we didn't + // know held the lease. + if leaseHolder.needsLeaseHolder && leaseHolder.info.ReplicaDescriptor != curReplica { + ds.leaseHolderCache.Update(ctx, rangeID, curReplica.StoreID) + } return br, nil case *roachpb.StoreNotFoundError, *roachpb.NodeUnavailableError: // These errors are likely to be unique to the replica that reported @@ -1362,24 +1383,17 @@ func (ds *DistSender) sendToReplicas( // our descriptor is outright outdated, but it can also be caused by a // replica that has just been added but needs a snapshot to be caught up. // - // Evict this replica from the lease holder cache, if applicable, and try - // the next replica. It is important that we do the latter, for the next - // retry might otherwise try the same replica again (assuming the replica is - // still in the descriptor), looping endlessly until the replica is caught - // up (which may never happen if the target range is dormant). - if tErr.StoreID != 0 { - cachedStoreID, found := ds.leaseHolderCache.Lookup(ctx, tErr.RangeID) - evicting := found && cachedStoreID == tErr.StoreID - if evicting { - log.Eventf(ctx, "evicting leaseholder s%d for r%d after RangeNotFoundError", tErr.StoreID, tErr.RangeID) - ds.leaseHolderCache.Update(ctx, tErr.RangeID, 0 /* evict */) - } - - } + // We'll try other replicas which typically gives us the leaseholder, either + // via the NotLeaseHolderError or nil error paths, both of which update the + // leaseholder cache. case *roachpb.NotLeaseHolderError: ds.metrics.NotLeaseHolderErrCount.Inc(1) if lh := tErr.LeaseHolder; lh != nil { - // If the replica we contacted knows the new lease holder, update the cache. + // Update the leaseholder cache. Note that we're going to update this again + // after having successfully contact the leaseholder (assuming we have it in + // the range descriptor), but that's OK - this kind of error should be rare + // but when it happens, we want to populate the cache with something useful + // ASAP without waiting for another round trip. ds.leaseHolderCache.Update(ctx, rangeID, lh.StoreID) // If the implicated leaseholder is not a known replica, return a SendError diff --git a/pkg/kv/dist_sender_test.go b/pkg/kv/dist_sender_test.go index ae9e3eec2028..890c9ba35356 100644 --- a/pkg/kv/dist_sender_test.go +++ b/pkg/kv/dist_sender_test.go @@ -541,14 +541,18 @@ func TestRetryOnNotLeaseHolderError(t *testing.T) { _ ReplicaSlice, args roachpb.BatchRequest, ) (*roachpb.BatchResponse, error) { + reply := &roachpb.BatchResponse{} if first { - reply := &roachpb.BatchResponse{} reply.Error = roachpb.NewError( &roachpb.NotLeaseHolderError{LeaseHolder: &leaseHolder}) first = false return reply, nil } - return args.CreateReply(), nil + // Return an error to avoid activating a code path that would + // populate the leaseholder cache from the successful response. + // That's not what this test wants to test. + reply.Error = roachpb.NewErrorf("boom") + return reply, nil } cfg := DistSenderConfig{ @@ -563,8 +567,8 @@ func TestRetryOnNotLeaseHolderError(t *testing.T) { ds := NewDistSender(cfg, g) v := roachpb.MakeValueFromString("value") put := roachpb.NewPut(roachpb.Key("a"), v) - if _, err := client.SendWrapped(context.Background(), ds, put); err != nil { - t.Errorf("put encountered error: %s", err) + if _, pErr := client.SendWrapped(context.Background(), ds, put); !testutils.IsPError(pErr, "boom") { + t.Fatalf("unexpected error: %v", pErr) } if first { t.Errorf("The command did not retry") @@ -660,8 +664,10 @@ func TestDistSenderDownNodeEvictLeaseholder(t *testing.T) { t.Errorf("contacted n1: %t, contacted n2: %t", contacted1, contacted2) } - if storeID, ok := ds.LeaseHolderCache().Lookup(ctx, roachpb.RangeID(1)); ok { - t.Fatalf("expected no lease holder for r1, but got s%d", storeID) + if storeID, ok := ds.LeaseHolderCache().Lookup(ctx, roachpb.RangeID(1)); !ok { + t.Fatalf("expected new leaseholder to be cached") + } else if exp := roachpb.StoreID(2); storeID != exp { + t.Fatalf("expected lease holder for r1 to be cached as s%d, but got s%d", exp, storeID) } } @@ -1326,6 +1332,7 @@ func TestSendRPCRangeNotFoundError(t *testing.T) { ) seen := map[roachpb.ReplicaID]struct{}{} + var leaseholderStoreID roachpb.StoreID var ds *DistSender var testFn simpleSendFn = func( _ context.Context, @@ -1347,6 +1354,7 @@ func TestSendRPCRangeNotFoundError(t *testing.T) { br.Error = roachpb.NewError(roachpb.NewRangeNotFoundError(ba.RangeID, ba.Replica.StoreID)) return br, nil } + leaseholderStoreID = ba.Replica.StoreID return br, nil } cfg := DistSenderConfig{ @@ -1363,8 +1371,10 @@ func TestSendRPCRangeNotFoundError(t *testing.T) { if err != nil { t.Fatal(err) } - if storeID, found := ds.leaseHolderCache.Lookup(context.Background(), roachpb.RangeID(1)); found { - t.Fatalf("unexpected cached leaseholder s%d", storeID) + if storeID, found := ds.leaseHolderCache.Lookup(context.Background(), roachpb.RangeID(1)); !found { + t.Fatal("expected a cached leaseholder") + } else if storeID != leaseholderStoreID { + t.Fatalf("unexpected cached leaseholder s%d, expected s%d", storeID, leaseholderStoreID) } } diff --git a/pkg/kv/send_test.go b/pkg/kv/send_test.go index 3b3c4d2c3d01..28f7cc0f635b 100644 --- a/pkg/kv/send_test.go +++ b/pkg/kv/send_test.go @@ -281,5 +281,5 @@ func sendBatch( TransportFactory: transportFactory, }, }, nil) - return ds.sendToReplicas(ctx, SendOptions{metrics: &ds.metrics}, 0, makeReplicas(addrs...), roachpb.BatchRequest{}, nodeDialer) + return ds.sendToReplicas(ctx, SendOptions{metrics: &ds.metrics}, 0, makeReplicas(addrs...), roachpb.BatchRequest{}, nodeDialer, leaseHolderInfo{}) } diff --git a/pkg/sql/logictest/testdata/logic_test/explain_analyze_plans b/pkg/sql/logictest/testdata/logic_test/explain_analyze_plans index 05a9901b8ed4..8847b10f0720 100644 --- a/pkg/sql/logictest/testdata/logic_test/explain_analyze_plans +++ b/pkg/sql/logictest/testdata/logic_test/explain_analyze_plans @@ -35,6 +35,30 @@ ALTER TABLE kw EXPERIMENTAL_RELOCATE SELECT ARRAY[i], i FROM generate_series(1, # Verify that EXPLAIN ANALYZE (DISTSQL) annotates plans with collected # statistics. +# Verify data placement. +query TTITI colnames +SHOW EXPERIMENTAL_RANGES FROM TABLE kv +---- +start_key end_key range_id replicas lease_holder +NULL /1 1 {1} 1 +/1 /2 2 {1} 1 +/2 /3 3 {2} 2 +/3 /4 4 {3} 3 +/4 /5 5 {4} 4 +/5 NULL 6 {5} 5 + +# Verify data placement. +query TTITI colnames +SHOW EXPERIMENTAL_RANGES FROM TABLE kw +---- +start_key end_key range_id replicas lease_holder +NULL /1 6 {5} 5 +/1 /2 7 {1} 1 +/2 /3 8 {2} 2 +/3 /4 9 {3} 3 +/4 /5 10 {4} 4 +/5 NULL 11 {5} 5 + # This query verifies stat collection for the tableReader, mergeJoiner, and # aggregator. query T diff --git a/pkg/sql/opt/exec/execbuilder/testdata/lookup_join b/pkg/sql/opt/exec/execbuilder/testdata/lookup_join index f677354b6abe..ed6fe275f31a 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/lookup_join +++ b/pkg/sql/opt/exec/execbuilder/testdata/lookup_join @@ -211,6 +211,13 @@ distinct · · (name) · table books@primary · · · spans ALL · · +# Verify data placement. +query TTITI colnames +SHOW EXPERIMENTAL_RANGES FROM TABLE books +---- +start_key end_key range_id replicas lease_holder +NULL NULL 10 {5} 5 + query T SELECT url FROM [EXPLAIN (DISTSQL) SELECT DISTINCT authors.name FROM books AS b1, books2 AS b2, authors WHERE b1.title = b2.title AND authors.book = b1.title AND b1.shelf <> b2.shelf] ----