kv: immediately push on WriteIntentError when lock-table disabled
Fixes #46148.

This commit fixes a bug where follower reads that hit intents could get
stuck in an indefinite loop of running into the intent during evaluation,
not adding the intent to the lock-table because the lock table was
disabled, sequencing in the concurrency manager without issue, and
repeating. The new TestClosedTimestampCanServeWithConflictingIntent test
hits exactly this issue before this commit.

The fix implemented here is to immediately push the transaction
responsible for an intent when serving a follower read (i.e. when a
replica's lock-table is disabled). This ensures that the intent gets
cleaned up if it was abandoned and avoids the busy loop we see today.
If/when lockTables are maintained on follower replicas by propagating
lockTable state transitions through the Raft log in the
ReplicatedEvalResult instead of through the (leaseholder-only)
LocalResult, we should be able to remove the lockTable "disabled" state
and, in turn, remove this special case.

The alternative approach floated to address this was to simply pass a
NotLeaseHolderError back to the client when an intent is hit on a
follower. This would have worked to avoid the infinite loop, but it
seems like a short-term patch that doesn't get to the root of the issue.
As we push further on follower reads (or even consistent read replicas),
we want non-leaseholders to be able to perform conflict resolution.
Falling back to the leaseholder works counter to this goal. The approach
implemented by this commit works toward that goal, while for now falling
back to the previous sub-optimal behavior of pushing immediately on
conflicts.
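
To make that control flow concrete, the following is a minimal, self-contained
Go sketch. It is not the CockroachDB API: intent, lockTable, addDiscoveredLock,
pushAndResolve, and handleWriteIntentError are simplified stand-ins for
roachpb.Intent, the lockTable, AddDiscoveredLock, lockTableWaiter.WaitOnLock,
and HandleWriterIntentError. The shape mirrors the change: if the lock table is
disabled and ignores a discovered intent, the request pushes the intent's
transaction immediately rather than re-sequencing in a loop.

// Illustrative sketch only: simplified stand-ins, not the CockroachDB types.
package main

import (
	"context"
	"fmt"
)

// intent is a stand-in for roachpb.Intent: a key written by some transaction.
type intent struct{ key, txnID string }

// lockTable mirrors the shape of AddDiscoveredLock after this commit: it
// reports whether the discovered intent was actually tracked.
type lockTable struct{ enabled bool }

func (lt *lockTable) addDiscoveredLock(in intent) (added bool) {
	if !lt.enabled {
		// Follower replicas do not maintain a lock table yet.
		return false
	}
	// ... insert the intent into the wait-queue for in.key ...
	return true
}

// pushAndResolve stands in for lockTableWaiter.WaitOnLock: push the intent's
// transaction and, once its fate is known, resolve the intent.
func pushAndResolve(ctx context.Context, in intent) error {
	fmt.Printf("pushing %s, then resolving intent on %q\n", in.txnID, in.key)
	return nil
}

// handleWriteIntentError shows the new behavior of HandleWriterIntentError:
// queue what the lock table accepts, and if any intent was ignored because
// the table is disabled, wait on every intent directly before the caller
// retries evaluation.
func handleWriteIntentError(ctx context.Context, lt *lockTable, intents []intent) error {
	wait := false
	for _, in := range intents {
		if !lt.addDiscoveredLock(in) {
			wait = true
		}
	}
	if !wait {
		return nil // leaseholder path: wait in the lock-table wait-queues instead
	}
	for _, in := range intents {
		if err := pushAndResolve(ctx, in); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	follower := &lockTable{enabled: false}
	err := handleWriteIntentError(context.Background(),
		follower, []intent{{key: "a", txnID: "txn1"}})
	fmt.Println("done, err =", err)
}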

Release note (bug fix): Follower reads that hit intents no longer have
a chance of entering an infinite loop. This bug was present in earlier
versions of the v20.1 release.

Release justification: fixes a high-priority bug where follower reads
could get stuck indefinitely if they hit an abandoned intent.
nvanbenschoten committed Mar 18, 2020
1 parent 2dbc433 commit c57d6d0
Showing 11 changed files with 488 additions and 120 deletions.
76 changes: 76 additions & 0 deletions pkg/kv/kvserver/closed_timestamp_test.go
@@ -15,11 +15,14 @@ import (
gosql "database/sql"
"fmt"
"math/rand"
"strconv"
"sync/atomic"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
@@ -172,6 +175,79 @@ func TestClosedTimestampCanServeThroughoutLeaseTransfer(t *testing.T) {
}
}

// TestClosedTimestampCanServeWithConflictingIntent validates that a read served
// from a follower replica will wait on conflicting intents and ensure that they
// are cleaned up if necessary to allow the read to proceed.
func TestClosedTimestampCanServeWithConflictingIntent(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
tc, _, desc, repls := setupTestClusterForClosedTimestampTesting(ctx, t, testingTargetDuration)
defer tc.Stopper().Stop(ctx)
ds := tc.Server(0).DistSenderI().(*kvcoord.DistSender)

// Write N different intents for the same transaction, where N is the number
// of replicas in the testing range. Each intent will be read and eventually
// resolved by a read on a different replica.
txnKey := desc.StartKey.AsRawKey()
txnKey = txnKey[:len(txnKey):len(txnKey)] // avoid aliasing
txn := roachpb.MakeTransaction("txn", txnKey, 0, tc.Server(0).Clock().Now(), 0)
var keys []roachpb.Key
for i := range repls {
key := append(txnKey, []byte(strconv.Itoa(i))...)
keys = append(keys, key)
put := putArgs(key, []byte("val"))
resp, err := kv.SendWrappedWith(ctx, ds, roachpb.Header{Txn: &txn}, put)
if err != nil {
t.Fatal(err)
}
txn.Update(resp.Header().Txn)
}

// Read a different intent on each replica. All should begin waiting on the
// intents by pushing the transaction that wrote them. None should complete.
ts := txn.WriteTimestamp
respCh := make(chan struct{}, len(keys))
for i, key := range keys {
go func(repl *kvserver.Replica, key roachpb.Key) {
var baRead roachpb.BatchRequest
r := &roachpb.ScanRequest{}
r.Key = key
r.EndKey = key.Next()
baRead.Add(r)
baRead.Timestamp = ts
baRead.RangeID = desc.RangeID

testutils.SucceedsSoon(t, func() error {
// Expect 0 rows, because the intents will be aborted.
_, err := expectRows(0)(repl.Send(ctx, baRead))
return err
})
respCh <- struct{}{}
}(repls[i], key)
}

select {
case <-respCh:
t.Fatal("request unexpectedly succeeded, should block")
case <-time.After(20 * time.Millisecond):
}

// Abort the transaction. All pushes should succeed and all intents should
// be resolved, allowing all reads (on the leaseholder and on followers) to
// proceed and finish.
endTxn := &roachpb.EndTxnRequest{
RequestHeader: roachpb.RequestHeader{Key: txn.Key},
Commit: false,
}
if _, err := kv.SendWrappedWith(ctx, ds, roachpb.Header{Txn: &txn}, endTxn); err != nil {
t.Fatal(err)
}
for range keys {
<-respCh
}
}

// TestClosedTimestampCanServeAfterSplitsAndMerges validates the invariant that
// if a timestamp is safe for reading on both the left side and right side of
// a merge then it will be safe after the merge and that if a timestamp is safe
26 changes: 24 additions & 2 deletions pkg/kv/kvserver/concurrency/concurrency_control.go
@@ -196,6 +196,8 @@ type ContentionHandler interface {
// error in the lock's wait-queue (but does not wait) and releases the
// guard's latches. It returns an updated guard reflecting this change.
// After the method returns, the original guard should no longer be used.
// If an error is returned then the provided guard will be released and no
// guard will be returned.
//
// Example usage: Txn A scans the lock table and does not see an intent on
// key K from txn B because the intent is not being tracked in the lock
@@ -204,7 +206,7 @@
// method before txn A retries its scan. During the retry, txn A scans the
// lock table and observes the lock on key K, so it enters the lock's
// wait-queue and waits for it to be resolved.
HandleWriterIntentError(context.Context, *Guard, *roachpb.WriteIntentError) *Guard
HandleWriterIntentError(context.Context, *Guard, *roachpb.WriteIntentError) (*Guard, *Error)

// HandleTransactionPushError consumes a TransactionPushError thrown by a
// PushTxnRequest by informing the concurrency manager about a transaction
@@ -474,7 +476,11 @@ type lockTable interface {
//
// A latch consistent with the access desired by the guard must be held on
// the span containing the discovered lock's key.
AddDiscoveredLock(*roachpb.Intent, lockTableGuard) error
//
// The method returns a boolean indicating whether the discovered lock was
// added to the lockTable (true) or whether it was ignored because the
// lockTable is currently disabled (false).
AddDiscoveredLock(*roachpb.Intent, lockTableGuard) (bool, error)

// AcquireLock informs the lockTable that a new lock was acquired or an
// existing lock was updated.
@@ -610,6 +616,22 @@ type lockTableWaiter interface {
// wait-queues and it is safe to re-acquire latches and scan the lockTable
// again.
WaitOn(context.Context, Request, lockTableGuard) *Error

// WaitOnLock waits on the transaction responsible for the specified lock
// and then ensures that the lock is cleared out of the request's way.
//
// The method should be called after dropping any latches that a request has
// acquired. It returns when the lock has been resolved.
//
// NOTE: this method is used when the lockTable is disabled (e.g. on a
// follower replica) and a lock is discovered that must be waited on (e.g.
// during a follower read). If/when lockTables are maintained on follower
// replicas by propagating lockTable state transitions through the Raft log
// in the ReplicatedEvalResult instead of through the (leaseholder-only)
// LocalResult, we should be able to remove the lockTable "disabled" state
// and, in turn, remove this method. This will likely fall out of pulling
// all replicated locks into the lockTable.
WaitOnLock(context.Context, Request, *roachpb.Intent) *Error
}

// txnWaitQueue holds a collection of wait-queues for transaction records.
28 changes: 24 additions & 4 deletions pkg/kv/kvserver/concurrency/concurrency_manager.go
@@ -244,27 +244,47 @@ func (m *managerImpl) FinishReq(g *Guard) {
// HandleWriterIntentError implements the ContentionHandler interface.
func (m *managerImpl) HandleWriterIntentError(
ctx context.Context, g *Guard, t *roachpb.WriteIntentError,
) *Guard {
) (*Guard, *Error) {
if g.ltg == nil {
log.Fatalf(ctx, "cannot handle WriteIntentError %v for request without "+
"lockTableGuard; were lock spans declared for this request?", t)
}

// Add a discovered lock to lock-table for each intent and enter each lock's
// wait-queue.
// wait-queue. If the lock-table is disabled and one or more of the intents
// are ignored then we immediately wait on all intents.
wait := false
for i := range t.Intents {
intent := &t.Intents[i]
if err := m.lt.AddDiscoveredLock(intent, g.ltg); err != nil {
added, err := m.lt.AddDiscoveredLock(intent, g.ltg)
if err != nil {
log.Fatal(ctx, errors.HandleAsAssertionFailure(err))
}
if !added {
wait = true
}
}

// Release the Guard's latches but continue to remain in lock wait-queues by
// not releasing lockWaitQueueGuards. We expect the caller of this method to
// then re-sequence the Request by calling SequenceReq with the un-latched
// Guard. This is analogous to iterating through the loop in SequenceReq.
m.lm.Release(g.moveLatchGuard())
return g

// If the lockTable was disabled then we need to immediately wait on the
// intents to ensure that they are resolved and moved out of the request's
// way.
if wait {
for i := range t.Intents {
intent := &t.Intents[i]
if err := m.ltw.WaitOnLock(ctx, g.Req, intent); err != nil {
m.FinishReq(g)
return nil, err
}
}
}

return g, nil
}

// HandleTransactionPushError implements the ContentionHandler interface.
20 changes: 15 additions & 5 deletions pkg/kv/kvserver/concurrency/concurrency_manager_test.go
@@ -221,7 +221,7 @@ func TestConcurrencyManagerBasic(t *testing.T) {
case "handle-write-intent-error":
var reqName string
d.ScanArgs(t, "req", &reqName)
guard, ok := c.guardsByReqName[reqName]
prev, ok := c.guardsByReqName[reqName]
if !ok {
d.Fatalf(t, "unknown request: %s", reqName)
}
@@ -237,12 +237,22 @@
d.ScanArgs(t, "key", &key)

opName := fmt.Sprintf("handle write intent error %s", reqName)
mon.runSync(opName, func(ctx context.Context) {
err := &roachpb.WriteIntentError{Intents: []roachpb.Intent{
mon.runAsync(opName, func(ctx context.Context) {
wiErr := &roachpb.WriteIntentError{Intents: []roachpb.Intent{
roachpb.MakeIntent(&txn.TxnMeta, roachpb.Key(key)),
}}
log.Eventf(ctx, "handling %v", err)
guard = m.HandleWriterIntentError(ctx, guard, err)
guard, err := m.HandleWriterIntentError(ctx, prev, wiErr)
if err != nil {
log.Eventf(ctx, "handled %v, returned error: %v", wiErr, err)
c.mu.Lock()
delete(c.guardsByReqName, reqName)
c.mu.Unlock()
} else {
log.Eventf(ctx, "handled %v, released latches", wiErr)
c.mu.Lock()
c.guardsByReqName[reqName] = guard
c.mu.Unlock()
}
})
return c.waitAndCollect(t, mon)

32 changes: 17 additions & 15 deletions pkg/kv/kvserver/concurrency/lock_table.go
@@ -1729,22 +1729,20 @@ func (t *lockTableImpl) Dequeue(guard lockTableGuard) {
}

// AddDiscoveredLock implements the lockTable interface.
func (t *lockTableImpl) AddDiscoveredLock(intent *roachpb.Intent, guard lockTableGuard) error {
func (t *lockTableImpl) AddDiscoveredLock(
intent *roachpb.Intent, guard lockTableGuard,
) (added bool, _ error) {
t.enabledMu.RLock()
defer t.enabledMu.RUnlock()
if !t.enabled {
// If not enabled, don't track any locks.
return nil
return false, nil
}
g := guard.(*lockTableGuardImpl)
key := intent.Key
ss := spanset.SpanGlobal
if keys.IsLocal(key) {
ss = spanset.SpanLocal
}
sa, err := findAccessInSpans(key, ss, g.spans)
sa, ss, err := findAccessInSpans(key, g.spans)
if err != nil {
return err
return false, err
}
var l *lockState
tree := &t.locks[ss]
@@ -1763,7 +1761,7 @@ func (t *lockTableImpl) AddDiscoveredLock(intent *roachpb.Intent, guard lockTabl
} else {
l = iter.Cur()
}
return l.discoveredLock(&intent.Txn, intent.Txn.WriteTimestamp, g, sa)
return true, l.discoveredLock(&intent.Txn, intent.Txn.WriteTimestamp, g, sa)
}

// AcquireLock implements the lockTable interface.
@@ -1856,11 +1854,15 @@ func (t *lockTableImpl) tryClearLocks(force bool) {
}
}

// Given the key with scope ss must be in spans, returns the strongest access
// specified in the spans.
// Given the key must be in spans, returns the strongest access
// specified in the spans, along with the scope of the key.
func findAccessInSpans(
key roachpb.Key, ss spanset.SpanScope, spans *spanset.SpanSet,
) (spanset.SpanAccess, error) {
key roachpb.Key, spans *spanset.SpanSet,
) (spanset.SpanAccess, spanset.SpanScope, error) {
ss := spanset.SpanGlobal
if keys.IsLocal(key) {
ss = spanset.SpanLocal
}
for sa := spanset.NumSpanAccess - 1; sa >= 0; sa-- {
s := spans.GetSpans(sa, ss)
// First span that starts after key
@@ -1869,10 +1871,10 @@ })
})
if i > 0 &&
((len(s[i-1].EndKey) > 0 && key.Compare(s[i-1].EndKey) < 0) || key.Equal(s[i-1].Key)) {
return sa, nil
return sa, ss, nil
}
}
return spanset.NumSpanAccess, errors.Errorf("caller violated contract")
return 0, 0, errors.Errorf("caller violated contract")
}

// Tries to GC locks that were previously known to have become empty.
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/concurrency/lock_table_test.go
@@ -333,7 +333,7 @@ func TestLockTableBasic(t *testing.T) {
d.Fatalf(t, "unknown txn %s", txnName)
}
intent := roachpb.MakeIntent(txnMeta, roachpb.Key(key))
if err := lt.AddDiscoveredLock(&intent, g); err != nil {
if _, err := lt.AddDiscoveredLock(&intent, g); err != nil {
return err.Error()
}
return lt.(*lockTableImpl).String()
21 changes: 21 additions & 0 deletions pkg/kv/kvserver/concurrency/lock_table_waiter.go
@@ -15,6 +15,7 @@ import (
"math"
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/intentresolver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -298,6 +299,26 @@ func (w *lockTableWaiterImpl) WaitOn(
}
}

// WaitOnLock implements the lockTableWaiter interface.
func (w *lockTableWaiterImpl) WaitOnLock(
ctx context.Context, req Request, intent *roachpb.Intent,
) *Error {
sa, _, err := findAccessInSpans(intent.Key, req.LockSpans)
if err != nil {
return roachpb.NewError(err)
}
return w.pushLockTxn(ctx, req, waitingState{
stateKind: waitFor,
txn: &intent.Txn,
ts: intent.Txn.WriteTimestamp,
dur: lock.Replicated,
key: intent.Key,
held: true,
access: spanset.SpanReadWrite,
guardAccess: sa,
})
}

// pushLockTxn pushes the holder of the provided lock.
//
// The method blocks until the lock holder transaction experiences a state
