kv: cache leaseholder on successful response
Whenever a successful response is received from an RPC that we know has
to contact the leaseholder to succeed, update the leaseholder cache.
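
For illustration only, here is a minimal sketch of the rule this commit adds
(hypothetical, simplified names; the real logic lives in DistSender.sendToReplicas
in the diff below):

```go
package main

import (
	"fmt"
	"sync"
)

type rangeID int64
type storeID int32

// leaseholderCache maps a range to the store last known to hold its lease.
type leaseholderCache struct {
	mu sync.Mutex
	m  map[rangeID]storeID
}

func newLeaseholderCache() *leaseholderCache {
	return &leaseholderCache{m: make(map[rangeID]storeID)}
}

// lookup returns the cached leaseholder store for the range, if any.
func (c *leaseholderCache) lookup(r rangeID) (storeID, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	s, ok := c.m[r]
	return s, ok
}

// update records the given store as the range's leaseholder.
func (c *leaseholderCache) update(r rangeID, s storeID) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.m[r] = s
}

// onSuccessfulResponse captures the idea from this commit: if a request that
// can only succeed on the leaseholder came back without an error, the store
// that served it must hold the lease, so cache it unless we already knew.
func onSuccessfulResponse(c *leaseholderCache, r rangeID, needsLeaseholder bool, respondingStore storeID) {
	if !needsLeaseholder {
		return
	}
	if cached, ok := c.lookup(r); ok && cached == respondingStore {
		// Steady state: the cache already points at the responding store.
		return
	}
	c.update(r, respondingStore)
}

func main() {
	c := newLeaseholderCache()
	onSuccessfulResponse(c, 1, true /* needsLeaseholder */, 2)
	fmt.Println(c.lookup(1)) // Output: 2 true
}
```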

The immediate motivation for this is to be able to land the preceding
commits, which greatly exacerbated (as in, added a much faster failure
mode to)

```
make stress PKG=./pkg/sql/logictest TESTS=TestPlannerLogic/5node-dist/distsql_interleaved_join
```

However, the change is one we've wanted to make for a while; our caching,
and in particular the eviction of leaseholders, has been deficient
essentially ever since it was first introduced.

Touches #31068.

Release note: None
tbg committed Oct 9, 2018
1 parent e2cc8d2 commit faea6f4
Showing 5 changed files with 85 additions and 30 deletions.
56 changes: 35 additions & 21 deletions pkg/kv/dist_sender.go
@@ -372,7 +372,11 @@ func (ds *DistSender) getNodeDescriptor() *roachpb.NodeDescriptor {
// The replicas are assumed to be ordered by preference, with closer
// ones (i.e. expected lowest latency) first.
func (ds *DistSender) sendRPC(
ctx context.Context, rangeID roachpb.RangeID, replicas ReplicaSlice, ba roachpb.BatchRequest,
ctx context.Context,
rangeID roachpb.RangeID,
replicas ReplicaSlice,
ba roachpb.BatchRequest,
leaseHolder leaseHolderInfo,
) (*roachpb.BatchResponse, error) {
if len(replicas) == 0 {
return nil, roachpb.NewSendError(
@@ -384,7 +388,7 @@ func (ds *DistSender) sendRPC(
tracing.AnnotateTrace()
defer tracing.AnnotateTrace()

return ds.sendToReplicas(ctx, SendOptions{metrics: &ds.metrics}, rangeID, replicas, ba, ds.nodeDialer)
return ds.sendToReplicas(ctx, SendOptions{metrics: &ds.metrics}, rangeID, replicas, ba, ds.nodeDialer, leaseHolder)
}

// CountRanges returns the number of ranges that encompass the given key span.
@@ -426,6 +430,13 @@ func (ds *DistSender) getDescriptor(
return desc, returnToken, nil
}

type leaseHolderInfo struct {
// needsLeaseHolder is set when the corresponding batch needs to be served by the leaseholder.
needsLeaseHolder bool
// info is the ReplicaInfo belonging to the cached leaseholder, if any.
info ReplicaInfo
}

// sendSingleRange gathers and rearranges the replicas, and makes an RPC call.
func (ds *DistSender) sendSingleRange(
ctx context.Context, ba roachpb.BatchRequest, desc *roachpb.RangeDescriptor,
@@ -435,16 +446,17 @@ func (ds *DistSender) sendSingleRange(

// If this request needs to go to a lease holder and we know who that is, move
// it to the front.
var knowLeaseholder bool
var leaseHolder leaseHolderInfo
if !ba.IsReadOnly() || ba.ReadConsistency.RequiresReadLease() {
leaseHolder.needsLeaseHolder = true
if storeID, ok := ds.leaseHolderCache.Lookup(ctx, desc.RangeID); ok {
if i := replicas.FindReplica(storeID); i >= 0 {
replicas.MoveToFront(i)
knowLeaseholder = true
leaseHolder.info = replicas[0]
}
}
}
if !knowLeaseholder {
if (leaseHolder.info == ReplicaInfo{}) {
// Rearrange the replicas so that they're ordered in expectation of
// request latency.
var latencyFn LatencyFunc
@@ -454,7 +466,7 @@ func (ds *DistSender) sendSingleRange(
replicas.OptimizeReplicaOrder(ds.getNodeDescriptor(), latencyFn)
}

br, err := ds.sendRPC(ctx, desc.RangeID, replicas, ba)
br, err := ds.sendRPC(ctx, desc.RangeID, replicas, ba, leaseHolder)
if err != nil {
log.VErrEvent(ctx, 2, err.Error())
return nil, roachpb.NewError(err)
@@ -1286,6 +1298,7 @@ func (ds *DistSender) sendToReplicas(
replicas ReplicaSlice,
ba roachpb.BatchRequest,
nodeDialer *nodedialer.Dialer,
leaseHolder leaseHolderInfo,
) (*roachpb.BatchResponse, error) {
var ambiguousError error
var haveCommit bool
@@ -1353,6 +1366,14 @@ func (ds *DistSender) sendToReplicas(
propagateError := false
switch tErr := br.Error.GetDetail().(type) {
case nil:
// When a request that we know could only succeed on the leaseholder comes
// back as successful, make sure the leaseholder cache reflects this
// replica. In steady state, this should almost never be the case, and so we
// gate the update on whether the response comes from a node that we didn't
// know held the lease.
if leaseHolder.needsLeaseHolder && leaseHolder.info.ReplicaDescriptor != curReplica {
ds.leaseHolderCache.Update(ctx, rangeID, curReplica.StoreID)
}
return br, nil
case *roachpb.StoreNotFoundError, *roachpb.NodeUnavailableError:
// These errors are likely to be unique to the replica that reported
@@ -1362,24 +1383,17 @@ func (ds *DistSender) sendToReplicas(
// our descriptor is outright outdated, but it can also be caused by a
// replica that has just been added but needs a snapshot to be caught up.
//
// Evict this replica from the lease holder cache, if applicable, and try
// the next replica. It is important that we do the latter, for the next
// retry might otherwise try the same replica again (assuming the replica is
// still in the descriptor), looping endlessly until the replica is caught
// up (which may never happen if the target range is dormant).
if tErr.StoreID != 0 {
cachedStoreID, found := ds.leaseHolderCache.Lookup(ctx, tErr.RangeID)
evicting := found && cachedStoreID == tErr.StoreID
if evicting {
log.Eventf(ctx, "evicting leaseholder s%d for r%d after RangeNotFoundError", tErr.StoreID, tErr.RangeID)
ds.leaseHolderCache.Update(ctx, tErr.RangeID, 0 /* evict */)
}

}
// We'll try other replicas, which typically gets us to the leaseholder, either
// via the NotLeaseHolderError or the nil error path, both of which update the
// leaseholder cache.
case *roachpb.NotLeaseHolderError:
ds.metrics.NotLeaseHolderErrCount.Inc(1)
if lh := tErr.LeaseHolder; lh != nil {
// If the replica we contacted knows the new lease holder, update the cache.
// Update the leaseholder cache. Note that we're going to update this again
// after having successfully contacted the leaseholder (assuming we have it in
// the range descriptor), but that's OK - this kind of error should be rare
// but when it happens, we want to populate the cache with something useful
// ASAP without waiting for another round trip.
ds.leaseHolderCache.Update(ctx, rangeID, lh.StoreID)

// If the implicated leaseholder is not a known replica, return a SendError
26 changes: 18 additions & 8 deletions pkg/kv/dist_sender_test.go
@@ -541,14 +541,18 @@ func TestRetryOnNotLeaseHolderError(t *testing.T) {
_ ReplicaSlice,
args roachpb.BatchRequest,
) (*roachpb.BatchResponse, error) {
reply := &roachpb.BatchResponse{}
if first {
reply := &roachpb.BatchResponse{}
reply.Error = roachpb.NewError(
&roachpb.NotLeaseHolderError{LeaseHolder: &leaseHolder})
first = false
return reply, nil
}
return args.CreateReply(), nil
// Return an error to avoid activating a code path that would
// populate the leaseholder cache from the successful response.
// That's not what this test wants to test.
reply.Error = roachpb.NewErrorf("boom")
return reply, nil
}

cfg := DistSenderConfig{
@@ -563,8 +567,8 @@ func TestRetryOnNotLeaseHolderError(t *testing.T) {
ds := NewDistSender(cfg, g)
v := roachpb.MakeValueFromString("value")
put := roachpb.NewPut(roachpb.Key("a"), v)
if _, err := client.SendWrapped(context.Background(), ds, put); err != nil {
t.Errorf("put encountered error: %s", err)
if _, pErr := client.SendWrapped(context.Background(), ds, put); !testutils.IsPError(pErr, "boom") {
t.Fatalf("unexpected error: %v", pErr)
}
if first {
t.Errorf("The command did not retry")
@@ -660,8 +664,10 @@ func TestDistSenderDownNodeEvictLeaseholder(t *testing.T) {
t.Errorf("contacted n1: %t, contacted n2: %t", contacted1, contacted2)
}

if storeID, ok := ds.LeaseHolderCache().Lookup(ctx, roachpb.RangeID(1)); ok {
t.Fatalf("expected no lease holder for r1, but got s%d", storeID)
if storeID, ok := ds.LeaseHolderCache().Lookup(ctx, roachpb.RangeID(1)); !ok {
t.Fatalf("expected new leaseholder to be cached")
} else if exp := roachpb.StoreID(2); storeID != exp {
t.Fatalf("expected lease holder for r1 to be cached as s%d, but got s%d", exp, storeID)
}
}

@@ -1326,6 +1332,7 @@ func TestSendRPCRangeNotFoundError(t *testing.T) {
)

seen := map[roachpb.ReplicaID]struct{}{}
var leaseholderStoreID roachpb.StoreID
var ds *DistSender
var testFn simpleSendFn = func(
_ context.Context,
@@ -1347,6 +1354,7 @@ func TestSendRPCRangeNotFoundError(t *testing.T) {
br.Error = roachpb.NewError(roachpb.NewRangeNotFoundError(ba.RangeID, ba.Replica.StoreID))
return br, nil
}
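// Record the store that served the successful response so the test can
// verify below that this store ended up in the leaseholder cache.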
leaseholderStoreID = ba.Replica.StoreID
return br, nil
}
cfg := DistSenderConfig{
@@ -1363,8 +1371,10 @@ func TestSendRPCRangeNotFoundError(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if storeID, found := ds.leaseHolderCache.Lookup(context.Background(), roachpb.RangeID(1)); found {
t.Fatalf("unexpected cached leaseholder s%d", storeID)
if storeID, found := ds.leaseHolderCache.Lookup(context.Background(), roachpb.RangeID(1)); !found {
t.Fatal("expected a cached leaseholder")
} else if storeID != leaseholderStoreID {
t.Fatalf("unexpected cached leaseholder s%d, expected s%d", storeID, leaseholderStoreID)
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/send_test.go
@@ -281,5 +281,5 @@ func sendBatch(
TransportFactory: transportFactory,
},
}, nil)
return ds.sendToReplicas(ctx, SendOptions{metrics: &ds.metrics}, 0, makeReplicas(addrs...), roachpb.BatchRequest{}, nodeDialer)
return ds.sendToReplicas(ctx, SendOptions{metrics: &ds.metrics}, 0, makeReplicas(addrs...), roachpb.BatchRequest{}, nodeDialer, leaseHolderInfo{})
}
24 changes: 24 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/explain_analyze_plans
@@ -35,6 +35,30 @@ ALTER TABLE kw EXPERIMENTAL_RELOCATE SELECT ARRAY[i], i FROM generate_series(1,
# Verify that EXPLAIN ANALYZE (DISTSQL) annotates plans with collected
# statistics.

# Verify data placement.
query TTITI colnames
SHOW EXPERIMENTAL_RANGES FROM TABLE kv
----
start_key end_key range_id replicas lease_holder
NULL /1 1 {1} 1
/1 /2 2 {1} 1
/2 /3 3 {2} 2
/3 /4 4 {3} 3
/4 /5 5 {4} 4
/5 NULL 6 {5} 5

# Verify data placement.
query TTITI colnames
SHOW EXPERIMENTAL_RANGES FROM TABLE kw
----
start_key end_key range_id replicas lease_holder
NULL /1 6 {5} 5
/1 /2 7 {1} 1
/2 /3 8 {2} 2
/3 /4 9 {3} 3
/4 /5 10 {4} 4
/5 NULL 11 {5} 5

# This query verifies stat collection for the tableReader, mergeJoiner, and
# aggregator.
query T
7 changes: 7 additions & 0 deletions pkg/sql/opt/exec/execbuilder/testdata/lookup_join
@@ -211,6 +211,13 @@ distinct · · (name)
· table books@primary · ·
· spans ALL · ·

# Verify data placement.
query TTITI colnames
SHOW EXPERIMENTAL_RANGES FROM TABLE books
----
start_key end_key range_id replicas lease_holder
NULL NULL 10 {5} 5

query T
SELECT url FROM [EXPLAIN (DISTSQL) SELECT DISTINCT authors.name FROM books AS b1, books2 AS b2, authors WHERE b1.title = b2.title AND authors.book = b1.title AND b1.shelf <> b2.shelf]
----