kv: move range lease checks and transfers below latching
Needed for #57688.

This commit reworks interactions between range leases and requests, pulling the
consultation of a replica's lease down below the level of latching while keeping
heavy-weight operations like lease acquisitions above the level of latching.
Doing so comes with several benefits, some related specifically to non-blocking
transactions and some more general.

Background

Before discussing the change here, let's discuss how lease checks, lease
acquisitions, lease redirection, and lease transfers currently work. Today,
requests consult a replica's range lease before acquiring latches. If the lease
is good to go, the request proceeds to acquire latches. If the lease is not
currently held by any replica, the lease is acquired (again, above latches)
through a coalesced `RequestLeaseRequest`. If the lease is currently held by a
different replica, the request is redirected to that replica using a
`NotLeaseHolderError`. Finally, if the lease check notices a lease transfer in
progress, the request is optimistically redirected to the prospective new
leaseholder.

This all works, but only because it's been around for so long. Due to the lease
check above latching, we're forced to go to great lengths to get the
synchronization with in-flight requests right, which leads to very subtle logic.
This is most apparent with lease transfers, which properly synchronize with
ongoing requests through a delicate dance with the HLC clock and some serious
"spooky action at a distance". Every request bumps the local HLC clock in
`Store.Send`, then grabs the replica mutex, checks for an ongoing lease
transfer, drops the replica mutex, then evaluates. Lease transfers grab the
replica mutex, grab a clock reading from the local HLC clock, bump the
minLeaseProposedTS to stop using the current lease, drop the replica mutex,
then propose a new lease using this clock reading as its start time. This works
only because each request bumps the HLC clock _before_ checking the lease, so
the HLC clock can serve as an upper bound on every request that has made it
through the lease check by the time the lease transfer begins.

This structure is inflexible, subtle, and falls over as soon as we try to extend
it.
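
To make the old ordering concrete, here is a minimal runnable sketch (toy types
with invented names, not CockroachDB code) of the invariant the scheme depends
on: every request forwards the shared clock before its lease check, so a clock
reading taken by the transfer afterwards upper-bounds every request that has
already passed the check.

```go
package main

import (
	"fmt"
	"sync"
)

// toyClock stands in for the node's HLC clock.
type toyClock struct {
	mu  sync.Mutex
	now int64
}

// update forwards the clock, as Store.Send does with the batch timestamp.
func (c *toyClock) update(ts int64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if ts > c.now {
		c.now = ts
	}
}

// read returns a current clock reading, as a lease transfer does when it
// picks the new lease's start time.
func (c *toyClock) read() int64 {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.now
}

func main() {
	var clock toyClock

	// Old world, request path: bump the clock, then check the lease, then
	// latch and evaluate.
	clock.update(10) // request at ts=10
	// ... lease check, latching, evaluation happen after this point ...

	// Old world, transfer path: a clock reading taken now is >= the timestamp
	// of any request that already passed the lease check, so it is safe to
	// use as the new lease's start time.
	fmt.Println("new lease start:", clock.read()) // 10

	// A future-time request (say ts=20) would not bump the clock, so the
	// reading above would no longer bound it, which is the invariant this
	// commit stops relying on.
}
```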

Motivation

The primary motivation for pulling lease checks and transfers below latching is
that the interaction between requests and lease transfers is incompatible with
future-time operations, a key part of the non-blocking transaction project. This
is because the structure relies on the HLC clock providing an upper bound on the
time of any request served by an outgoing leaseholder, which is attached to
lease transfers to ensure that the new leaseholder does not violate any request
served on the old leaseholder. But this is quickly violated once we start
serving future-time operations, which don't bump the HLC clock.

So we quickly need to look elsewhere for this information. The obvious place to
look is the timestamp cache, which records the upper bound
read time of each key span in a range, even if this upper bound time is
synthetic. If we could scan the timestamp cache and attach the maximum read time
to a lease transfer (through a new field, not as the lease start time), we'd be
good. But this runs into a problem, because if we just read the timestamp cache
under the lease transfer's lock, we can't be sure we didn't miss any in-progress
operations that had passed the lease check previously but had not yet bumped the
timestamp cache. Maybe they are still reading? So the custom locking quickly
runs into problems (I said it was inflexible!).

Solution

The solution here is to stop relying on custom locking for lease transfers by
pulling the lease check below latching and by pulling the determination of the
transfer's start time below latching. This ensures that during a lease transfer,
we not only block new requests but also flush out in-flight requests. This
means that by the time we look at the timestamp cache during the evaluation of a
lease transfer, we know it has already been updated by any request that will be
served under the current lease.

This commit doesn't make the switch from consulting the HLC clock to consulting
the timestamp cache during TransferLease request evaluation, but a future commit
will.
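
As a rough sketch of where that future commit is headed (an invented helper,
not the real evaluation code), the transfer can forward its start time by both
a current clock reading and the maximum read time recorded in the range's
portion of the timestamp cache, which latching has already made complete:

```go
package main

import "fmt"

// forwardTransferStart is a hypothetical helper showing the idea: once the
// TransferLease request holds latches across the whole range, every request
// served under the old lease has already bumped the timestamp cache, so its
// maximum read time is a true upper bound even for future-time reads that
// never touched the HLC clock.
func forwardTransferStart(proposedStart, clockNow, tsCacheMax int64) int64 {
	start := proposedStart
	if clockNow > start {
		start = clockNow
	}
	if tsCacheMax > start {
		start = tsCacheMax
	}
	return start
}

func main() {
	// A future-time read at ts=25 was served under the old lease while the
	// clock only reads 20; the transfer still starts at 25, protecting it.
	fmt.Println(forwardTransferStart(10, 20, 25)) // 25
}
```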

Other benefits

Besides this primary change, a number of other benefits fall out of this
restructuring.

1. we avoid relying on custom synchronization around leases, instead relying
   on the more general latching mechanism.
2. we more closely align `TransferLeaseRequest` and `SubsumeRequest`, which now
   both grab clock readings during evaluation and will both need to forward
   their clock reading by the upper-bound of a range's portion of the timestamp
   cache. It makes sense that these two requests would be very similar, as both
   are responsible for renouncing the current leaseholder's powers and passing
   them elsewhere.
3. we more closely align the lease acquisition handling with the handling of
   `MergeInProgressError` by classifying a new `InvalidLeaseError` as a
   "concurrencyRetryError" (see isConcurrencyRetryError). This fits the existing
   structure of: grab latches, check range state, drop latches and wait if
   necessary, retry (see the sketch after this list).
4. in doing so, we fuse the critical section of the lease check with the rest
   of the checks in `checkExecutionCanProceed`, so we grab the replica read
   lock one fewer time in the request path.
5. we move one step closer to a world where we can "ship a portion of the
   timestamp cache" during lease transfers (and range merges) to avoid retry
   errors / transaction aborts on the new leaseholder. This commit will be
   followed up by one that ships a very basic summary of a leaseholder's
   timestamp cache during lease transfers. However, this would now be trivial to
   extend with higher resolution information, given some size limit. Perhaps we
   prioritize the local portion of the timestamp cache to avoid txn aborts?
6. now that leases are checked below latching, we no longer have the potential
   for an arbitrary delay due to latching and waiting on locks between when the
   lease is checked and when a request evaluates, so we no longer need checks
   like [this](https://github.com/cockroachdb/cockroach/blob/7bcb2cef794da56f6993f1b27d5b6a036016242b/pkg/kv/kvserver/replica_write.go#L119).
7. we pull observed timestamp handling a layer down, which will be useful to
   address plumbing comments on #57077.
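
A minimal sketch of the retry structure mentioned in item 3 (invented names;
the real logic lives in the replica send path and the concurrency manager):
grab latches, check range state, and on a concurrency retry error drop latches,
wait for the blocking condition to clear, and try again.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical sentinel errors standing in for InvalidLeaseError and
// MergeInProgressError.
var (
	errInvalidLease    = errors.New("invalid lease")
	errMergeInProgress = errors.New("merge in progress")
)

func isConcurrencyRetryError(err error) bool {
	return errors.Is(err, errInvalidLease) || errors.Is(err, errMergeInProgress)
}

// executeBatch sketches the loop: acquire latches, check range state, and if
// the check fails with a concurrency retry error, release latches, wait, and
// try again.
func executeBatch(checkState func() error, wait func(error)) error {
	for {
		// acquireLatches()
		err := checkState()
		if err == nil {
			// evaluate() while holding latches
			return nil
		}
		// releaseLatches()
		if !isConcurrencyRetryError(err) {
			return err
		}
		wait(err) // e.g. acquire the lease, or wait for the merge to finish
	}
}

func main() {
	attempts := 0
	err := executeBatch(
		func() error {
			attempts++
			if attempts == 1 {
				return errInvalidLease // first pass: lease must be acquired
			}
			return nil
		},
		func(err error) { fmt.Println("waiting after:", err) },
	)
	fmt.Println("done:", err, "attempts:", attempts)
}
```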

Other behavioral changes

There are two auxiliary behavioral changes made by this commit that deserve
attention.

The first is that during a lease transfer, operations now block on the outgoing
leaseholder instead of immediately redirecting to the expected next leaseholder.
This has trade-offs. On one hand, this delays redirection, which may make lease
transfers more disruptive to ongoing traffic. On the other, we've seen in the
past that the optimistic redirection is not an absolute win. In many cases, it
can lead to thrashing and lots of wasted work, as the outgoing leaseholder and
the incoming leaseholder both point at each other and requests ping-pong between
them. We've seen this cause serious issues like #22837 and #32367, which we
addressed by adding exponential backoff in the client in 89d349a. So while this
change may make average-case latency during lease transfers slightly worse, it
will keep things much more orderly, avoid wasted work, and reduce worst-case
latency during lease transfers.

The other behavioral change made by this commit is that observed timestamps are
now applied to a request to reduce its MaxOffset after latching and locking,
instead of before. This sounds concerning, but it's actually not for
two reasons. First, as of #57136, a transaction's uncertainty interval is no
longer considered by the lock table because locks in a transaction's uncertainty
interval are no longer considered write-read conflicts. Instead, those locks'
provisional values are considered at evaluation time to be uncertain. Second,
the fact that the observed timestamp-limited MaxOffset was being used for
latching is no longer correct in a world with synthetic timestamps (see #57077),
so we would have had to make this change anyway. So put together, this
behavioral change isn't meaningful.
nvanbenschoten committed Feb 5, 2021
1 parent 3c846bb commit c44b357
Showing 33 changed files with 1,334 additions and 728 deletions.
64 changes: 64 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_lease_test.go
@@ -17,7 +17,10 @@ import (

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/stretchr/testify/require"
@@ -120,10 +123,13 @@ func TestLeaseCommandLearnerReplica(t *testing.T) {
}
desc := roachpb.RangeDescriptor{}
desc.SetReplicas(roachpb.MakeReplicaSet(replicas))
manual := hlc.NewManualClock(123)
clock := hlc.NewClock(manual.UnixNano, time.Nanosecond)
cArgs := CommandArgs{
EvalCtx: (&MockEvalCtx{
StoreID: voterStoreID,
Desc: &desc,
Clock: clock,
}).EvalContext(),
Args: &roachpb.TransferLeaseRequest{
Lease: roachpb.Lease{
@@ -157,6 +163,64 @@ func TestLeaseCommandLearnerReplica(t *testing.T) {
require.EqualError(t, err, expForLearner)
}

// TestLeaseTransferForwardsStartTime tests that during a lease transfer, the
// start time of the new lease is determined during evaluation, after latches
// have granted the lease transfer full mutual exclusion over the leaseholder.
func TestLeaseTransferForwardsStartTime(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testutils.RunTrueAndFalse(t, "epoch", func(t *testing.T, epoch bool) {
ctx := context.Background()
db := storage.NewDefaultInMem()
defer db.Close()
batch := db.NewBatch()
defer batch.Close()

replicas := []roachpb.ReplicaDescriptor{
{NodeID: 1, StoreID: 1, Type: roachpb.ReplicaTypeVoterFull(), ReplicaID: 1},
{NodeID: 2, StoreID: 2, Type: roachpb.ReplicaTypeVoterFull(), ReplicaID: 2},
}
desc := roachpb.RangeDescriptor{}
desc.SetReplicas(roachpb.MakeReplicaSet(replicas))
manual := hlc.NewManualClock(123)
clock := hlc.NewClock(manual.UnixNano, time.Nanosecond)

nextLease := roachpb.Lease{
Replica: replicas[1],
Start: clock.NowAsClockTimestamp(),
}
if epoch {
nextLease.Epoch = 1
} else {
exp := nextLease.Start.ToTimestamp().Add(9*time.Second.Nanoseconds(), 0)
nextLease.Expiration = &exp
}
cArgs := CommandArgs{
EvalCtx: (&MockEvalCtx{
StoreID: 1,
Desc: &desc,
Clock: clock,
}).EvalContext(),
Args: &roachpb.TransferLeaseRequest{
Lease: nextLease,
},
}

manual.Increment(1000)
beforeEval := clock.NowAsClockTimestamp()

res, err := TransferLease(ctx, batch, cArgs, nil)
require.NoError(t, err)

// The proposed lease start time should be assigned at eval time.
propLease := res.Replicated.State.Lease
require.NotNil(t, propLease)
require.True(t, nextLease.Start.Less(propLease.Start))
require.True(t, beforeEval.Less(propLease.Start))
})
}

func TestCheckCanReceiveLease(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
15 changes: 12 additions & 3 deletions pkg/kv/kvserver/batcheval/cmd_lease_transfer.go
@@ -67,13 +67,22 @@ func TransferLease(
// LeaseRejectedError before going through Raft.
prevLease, _ := cArgs.EvalCtx.GetLease()

// Forward the lease's start time to a current clock reading. At this
// point, we're holding latches across the entire range, so we know that
// this time is greater than the timestamps at which any request was
// serviced by the leaseholder before it stopped serving requests (i.e.
// before the TransferLease request acquired latches).
newLease := args.Lease
newLease.Start.Forward(cArgs.EvalCtx.Clock().NowAsClockTimestamp())
args.Lease = roachpb.Lease{} // prevent accidental use below

// If this check is removed at some point, the filtering of learners on the
// sending side would have to be removed as well.
if err := roachpb.CheckCanReceiveLease(args.Lease.Replica, cArgs.EvalCtx.Desc()); err != nil {
if err := roachpb.CheckCanReceiveLease(newLease.Replica, cArgs.EvalCtx.Desc()); err != nil {
return newFailedLeaseTrigger(true /* isTransfer */), err
}

log.VEventf(ctx, 2, "lease transfer: prev lease: %+v, new lease: %+v", prevLease, args.Lease)
log.VEventf(ctx, 2, "lease transfer: prev lease: %+v, new lease: %+v", prevLease, newLease)
return evalNewLease(ctx, cArgs.EvalCtx, readWriter, cArgs.Stats,
args.Lease, prevLease, false /* isExtension */, true /* isTransfer */)
newLease, prevLease, false /* isExtension */, true /* isTransfer */)
}
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_push_txn.go
@@ -124,6 +124,8 @@ func PushTxn(
return result.Result{}, errors.Errorf("request timestamp %s less than pushee txn timestamp %s", h.Timestamp, args.PusheeTxn.WriteTimestamp)
}
now := cArgs.EvalCtx.Clock().Now()
// TODO(nvanbenschoten): remove this limitation. But when doing so,
// keep the h.Timestamp.Less(args.PushTo) check above.
if now.Less(h.Timestamp) {
// The batch's timestamp should have been used to update the clock.
return result.Result{}, errors.Errorf("request timestamp %s less than current clock time %s", h.Timestamp, now)
64 changes: 32 additions & 32 deletions pkg/kv/kvserver/client_merge_test.go
@@ -34,7 +34,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait"
@@ -1373,21 +1372,22 @@ func TestStoreRangeMergeRHSLeaseExpiration(t *testing.T) {
}

// Install a hook to observe when a get or a put request for a special key,
// rhsSentinel, acquires latches and begins evaluating.
// rhsSentinel, hits a MergeInProgressError and begins waiting on the merge.
const reqConcurrency = 10
var rhsSentinel roachpb.Key
reqAcquiredLatch := make(chan struct{}, reqConcurrency)
testingLatchFilter := func(_ context.Context, ba roachpb.BatchRequest) *roachpb.Error {
for _, r := range ba.Requests {
req := r.GetInner()
switch req.Method() {
case roachpb.Get, roachpb.Put:
if req.Header().Key.Equal(rhsSentinel) {
reqAcquiredLatch <- struct{}{}
reqWaitingOnMerge := make(chan struct{}, reqConcurrency)
testingConcurrencyRetryFilter := func(_ context.Context, ba roachpb.BatchRequest, pErr *roachpb.Error) {
if _, ok := pErr.GetDetail().(*roachpb.MergeInProgressError); ok {
for _, r := range ba.Requests {
req := r.GetInner()
switch req.Method() {
case roachpb.Get, roachpb.Put:
if req.Header().Key.Equal(rhsSentinel) {
reqWaitingOnMerge <- struct{}{}
}
}
}
}
return nil
}

manualClock := hlc.NewHybridManualClock()
@@ -1401,8 +1401,9 @@ func TestStoreRangeMergeRHSLeaseExpiration(t *testing.T) {
ClockSource: manualClock.UnixNano,
},
Store: &kvserver.StoreTestingKnobs{
TestingRequestFilter: testingRequestFilter,
TestingLatchFilter: testingLatchFilter,
TestingRequestFilter: testingRequestFilter,
TestingConcurrencyRetryFilter: testingConcurrencyRetryFilter,
AllowLeaseRequestProposalsWhenNotLeader: true,
},
},
},
@@ -1416,6 +1417,7 @@ func TestStoreRangeMergeRHSLeaseExpiration(t *testing.T) {
// during the merge.
lhsDesc, rhsDesc, err := tc.Servers[0].ScratchRangeWithExpirationLeaseEx()
require.NoError(t, err)
rhsSentinel = rhsDesc.StartKey.AsRawKey()

tc.AddVotersOrFatal(t, lhsDesc.StartKey.AsRawKey(), tc.Target(1))
tc.AddVotersOrFatal(t, rhsDesc.StartKey.AsRawKey(), tc.Target(1))
@@ -1438,26 +1440,26 @@ func TestStoreRangeMergeRHSLeaseExpiration(t *testing.T) {
// is aware of the merge and is refusing all traffic, so we can't just send a
// TransferLease request. Instead, we need to expire the second store's lease,
// then acquire the lease on the first store.
toAdvance := store.GetStoreConfig().LeaseExpiration()

// Before doing so, however, ensure that the merge transaction has written
// its transaction record so that it doesn't run into trouble with the low
// water mark of the new leaseholder's timestamp cache. This could result in
// the transaction being inadvertently aborted during its first attempt,
// which this test is not designed to handle. If the merge transaction did
// abort then the get requests could complete on r2 before the merge retried.
hb, hbH := heartbeatArgs(mergeTxn, tc.Servers[0].Clock().Now())
//
// We heartbeat the merge's transaction record with a timestamp forwarded by
// the duration we plan to advance the clock by so that the transaction does
// not look expired even after the manual clock update.
afterAdvance := tc.Servers[0].Clock().Now().Add(toAdvance, 0)
hb, hbH := heartbeatArgs(mergeTxn, afterAdvance)
if _, pErr := kv.SendWrappedWith(ctx, store.TestSender(), hbH, hb); pErr != nil {
t.Fatal(pErr)
}

// Turn off liveness heartbeats on the second store, then advance the clock
// past the liveness expiration time. This expires all leases on all stores.
tc.Servers[1].NodeLiveness().(*liveness.NodeLiveness).PauseHeartbeatLoopForTest()
manualClock.Increment(store.GetStoreConfig().LeaseExpiration())

// Manually heartbeat the liveness on the first store to ensure it's
// considered live. The automatic heartbeat might not come for a while.
require.NoError(t, tc.HeartbeatNodeLiveness(0))
// Then increment the clock to expire all leases.
manualClock.Increment(toAdvance)

// Send several get and put requests to the RHS. The first of these to
// arrive will acquire the lease; the remaining requests will wait for that
@@ -1513,19 +1515,17 @@ func TestStoreRangeMergeRHSLeaseExpiration(t *testing.T) {
time.Sleep(time.Millisecond)
}

// Wait for the get and put requests to acquire latches, which is as far as
// they can get while the merge is in progress. Then wait a little bit
// longer. This tests that the requests really do get stuck waiting for the
// merge to complete without depending too heavily on implementation
// details.
// Wait for the get and put requests to begin waiting on the merge to
// complete. Then wait a little bit longer. This tests that the requests
// really do get stuck waiting for the merge to complete without depending
// too heavily on implementation details.
for i := 0; i < reqConcurrency; i++ {
select {
case <-reqAcquiredLatch:
// Latch acquired.
case <-reqWaitingOnMerge:
// Waiting on merge.
case pErr := <-reqErrs:
// Requests may never make it to the latch acquisition if s1 has not
// yet learned s2's lease is expired. Instead, we'll see a
// NotLeaseholderError.
// Requests may never wait on the merge if s1 has not yet learned
// s2's lease is expired. Instead, we'll see a NotLeaseholderError.
require.IsType(t, &roachpb.NotLeaseHolderError{}, pErr.GetDetail())
}
}
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_raft_test.go
@@ -1089,7 +1089,7 @@ func TestRequestsOnLaggingReplica(t *testing.T) {
tc := testcluster.StartTestCluster(t, 3, clusterArgs)
defer tc.Stopper().Stop(ctx)

rngDesc, err := tc.Servers[0].ScratchRangeEx()
_, rngDesc, err := tc.Servers[0].ScratchRangeEx()
require.NoError(t, err)
key := rngDesc.StartKey.AsRawKey()
// Add replicas on all the stores.
6 changes: 6 additions & 0 deletions pkg/kv/kvserver/client_replica_test.go
@@ -3439,6 +3439,12 @@ func TestDiscoverIntentAcrossLeaseTransferAwayAndBack(t *testing.T) {
err = tc.MoveRangeLeaseNonCooperatively(rangeDesc, tc.Target(1), manual)
require.NoError(t, err)

// Send an arbitrary request to the range to update the range descriptor
// cache with the new lease. This prevents the rollback from getting stuck
// waiting on latches held by txn2's read on the old leaseholder.
_, err = kvDB.Get(ctx, "c")
require.NoError(t, err)

// Roll back txn1.
err = txn1.Rollback(ctx)
require.NoError(t, err)
6 changes: 4 additions & 2 deletions pkg/kv/kvserver/concurrency/concurrency_manager.go
@@ -89,6 +89,7 @@ func NewManager(cfg Config) Manager {
lt: lt,
ltw: &lockTableWaiterImpl{
st: cfg.Settings,
clock: cfg.Clock,
stopper: cfg.Stopper,
ir: cfg.IntentResolver,
lt: lt,
@@ -443,9 +444,10 @@ func (g *Guard) HoldingLatches() bool {
return g != nil && g.lg != nil
}

// AssertLatches asserts that the guard is non-nil and holding latches.
// AssertLatches asserts that the guard is non-nil and holding latches, if the
// request is supposed to hold latches while evaluating in the first place.
func (g *Guard) AssertLatches() {
if !g.HoldingLatches() {
if shouldAcquireLatches(g.Req) && !g.HoldingLatches() {
panic("expected latches held, found none")
}
}
21 changes: 19 additions & 2 deletions pkg/kv/kvserver/concurrency/concurrency_manager_test.go
@@ -73,6 +73,7 @@ import (
// debug-latch-manager
// debug-lock-table
// debug-disable-txn-pushes
// debug-set-clock ts=<secs>
// reset
//
func TestConcurrencyManagerBasic(t *testing.T) {
@@ -119,7 +120,6 @@ func TestConcurrencyManagerBasic(t *testing.T) {
ReadTimestamp: ts,
MaxTimestamp: maxTS,
}
txn.UpdateObservedTimestamp(c.nodeDesc.NodeID, ts.UnsafeToClockTimestamp())
c.registerTxn(txnName, txn)
return ""

@@ -459,6 +459,17 @@ func TestConcurrencyManagerBasic(t *testing.T) {
c.disableTxnPushes()
return ""

case "debug-set-clock":
var secs int
d.ScanArgs(t, "ts", &secs)

nanos := int64(secs) * time.Second.Nanoseconds()
if nanos < c.manual.UnixNano() {
d.Fatalf(t, "manual clock must advance")
}
c.manual.Set(nanos)
return ""

case "reset":
if n := mon.numMonitored(); n > 0 {
d.Fatalf(t, "%d requests still in flight", n)
@@ -494,6 +505,8 @@ type cluster struct {
nodeDesc *roachpb.NodeDescriptor
rangeDesc *roachpb.RangeDescriptor
st *clustersettings.Settings
manual *hlc.ManualClock
clock *hlc.Clock
m concurrency.Manager

// Definitions.
@@ -523,10 +536,13 @@ type txnPush struct {
}

func newCluster() *cluster {
manual := hlc.NewManualClock(123 * time.Second.Nanoseconds())
return &cluster{
st: clustersettings.MakeTestingClusterSettings(),
nodeDesc: &roachpb.NodeDescriptor{NodeID: 1},
rangeDesc: &roachpb.RangeDescriptor{RangeID: 1},
st: clustersettings.MakeTestingClusterSettings(),
manual: manual,
clock: hlc.NewClock(manual.UnixNano, time.Nanosecond),

txnsByName: make(map[string]*roachpb.Transaction),
requestsByName: make(map[string]testReq),
Expand All @@ -541,6 +557,7 @@ func (c *cluster) makeConfig() concurrency.Config {
NodeDesc: c.nodeDesc,
RangeDesc: c.rangeDesc,
Settings: c.st,
Clock: c.clock,
IntentResolver: c,
OnContentionEvent: func(ev *roachpb.ContentionEvent) {
ev.Duration = 1234 * time.Millisecond // for determinism