Skip to content

Commit

Permalink
kv: redirect follower reads to leaseholder on contention
Browse files Browse the repository at this point in the history
Fixes #57686.

This commit adjusts the handling of follower reads to redirect to the
leaseholder immediately if a conflicting intent is observed while
reading. This replaces the previous behavior of attempting to resolve
the intents from the follower using an inefficient method (i.e. without
batching and with multiple follower<->leaseholder hops) and then
re-evaluating after the resolution had completed.

In general, waiting for conflicting intents on the leaseholder instead
of on a follower is preferable because:
- the leaseholder is notified of and reactive to lock-table state
  transitions.
- the leaseholder is able to more efficiently resolve intents, if
  necessary, without the risk of multiple follower<->leaseholder
  round-trips compounding. If the follower was to attempt to resolve
  multiple intents during a follower read then the PushTxn and
  ResolveIntent requests would quickly be more expensive (in terms of
  latency) than simply redirecting the entire read request to the
  leaseholder and letting the leaseholder coordinate the intent
  resolution.
- after the leaseholder has received a response from a ResolveIntent
  request, it has a guarantee that the intent resolution has been applied
  locally and that no future read will observe the intent. This is not
  true on follower replicas. Due to the asynchronous nature of Raft, both
  due to quorum voting and due to async commit acknowledgement from
  leaders to followers, it is possible for a ResolveIntent request to
  complete and then for a future read on a follower to observe the
  pre-resolution state of the intent. This effect is transient and will
  eventually disappear once the follower catches up on its Raft log, but
  it creates an opportunity for momentary thrashing if a follower read
  was to resolve an intent and then immediately attempt to read again.

This behavior of redirecting follower read attempts to the leaseholder
replica if they encounter conflicting intents on a follower means that
follower read eligibility is a function of the "resolved timestamp" over
a read's key span, and not just the "closed timestamp" over its key
span. Architecturally, this is consistent with Google Spanner, who
maintains a concept of "safe time", "paxos safe time", "transaction
manager safe time". "safe time" is analogous to the "resolved timestamp"
in CockroachDB and "paxos safe time" is analogous to the "closed
timestamp" in CockroachDB. In Spanner, it is the "safe time" of a
replica that determines follower read eligibility.

There are some downsides to this change which I think are interesting to
point out, but I don't think are meaningfully concerning:
1. we don't detect the difference between the resolved timestamp and the
   closed timestamp until after we have begun evaluating the follower
   read and scanning MVCC data. This lazy detection of follower read
   eligibility can lead to wasted work. In the future, we may consider
   making this detection eager once we address #69717.
2. redirecting follower reads to leaseholders can lead to large response
   payloads being shipped over wide-area network links. So far, this PR has
   compared the latency of multiple WAN hops for intent resolution to a
   single WAN hop for read redirection, but that doesn't recognize the
   potential asymmetry in cost, at least at the extreme, between
   control-plane requests like `PushTxn` and `ResolveIntent` and data-plane
   requests like `Scan` and `Get`. In the future, I'd like to recognize
   this asymmetry explore ideas around never redirecting the data-plane
   portion of follower reads to leaseholders and instead only ever sending
   control-plane requests to proactively close time and relay log positions
   back to the followers. This is similar to what Spanner does, see
   https://www.cockroachlabs.com/blog/follower-reads-stale-data/#comparing-cockroachdb-with-spanner.
   For now, though, I don't think redirecting marginally more often is
   concerning.

Release note (performance improvement): follower reads that encounter many
abandoned intents are now able to efficiently resolve those intents. This
resolves an asymmetry where follower reads were previously less efficient at
resolving abandoned intents than regular reads evaluated on a leaseholder.
  • Loading branch information
nvanbenschoten committed Sep 17, 2021
1 parent 7d5dea0 commit 745c4ee
Show file tree
Hide file tree
Showing 9 changed files with 398 additions and 316 deletions.
102 changes: 69 additions & 33 deletions pkg/kv/kvserver/closed_timestamp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,10 +247,10 @@ func TestClosedTimestampCanServeThroughoutLeaseTransfer(t *testing.T) {
}
}

