Skip to content

Commit

Permalink
kv: cache leaseholder on successful response
Browse files Browse the repository at this point in the history
Whenever a successful response is received from an RPC that we know has
to contact the leaseholder to succeed, update the leaseholder cache.

The immediate motivation for this is to be able to land the preceding
commits, which greatly exacerbated (as in, added a much faster failure
mode to)

```
make stress PKG=./pkg/sql/logictest TESTS=TestPlannerLogic/5node-dist/distsql_interleaved_join
```

However, the change is one we've wanted to make for a while; our caching
of leaseholders, and in particular their eviction, has been deficient
essentially ever since it was first introduced.

Touches #31068.

Release note: None
  • Loading branch information
tbg committed Oct 12, 2018
1 parent 6aef2e4 commit fdca0bd
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 32 deletions.
60 changes: 37 additions & 23 deletions pkg/kv/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,11 @@ func (ds *DistSender) getNodeDescriptor() *roachpb.NodeDescriptor {
// The replicas are assumed to be ordered by preference, with closer
// ones (i.e. expected lowest latency) first.
func (ds *DistSender) sendRPC(
ctx context.Context, rangeID roachpb.RangeID, replicas ReplicaSlice, ba roachpb.BatchRequest,
ctx context.Context,
rangeID roachpb.RangeID,
replicas ReplicaSlice,
ba roachpb.BatchRequest,
cachedLeaseHolder roachpb.ReplicaDescriptor,
) (*roachpb.BatchResponse, error) {
if len(replicas) == 0 {
return nil, roachpb.NewSendError(
Expand All @@ -384,7 +388,15 @@ func (ds *DistSender) sendRPC(
tracing.AnnotateTrace()
defer tracing.AnnotateTrace()

return ds.sendToReplicas(ctx, SendOptions{metrics: &ds.metrics}, rangeID, replicas, ba, ds.nodeDialer)
return ds.sendToReplicas(
ctx,
SendOptions{metrics: &ds.metrics},
rangeID,
replicas,
ba,
ds.nodeDialer,
cachedLeaseHolder,
)
}

// CountRanges returns the number of ranges that encompass the given key span.
Expand Down Expand Up @@ -435,16 +447,16 @@ func (ds *DistSender) sendSingleRange(

// If this request needs to go to a lease holder and we know who that is, move
// it to the front.
var knowLeaseholder bool
if !ba.IsReadOnly() || ba.ReadConsistency.RequiresReadLease() {
var cachedLeaseHolder roachpb.ReplicaDescriptor
if ba.RequiresLeaseHolder() {
if storeID, ok := ds.leaseHolderCache.Lookup(ctx, desc.RangeID); ok {
if i := replicas.FindReplica(storeID); i >= 0 {
replicas.MoveToFront(i)
knowLeaseholder = true
cachedLeaseHolder = replicas[0].ReplicaDescriptor
}
}
}
if !knowLeaseholder {
if (cachedLeaseHolder == roachpb.ReplicaDescriptor{}) {
// Rearrange the replicas so that they're ordered in expectation of
// request latency.
var latencyFn LatencyFunc
Expand All @@ -454,7 +466,7 @@ func (ds *DistSender) sendSingleRange(
replicas.OptimizeReplicaOrder(ds.getNodeDescriptor(), latencyFn)
}

br, err := ds.sendRPC(ctx, desc.RangeID, replicas, ba)
br, err := ds.sendRPC(ctx, desc.RangeID, replicas, ba, cachedLeaseHolder)
if err != nil {
log.VErrEvent(ctx, 2, err.Error())
return nil, roachpb.NewError(err)
Expand Down Expand Up @@ -1104,7 +1116,7 @@ func (ds *DistSender) sendPartialBatch(
// row and the range descriptor hasn't changed, return the error
// to our caller.
switch tErr := pErr.GetDetail().(type) {
case *roachpb.SendError, *roachpb.RangeNotFoundError:
case *roachpb.SendError:
// We've tried all the replicas without success. Either
// they're all down, or we're using an out-of-date range
// descriptor. Invalidate the cache and try again with the new
Expand Down Expand Up @@ -1281,6 +1293,7 @@ func (ds *DistSender) sendToReplicas(
replicas ReplicaSlice,
ba roachpb.BatchRequest,
nodeDialer *nodedialer.Dialer,
cachedLeaseHolder roachpb.ReplicaDescriptor,
) (*roachpb.BatchResponse, error) {
var ambiguousError error
var haveCommit bool
Expand Down Expand Up @@ -1348,6 +1361,14 @@ func (ds *DistSender) sendToReplicas(
propagateError := false
switch tErr := br.Error.GetDetail().(type) {
case nil:
// When a request that we know could only succeed on the leaseholder comes
// back as successful, make sure the leaseholder cache reflects this
// replica. In steady state, this is almost always the case, and so we
// gate the update on whether the response comes from a node that we didn't
// know held the lease.
if cachedLeaseHolder != curReplica && ba.RequiresLeaseHolder() {
ds.leaseHolderCache.Update(ctx, rangeID, curReplica.StoreID)
}
return br, nil
case *roachpb.StoreNotFoundError, *roachpb.NodeUnavailableError:
// These errors are likely to be unique to the replica that reported
Expand All @@ -1357,25 +1378,18 @@ func (ds *DistSender) sendToReplicas(
// our descriptor is outright outdated, but it can also be caused by a
// replica that has just been added but needs a snapshot to be caught up.
//
// Evict this replica from the lease holder cache, if applicable, and try
// the next replica. It is important that we do the latter, for the next
// retry might otherwise try the same replica again (assuming the replica is
// still in the descriptor), looping endlessly until the replica is caught
// up (which may never happen if the target range is dormant).
if tErr.StoreID != 0 {
cachedStoreID, found := ds.leaseHolderCache.Lookup(ctx, tErr.RangeID)
evicting := found && cachedStoreID == tErr.StoreID
if evicting {
log.Eventf(ctx, "evicting leaseholder s%d for r%d after RangeNotFoundError", tErr.StoreID, tErr.RangeID)
ds.leaseHolderCache.Update(ctx, tErr.RangeID, 0 /* evict */)
}

}
// We'll try other replicas which typically gives us the leaseholder, either
// via the NotLeaseHolderError or nil error paths, both of which update the
// leaseholder cache.
case *roachpb.NotLeaseHolderError:
ds.metrics.NotLeaseHolderErrCount.Inc(1)
if lh := tErr.LeaseHolder; lh != nil {
// If the replica we contacted knows the new lease holder, update the cache.
// Update the leaseholder cache. Naively this would also happen when the
// next RPC comes back, but we don't want to wait out the additional RPC
// latency.
ds.leaseHolderCache.Update(ctx, rangeID, lh.StoreID)
// Avoid an extra update to the leaseholder cache if the next RPC succeeds.
cachedLeaseHolder = *lh

// If the implicated leaseholder is not a known replica, return a SendError
// to signal eviction of the cached RangeDescriptor and re-send.
Expand Down
26 changes: 18 additions & 8 deletions pkg/kv/dist_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,14 +539,18 @@ func TestRetryOnNotLeaseHolderError(t *testing.T) {
_ ReplicaSlice,
args roachpb.BatchRequest,
) (*roachpb.BatchResponse, error) {
reply := &roachpb.BatchResponse{}
if first {
reply := &roachpb.BatchResponse{}
reply.Error = roachpb.NewError(
&roachpb.NotLeaseHolderError{LeaseHolder: &leaseHolder})
first = false
return reply, nil
}
return args.CreateReply(), nil
// Return an error to avoid activating a code path that would
// populate the leaseholder cache from the successful response.
// That's not what this test wants to test.
reply.Error = roachpb.NewErrorf("boom")
return reply, nil
}

cfg := DistSenderConfig{
Expand All @@ -560,8 +564,8 @@ func TestRetryOnNotLeaseHolderError(t *testing.T) {
ds := NewDistSender(cfg, g)
v := roachpb.MakeValueFromString("value")
put := roachpb.NewPut(roachpb.Key("a"), v)
if _, err := client.SendWrapped(context.Background(), ds, put); err != nil {
t.Errorf("put encountered error: %s", err)
if _, pErr := client.SendWrapped(context.Background(), ds, put); !testutils.IsPError(pErr, "boom") {
t.Fatalf("unexpected error: %v", pErr)
}
if first {
t.Errorf("The command did not retry")
Expand Down Expand Up @@ -656,8 +660,10 @@ func TestDistSenderDownNodeEvictLeaseholder(t *testing.T) {
t.Errorf("contacted n1: %t, contacted n2: %t", contacted1, contacted2)
}

if storeID, ok := ds.LeaseHolderCache().Lookup(ctx, roachpb.RangeID(1)); ok {
t.Fatalf("expected no lease holder for r1, but got s%d", storeID)
if storeID, ok := ds.LeaseHolderCache().Lookup(ctx, roachpb.RangeID(1)); !ok {
t.Fatalf("expected new leaseholder to be cached")
} else if exp := roachpb.StoreID(2); storeID != exp {
t.Fatalf("expected lease holder for r1 to be cached as s%d, but got s%d", exp, storeID)
}
}

Expand Down Expand Up @@ -1315,6 +1321,7 @@ func TestSendRPCRangeNotFoundError(t *testing.T) {
)

seen := map[roachpb.ReplicaID]struct{}{}
var leaseholderStoreID roachpb.StoreID
var ds *DistSender
var testFn simpleSendFn = func(
_ context.Context,
Expand All @@ -1336,6 +1343,7 @@ func TestSendRPCRangeNotFoundError(t *testing.T) {
br.Error = roachpb.NewError(roachpb.NewRangeNotFoundError(ba.RangeID, ba.Replica.StoreID))
return br, nil
}
leaseholderStoreID = ba.Replica.StoreID
return br, nil
}
cfg := DistSenderConfig{
Expand All @@ -1352,8 +1360,10 @@ func TestSendRPCRangeNotFoundError(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if storeID, found := ds.leaseHolderCache.Lookup(context.Background(), roachpb.RangeID(1)); found {
t.Fatalf("unexpected cached leaseholder s%d", storeID)
if storeID, found := ds.leaseHolderCache.Lookup(context.Background(), roachpb.RangeID(1)); !found {
t.Fatal("expected a cached leaseholder")
} else if storeID != leaseholderStoreID {
t.Fatalf("unexpected cached leaseholder s%d, expected s%d", storeID, leaseholderStoreID)
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/send_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,5 +281,5 @@ func sendBatch(
TransportFactory: transportFactory,
},
}, nil)
return ds.sendToReplicas(ctx, SendOptions{metrics: &ds.metrics}, 0, makeReplicas(addrs...), roachpb.BatchRequest{}, nodeDialer)
return ds.sendToReplicas(ctx, SendOptions{metrics: &ds.metrics}, 0, makeReplicas(addrs...), roachpb.BatchRequest{}, nodeDialer, roachpb.ReplicaDescriptor{})
}
7 changes: 7 additions & 0 deletions pkg/roachpb/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,13 @@ func (ba *BatchRequest) IsReadOnly() bool {
return len(ba.Requests) > 0 && !ba.hasFlag(isWrite|isAdmin)
}

// RequiresLeaseHolder reports whether the batch can only be served by the
// leaseholders of the ranges it addresses. That is the case when the batch
// is not read-only, or when its read consistency level requires a read
// lease.
func (ba *BatchRequest) RequiresLeaseHolder() bool {
	// Anything that is not a pure read must go to the leaseholder.
	if !ba.IsReadOnly() {
		return true
	}
	// Read-only batches need the leaseholder only if their consistency
	// level asks for a read lease.
	return ba.Header.ReadConsistency.RequiresReadLease()
}

// IsReverse returns true iff the BatchRequest contains a reverse request.
func (ba *BatchRequest) IsReverse() bool {
return ba.hasFlag(isReverse)
Expand Down
24 changes: 24 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/explain_analyze_plans
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,30 @@ ALTER TABLE kw EXPERIMENTAL_RELOCATE SELECT ARRAY[i], i FROM generate_series(1,
# Verify that EXPLAIN ANALYZE (DISTSQL) annotates plans with collected
# statistics.

# Verify data placement.
query TTITI colnames
SHOW EXPERIMENTAL_RANGES FROM TABLE kv
----
start_key end_key range_id replicas lease_holder
NULL /1 1 {1} 1
/1 /2 2 {1} 1
/2 /3 3 {2} 2
/3 /4 4 {3} 3
/4 /5 5 {4} 4
/5 NULL 6 {5} 5

# Verify data placement.
query TTITI colnames
SHOW EXPERIMENTAL_RANGES FROM TABLE kw
----
start_key end_key range_id replicas lease_holder
NULL /1 6 {5} 5
/1 /2 7 {1} 1
/2 /3 8 {2} 2
/3 /4 9 {3} 3
/4 /5 10 {4} 4
/5 NULL 11 {5} 5

# This query verifies stat collection for the tableReader, mergeJoiner, and
# aggregator.
query T
Expand Down
7 changes: 7 additions & 0 deletions pkg/sql/opt/exec/execbuilder/testdata/lookup_join
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,13 @@ distinct · · (name)
· table books@primary · ·
· spans ALL · ·

# Verify data placement.
query TTITI colnames
SHOW EXPERIMENTAL_RANGES FROM TABLE books
----
start_key end_key range_id replicas lease_holder
NULL NULL 10 {5} 5

query T
SELECT url FROM [EXPLAIN (DISTSQL) SELECT DISTINCT authors.name FROM books AS b1, books2 AS b2, authors WHERE b1.title = b2.title AND authors.book = b1.title AND b1.shelf <> b2.shelf]
----
Expand Down

0 comments on commit fdca0bd

Please sign in to comment.