// TestClosedTimestampCanServeWithConflictingIntent validates that a read served
// from a follower replica will wait on conflicting intents and ensure that they
// are cleaned up if necessary to allow the read to proceed.
func TestClosedTimestampCanServeWithConflictingIntent(t *testing.T) {
// TestClosedTimestampCantServeWithConflictingIntent validates that a read
// served from a follower replica will redirect to the leaseholder if it
// encounters a conflicting intent below the closed timestamp.
func TestClosedTimestampCantServeWithConflictingIntent(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

Expand All @@ -261,8 +261,8 @@ func TestClosedTimestampCanServeWithConflictingIntent(t *testing.T) {
ds := tc.Server(0).DistSenderI().(*kvcoord.DistSender)

// Write N different intents for the same transaction, where N is the number
// of replicas in the testing range. Each intent will be read and eventually
// resolved by a read on a different replica.
// of replicas in the testing range. Each intent will be read on a different
// replica.
txnKey := desc.StartKey.AsRawKey()
txnKey = txnKey[:len(txnKey):len(txnKey)] // avoid aliasing
txn := roachpb.MakeTransaction("txn", txnKey, 0, tc.Server(0).Clock().Now(), 0)
Expand All @@ -272,53 +272,89 @@ func TestClosedTimestampCanServeWithConflictingIntent(t *testing.T) {
keys = append(keys, key)
put := putArgs(key, []byte("val"))
resp, err := kv.SendWrappedWith(ctx, ds, roachpb.Header{Txn: &txn}, put)
if err != nil {
t.Fatal(err)
}
require.Nil(t, err)
txn.Update(resp.Header().Txn)
}

// Read a different intent on each replica. All should begin waiting on the
// intents by pushing the transaction that wrote them. None should complete.
ts := txn.WriteTimestamp
respCh := make(chan error, len(keys))
for i, key := range keys {
go func(repl *kvserver.Replica, key roachpb.Key) {
baRead := makeTxnReadBatchForDesc(desc, ts)
respCh <- testutils.SucceedsSoonError(func() error {
// Expect 0 rows, because the intents will be aborted.
_, err := expectRows(0)(repl.Send(ctx, baRead))
return err
})
}(repls[i], key)
// Set a long txn liveness threshold so that the txn cannot be aborted.
defer txnwait.TestingOverrideTxnLivenessThreshold(time.Hour)()

// runFollowerReads attempts to perform a follower read on a different key on
// each replica, using the provided timestamp as the request timestamp.
runFollowerReads := func(ts hlc.Timestamp, retryUntilSuccessful bool) chan error {
respCh := make(chan error, len(repls))
for i := range repls {
go func(repl *kvserver.Replica, key roachpb.Key) {
baRead := makeTxnReadBatchForDesc(desc, ts)
baRead.Requests[0].GetScan().SetSpan(roachpb.Span{
Key: key,
EndKey: key.Next(),
})
var err error
if retryUntilSuccessful {
err = testutils.SucceedsSoonError(func() error {
// Expect 0 rows, because the intents are never committed.
_, err := expectRows(0)(repl.Send(ctx, baRead))
return err
})
} else {
_, pErr := repl.Send(ctx, baRead)
err = pErr.GoError()
}
respCh <- err
}(repls[i], keys[i])
}
return respCh
}

// Follower reads should be possible up to just below the intents' timestamp.
// We use MinTimestamp instead of WriteTimestamp because the WriteTimestamp
// may have been bumped after the txn wrote some intents.
respCh1 := runFollowerReads(txn.MinTimestamp.Prev(), true)
for i := 0; i < len(repls); i++ {
require.NoError(t, <-respCh1)
}

// At the intents' timestamp, reads on the leaseholder should block and reads
// on the followers should be redirected to the leaseholder, even though the
// read timestamp is below the closed timestamp.
respCh2 := runFollowerReads(txn.WriteTimestamp, false)
for i := 0; i < len(repls)-1; i++ {
err := <-respCh2
require.Error(t, err)
var lErr *roachpb.NotLeaseHolderError
require.True(t, errors.As(err, &lErr))
}
select {
case err := <-respCh:
case err := <-respCh2:
t.Fatalf("request unexpectedly returned, should block; err: %v", err)
case <-time.After(20 * time.Millisecond):
}

// Abort the transaction. All pushes should succeed and all intents should
// be resolved, allowing all reads (on the leaseholder and on followers) to
// proceed and finish.
// Abort the transaction. All intents should be rolled back.
endTxn := &roachpb.EndTxnRequest{
RequestHeader: roachpb.RequestHeader{Key: txn.Key},
Commit: false,
LockSpans: []roachpb.Span{desc.KeySpan().AsRawSpanWithNoLocals()},
}
if _, err := kv.SendWrappedWith(ctx, ds, roachpb.Header{Txn: &txn}, endTxn); err != nil {
t.Fatal(err)
}
for range keys {
require.NoError(t, <-respCh)
_, err := kv.SendWrappedWith(ctx, ds, roachpb.Header{Txn: &txn}, endTxn)
require.Nil(t, err)

// The blocked read on the leaseholder should succeed.
require.NoError(t, <-respCh2)

// Follower reads should now be possible at the intents' timestamp.
respCh3 := runFollowerReads(txn.WriteTimestamp, true)
for i := 0; i < len(repls); i++ {
require.NoError(t, <-respCh3)
}
}

// TestClosedTimestampCanServeAfterSplitsAndMerges validates the invariant that
// if a timestamp is safe for reading on both the left side and right side of a
// a merge then it will be safe after the merge and that if a timestamp is safe
// merge then it will be safe after the merge and that if a timestamp is safe
// for reading before the beginning of a split it will be safe on both sides of
// of the split.
// the split.
func TestClosedTimestampCanServeAfterSplitAndMerges(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
16 changes: 0 additions & 16 deletions pkg/kv/kvserver/concurrency/concurrency_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -756,22 +756,6 @@ type lockTableWaiter interface {
// again.
WaitOn(context.Context, Request, lockTableGuard) *Error

// WaitOnLock waits on the transaction responsible for the specified lock
// and then ensures that the lock is cleared out of the request's way.
//
// The method should be called after dropping any latches that a request has
// acquired. It returns when the lock has been resolved.
//
// NOTE: this method is used when the lockTable is disabled (e.g. on a
// follower replica) and a lock is discovered that must be waited on (e.g.
// during a follower read). If/when lockTables are maintained on follower
// replicas by propagating lockTable state transitions through the Raft log
// in the ReplicatedEvalResult instead of through the (leaseholder-only)
// LocalResult, we should be able to remove the lockTable "disabled" state
// and, in turn, remove this method. This will likely fall out of pulling
// all replicated locks into the lockTable.
WaitOnLock(context.Context, Request, *roachpb.Intent) *Error

// ResolveDeferredIntents resolves the batch of intents if the provided
// error is nil. The batch of intents may be resolved more efficiently than
// if they were resolved individually.
Expand Down
49 changes: 28 additions & 21 deletions pkg/kv/kvserver/concurrency/concurrency_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,19 +416,37 @@ func (m *managerImpl) HandleWriterIntentError(
}

// Add a discovered lock to lock-table for each intent and enter each lock's
// wait-queue. If the lock-table is disabled and one or more of the intents
// are ignored then we immediately wait on all intents.
// wait-queue.
//
// If the lock-table is disabled and one or more of the intents are ignored
// then we proceed without the intent being added to the lock table. In such
// cases, we know that this replica is no longer the leaseholder. One of two
// things can happen next.
// 1) if the request cannot be served on this follower replica according to
// the closed timestamp then it will be redirected to the leaseholder on
// its next evaluation attempt, where it may discover the same intent and
// wait in the new leaseholder's lock table.
// 2) if the request can be served on this follower replica according to the
// closed timestamp then it will likely re-encounter the same intent on its
// next evaluation attempt. The WriteIntentError will then be mapped to an
// InvalidLeaseError in maybeAttachLease, which will indicate that the
// request cannot be served as a follower read after all and cause the
// request to be redirected to the leaseholder.
//
// Either way, there is no possibility of the request entering an infinite
// loop without making progress.
consultFinalizedTxnCache :=
int64(len(t.Intents)) > DiscoveredLocksThresholdToConsultFinalizedTxnCache.Get(&m.st.SV)
wait := false
for i := range t.Intents {
intent := &t.Intents[i]
added, err := m.lt.AddDiscoveredLock(intent, seq, consultFinalizedTxnCache, g.ltg)
if err != nil {
log.Fatalf(ctx, "%v", err)
}
if !added {
wait = true
log.VEventf(ctx, 2,
"intent on %s discovered but not added to disabled lock table",
intent.Key.String())
}
}

Expand All @@ -438,23 +456,12 @@ func (m *managerImpl) HandleWriterIntentError(
// Guard. This is analogous to iterating through the loop in SequenceReq.
m.lm.Release(g.moveLatchGuard())

// If the lockTable was disabled then we need to immediately wait on the
// intents to ensure that they are resolved and moved out of the request's
// way.
if wait {
for i := range t.Intents {
intent := &t.Intents[i]
if err := m.ltw.WaitOnLock(ctx, g.Req, intent); err != nil {
m.FinishReq(g)
return nil, err
}
}
} else {
if toResolve := g.ltg.ResolveBeforeScanning(); len(toResolve) > 0 {
if err := m.ltw.ResolveDeferredIntents(ctx, toResolve); err != nil {
m.FinishReq(g)
return nil, err
}
// If the discovery process collected a set of intents to resolve before the
// next evaluation attempt, do so.
if toResolve := g.ltg.ResolveBeforeScanning(); len(toResolve) > 0 {
if err := m.ltw.ResolveDeferredIntents(ctx, toResolve); err != nil {
m.FinishReq(g)
return nil, err
}
}

Expand Down
5 changes: 2 additions & 3 deletions pkg/kv/kvserver/concurrency/lock_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -2330,9 +2330,8 @@ func (t *lockTableImpl) AddDiscoveredLock(
}
if seq < t.enabledSeq {
// If the lease sequence is too low, this discovered lock may no longer
// be accurate, so we ignore it. However, we still return true so that
// the request immediately retries, this time under a newer lease.
return true, nil
// be accurate, so we ignore it.
return false, nil
} else if seq > t.enabledSeq {
// The enableSeq is set synchronously with the application of a new
// lease, so it should not be possible for a request to evaluate at a
Expand Down
25 changes: 0 additions & 25 deletions pkg/kv/kvserver/concurrency/lock_table_waiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,31 +435,6 @@ func (w *lockTableWaiterImpl) WaitOn(
}
}

// WaitOnLock implements the lockTableWaiter interface.
func (w *lockTableWaiterImpl) WaitOnLock(
ctx context.Context, req Request, intent *roachpb.Intent,
) *Error {
sa, _, err := findAccessInSpans(intent.Key, req.LockSpans)
if err != nil {
return roachpb.NewError(err)
}
state := waitingState{
kind: waitFor,
txn: &intent.Txn,
key: intent.Key,
held: true,
guardAccess: sa,
}
if req.LockTimeout != 0 {
return doWithTimeoutAndFallback(
ctx, req.LockTimeout,
func(ctx context.Context) *Error { return w.pushLockTxn(ctx, req, state) },
func(ctx context.Context) *Error { return w.pushLockTxnAfterTimeout(ctx, req, state) },
)
}
return w.pushLockTxn(ctx, req, state)
}

// pushLockTxn pushes the holder of the provided lock.
//
// If a Block wait policy is set on the request, method blocks until the lock
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ sequence req=req4
handle-write-intent-error req=req2 lease-seq=1
intent txn=txn1 key=k
----
[6] handle write intent error req2: intent on "k" discovered but not added to disabled lock table
[6] handle write intent error req2: handled conflicting intents on "k", released latches

debug-lock-table
Expand Down
Loading

0 comments on commit 745c4ee

Please sign in to comment.