From 30037f1c1ae43a1e9ece698ae5816f8e39462d75 Mon Sep 17 00:00:00 2001
From: Andrew Werner
Date: Tue, 10 Nov 2020 21:50:11 -0500
Subject: [PATCH] kvserver: support child transactions in the lock table

This commit adds support for child transactions to the lockTable and
deals with unexpected WriteIntentErrors due to write-write conflicts
between the parent and child transaction.

Release note: None
---
 .../concurrency/concurrency_manager.go        | 29 ++++--
 pkg/kv/kvserver/concurrency/lock_table.go     | 64 +++++++++++--
 .../kvserver/concurrency/lock_table_test.go   | 81 ++++++++++++-----
 .../concurrency/testdata/lock_table/child_txn | 90 +++++++++++++++++++
 4 files changed, 229 insertions(+), 35 deletions(-)
 create mode 100644 pkg/kv/kvserver/concurrency/testdata/lock_table/child_txn

diff --git a/pkg/kv/kvserver/concurrency/concurrency_manager.go b/pkg/kv/kvserver/concurrency/concurrency_manager.go
index 2e5e53a8d5d4..59ba7d1bfebc 100644
--- a/pkg/kv/kvserver/concurrency/concurrency_manager.go
+++ b/pkg/kv/kvserver/concurrency/concurrency_manager.go
@@ -22,12 +22,12 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
-	"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/metric"
 	"github.com/cockroachdb/cockroach/pkg/util/stop"
 	"github.com/cockroachdb/cockroach/pkg/util/uuid"
+	"github.com/cockroachdb/errors"
 )
 
 // managerImpl implements the Manager interface.
@@ -256,12 +256,23 @@ func (m *managerImpl) HandleWriterIntentError(
 	// wait-queue. If the lock-table is disabled and one or more of the intents
 	// are ignored then we immediately wait on all intents.
 	wait := false
+	// While iterating the intents, detect whether any belongs to an ancestor
+	// transaction; such a conflict is illegal and must be surfaced as an error.
+	var parentIntentError error
 	for i := range t.Intents {
 		intent := &t.Intents[i]
 		added, err := m.lt.AddDiscoveredLock(intent, seq, g.ltg)
 		if err != nil {
 			log.Fatalf(ctx, "%v", err)
 		}
+		for cur := g.Req.Txn; cur != nil && parentIntentError == nil; cur = cur.Parent {
+			if cur.ID == intent.Txn.ID {
+				parentIntentError = errors.AssertionFailedf(
+					"child transaction %s attempted to write over parent %s at key %s",
+					g.Req.Txn.Short(), cur.Short(), intent.Key)
+				break
+			}
+		}
 		if !added {
 			wait = true
 		}
@@ -273,6 +284,15 @@
 	// Guard. This is analogous to iterating through the loop in SequenceReq.
 	m.lm.Release(g.moveLatchGuard())
 
+	// If a discovered write intent belongs to an ancestor transaction, surface
+	// the conflict as an error rather than queueing behind the intent.
+	//
+	// TODO(ajwerner): Consider having this return a specific roachpb.Error that
+	// will be converted to an assertion failure on the client.
+	if parentIntentError != nil {
+		return nil, roachpb.NewError(parentIntentError)
+	}
+
 	// If the lockTable was disabled then we need to immediately wait on the
 	// intents to ensure that they are resolved and moved out of the request's
 	// way.
@@ -403,13 +423,6 @@ func (m *managerImpl) TxnWaitQueue() *txnwait.Queue {
 	return m.twq.(*txnwait.Queue)
 }
 
-func (r *Request) txnMeta() *enginepb.TxnMeta {
-	if r.Txn == nil {
-		return nil
-	}
-	return &r.Txn.TxnMeta
-}
-
 // readConflictTimestamp returns the maximum timestamp at which the request
 // conflicts with locks acquired by other transactions. The request must wait
 // for all locks acquired by other transactions at or below this timestamp
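To make the ancestor check concrete, here is a minimal, self-contained Go sketch of the loop added to HandleWriterIntentError above. The Txn type and the checkAncestorIntent helper are simplified stand-ins invented for illustration; the real code walks roachpb.Transaction.Parent and reports the conflict via errors.AssertionFailedf.

package main

import "fmt"

// Txn is a simplified stand-in for roachpb.Transaction: just an ID plus an
// optional pointer to the parent, forming an ancestor chain.
type Txn struct {
	ID     string
	Parent *Txn
}

// checkAncestorIntent mirrors the loop in HandleWriterIntentError: walk the
// ancestor chain of txn and return an error if the discovered intent is owned
// by txn or any of its ancestors. In practice a transaction never conflicts
// with its own intents, so a match here always names an ancestor.
func checkAncestorIntent(txn *Txn, intentOwnerID, key string) error {
	for cur := txn; cur != nil; cur = cur.Parent {
		if cur.ID == intentOwnerID {
			return fmt.Errorf(
				"child transaction %s attempted to write over parent %s at key %s",
				txn.ID, cur.ID, key)
		}
	}
	return nil
}

func main() {
	parent := &Txn{ID: "txn1"}
	child := &Txn{ID: "txn2", Parent: parent}
	if err := checkAncestorIntent(child, "txn1", "c"); err != nil {
		fmt.Println(err) // the write-write conflict surfaces as an error
	}
}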
diff --git a/pkg/kv/kvserver/concurrency/lock_table.go b/pkg/kv/kvserver/concurrency/lock_table.go
index 554268f83d53..e6f10c76dd08 100644
--- a/pkg/kv/kvserver/concurrency/lock_table.go
+++ b/pkg/kv/kvserver/concurrency/lock_table.go
@@ -258,7 +258,7 @@ type lockTableGuardImpl struct {
 	seqNum uint64
 
 	// Information about this request.
-	txn     *enginepb.TxnMeta
+	txn     *roachpb.Transaction
 	spans   *spanset.SpanSet
 	readTS  hlc.Timestamp
 	writeTS hlc.Timestamp
@@ -421,8 +421,51 @@ func (g *lockTableGuardImpl) doneWaitingAtLock(hasReservation bool, l *lockState
 	g.mu.Unlock()
 }
 
+// isSameTxn returns true if txn corresponds to the txn in g or to any of its
+// parents. We allow requests due to children to pass through the lock table
+// without observing any locks held by the parent. This is seemingly hazardous
+// in the scenario that the child is attempting to write (acquire a replicated
+// lock) or to acquire an unreplicated lock. However, each of these cases works
+// out, either due to logic at a different level or due to the guarantees of
+// the system.
+//
+//  case | parent | child
+// ------+--------+-------
+//  (1)  |   r    |   r
+//  (2)  |   r    |   u
+//  (3)  |   u    |   r
+//  (4)  |   u    |   u
+//
+// (1) Parent holds a replicated exclusive lock and the child is trying to
+//     write. This case is handled below the lock table by detecting the
+//     WriteIntentError for a parent and converting it to an assertion failure.
+//     Child transactions should *never* write over any of their ancestors'
+//     intents.
+//
+// (2) Parent holds a replicated exclusive lock and the child is trying to
+//     acquire an unreplicated read lock. In this case the acquisition no-ops
+//     because the lock table believes a lock is already held. This case is
+//     okay.
+//
+// (3) Parent holds an unreplicated lock and the child writes. When the child
+//     commits, it will invalidate the parent's read. Children should not be
+//     writing over the parent's read set. Child transactions push their
+//     parents above their commit timestamp and the timestamp cache will push
+//     the child above the parent's read timestamp, so the parent is doomed
+//     to fail. That being said, this is safe from a correctness perspective.
+//     A more informative error might be better, but it is hard to plumb that
+//     through.
+//
+// (4) Parent holds an unreplicated lock and the child would like to acquire
+//     one. This case ends up just working: we pass through the parent's lock
+//     and do not acquire anything.
+//
 func (g *lockTableGuardImpl) isSameTxn(txn *enginepb.TxnMeta) bool {
-	return g.txn != nil && g.txn.ID == txn.ID
+	for cur := g.txn; cur != nil; cur = cur.Parent {
+		if cur.ID == txn.ID {
+			return true
+		}
+	}
+	return false
 }
 
 func (g *lockTableGuardImpl) isSameTxnAsReservation(ws waitingState) bool {
@@ -483,6 +526,13 @@ func (g *lockTableGuardImpl) findNextLockAfter(notify bool) {
 	}
 }
 
+func (g *lockTableGuardImpl) txnMeta() *enginepb.TxnMeta {
+	if g.txn == nil {
+		return nil
+	}
+	return &g.txn.TxnMeta
+}
+
 // Waiting writers in a lockState are wrapped in a queuedGuard. A waiting
 // writer is typically waiting in an active state, i.e., the
 // lockTableGuardImpl.key refers to this lockState. However, breaking of
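The four cases in the isSameTxn comment above can be summarized mechanically. The following is a didactic Go sketch of that table under simplified assumptions; durability and childOverParent are invented names for illustration, not part of the lock table's API.

package main

import "fmt"

// durability distinguishes a replicated lock (an intent) from an
// unreplicated, in-memory lock table entry.
type durability int

const (
	replicated durability = iota
	unreplicated
)

// childOverParent returns the outcome, per cases (1)-(4) above, when a child
// request encounters a lock held by its parent.
func childOverParent(parent, child durability) string {
	switch {
	case parent == replicated && child == replicated:
		// (1) Illegal: surfaced below the lock table as an assertion failure.
		return "assertion failure"
	case parent == replicated && child == unreplicated:
		// (2) The acquisition no-ops because a lock already appears held.
		return "no-op"
	case parent == unreplicated && child == replicated:
		// (3) Allowed, but the parent is doomed to be pushed and fail.
		return "allowed; parent doomed"
	default:
		// (4) The child passes through the parent's lock.
		return "pass-through"
	}
}

func main() {
	for _, p := range []durability{replicated, unreplicated} {
		for _, c := range []durability{replicated, unreplicated} {
			fmt.Printf("parent=%d child=%d -> %s\n", p, c, childOverParent(p, c))
		}
	}
}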
@@ -784,7 +834,7 @@ func (l *lockState) Format(buf *strings.Builder) {
 	txn, ts := l.getLockHolder()
 	if txn == nil {
 		fmt.Fprintf(buf, " res: req: %d, ", l.reservation.seqNum)
-		writeResInfo(buf, l.reservation.txn, l.reservation.writeTS)
+		writeResInfo(buf, l.reservation.txnMeta(), l.reservation.writeTS)
 	} else {
 		writeHolderInfo(buf, txn, ts)
 	}
@@ -849,7 +899,7 @@ func (l *lockState) informActiveWaiters() {
 		waitForState.txn = lockHolderTxn
 		waitForState.held = true
 	} else {
-		waitForState.txn = l.reservation.txn
+		waitForState.txn = l.reservation.txnMeta()
 		if !findDistinguished && l.distinguishedWaiter.isSameTxnAsReservation(waitForState) {
 			findDistinguished = true
 			l.distinguishedWaiter = nil
@@ -938,7 +988,7 @@ func (l *lockState) tryMakeNewDistinguished() {
 	} else if l.queuedWriters.Len() > 0 {
 		for e := l.queuedWriters.Front(); e != nil; e = e.Next() {
 			qg := e.Value.(*queuedGuard)
-			if qg.active && (l.reservation == nil || !qg.guard.isSameTxn(l.reservation.txn)) {
+			if qg.active && (l.reservation == nil || !qg.guard.isSameTxn(l.reservation.txnMeta())) {
 				g = qg.guard
 				break
 			}
@@ -1095,7 +1145,7 @@ func (l *lockState) tryActiveWait(g *lockTableGuardImpl, sa spanset.SpanAccess,
 			// non-transactional request. Ignore the reservation.
 			return false
 		}
-		waitForState.txn = l.reservation.txn
+		waitForState.txn = l.reservation.txnMeta()
 	}
 
 	// Incompatible with whoever is holding lock or reservation.
@@ -1779,7 +1829,7 @@ func (t *lockTableImpl) ScanAndEnqueue(req Request, guard lockTableGuard) lockTa
 	if guard == nil {
 		g = newLockTableGuardImpl()
 		g.seqNum = atomic.AddUint64(&t.seqNum, 1)
-		g.txn = req.txnMeta()
+		g.txn = req.Txn
 		g.spans = req.LockSpans
 		g.readTS = req.readConflictTimestamp()
 		g.writeTS = req.writeConflictTimestamp()
diff --git a/pkg/kv/kvserver/concurrency/lock_table_test.go b/pkg/kv/kvserver/concurrency/lock_table_test.go
index 87757d2efa59..a6c3ba0dd5d4 100644
--- a/pkg/kv/kvserver/concurrency/lock_table_test.go
+++ b/pkg/kv/kvserver/concurrency/lock_table_test.go
@@ -132,7 +132,7 @@ func TestLockTableBasic(t *testing.T) {
 	datadriven.Walk(t, "testdata/lock_table", func(t *testing.T, path string) {
 		var lt lockTable
-		var txnsByName map[string]*enginepb.TxnMeta
+		var txnsByName map[string]*roachpb.Transaction
 		var txnCounter uint128.Uint128
 		var requestsByName map[string]Request
 		var guardsByReqName map[string]lockTableGuard
@@ -146,11 +146,45 @@ func TestLockTableBasic(t *testing.T) {
 				enabledSeq: 1,
 				maxLocks:   int64(maxLocks),
 			}
-			txnsByName = make(map[string]*enginepb.TxnMeta)
+			txnsByName = make(map[string]*roachpb.Transaction)
 			txnCounter = uint128.FromInts(0, 0)
 			requestsByName = make(map[string]Request)
 			guardsByReqName = make(map[string]lockTableGuard)
 			return ""
 
+		case "new-child-txn":
+			var parentName string
+			d.ScanArgs(t, "parent", &parentName)
+			parent, ok := txnsByName[parentName]
+			if !ok {
+				d.Fatalf(t, "unknown parent transaction %q", parentName)
+			}
+			var txnName string
+			d.ScanArgs(t, "txn", &txnName)
+			ts := scanTimestamp(t, d)
+			var epoch int
+			d.ScanArgs(t, "epoch", &epoch)
+			var seq int
+			if d.HasArg("seq") {
+				d.ScanArgs(t, "seq", &seq)
+			}
+			txn, ok := txnsByName[txnName]
+			var id uuid.UUID
+			if ok {
+				id = txn.ID
+			} else {
+				id = nextUUID(&txnCounter)
+			}
+			txnsByName[txnName] = &roachpb.Transaction{
+				TxnMeta: enginepb.TxnMeta{
+					ID:             id,
+					Epoch:          enginepb.TxnEpoch(epoch),
+					Sequence:       enginepb.TxnSeq(seq),
+					WriteTimestamp: ts,
+				},
+				ReadTimestamp: ts,
+				Parent:        parent.Clone(),
+			}
+			return ""
+
 		case "new-txn":
 			// UUIDs for transactions are numbered from 1 by this test code and
@@ -173,11 +207,14 @@ func TestLockTableBasic(t *testing.T) {
 			} else {
 				id = nextUUID(&txnCounter)
 			}
-			txnsByName[txnName] = &enginepb.TxnMeta{
-				ID:             id,
-				Epoch:          enginepb.TxnEpoch(epoch),
-				Sequence:       enginepb.TxnSeq(seq),
-				WriteTimestamp: ts,
+			txnsByName[txnName] = &roachpb.Transaction{
+				TxnMeta: enginepb.TxnMeta{
+					ID:             id,
+					Epoch:          enginepb.TxnEpoch(epoch),
+					Sequence:       enginepb.TxnSeq(seq),
+					WriteTimestamp: ts,
+				},
+				ReadTimestamp: ts,
 			}
 			return ""
 
@@ -208,10 +245,8 @@ func TestLockTableBasic(t *testing.T) {
 				// Update the transaction's timestamp, if necessary. The transaction
 				// may have needed to move its timestamp for any number of reasons.
 				txnMeta.WriteTimestamp = ts
-				req.Txn = &roachpb.Transaction{
-					TxnMeta:       *txnMeta,
-					ReadTimestamp: ts,
-				}
+				req.Txn = txnMeta.Clone()
+				req.Txn.ReadTimestamp = ts
 			}
 			requestsByName[reqName] = req
 			return ""
@@ -254,7 +289,7 @@ func TestLockTableBasic(t *testing.T) {
 		case "release":
 			var txnName string
 			d.ScanArgs(t, "txn", &txnName)
-			txnMeta, ok := txnsByName[txnName]
+			txn, ok := txnsByName[txnName]
 			if !ok {
 				d.Fatalf(t, "unknown txn %s", txnName)
 			}
@@ -262,7 +297,7 @@
 			d.ScanArgs(t, "span", &s)
 			span := getSpan(t, d, s)
 			// TODO(sbhola): also test ABORTED.
-			intent := &roachpb.LockUpdate{Span: span, Txn: *txnMeta, Status: roachpb.COMMITTED}
+			intent := &roachpb.LockUpdate{Span: span, Txn: txn.TxnMeta, Status: roachpb.COMMITTED}
 			if err := lt.UpdateLocks(intent); err != nil {
 				return err.Error()
 			}
@@ -271,17 +306,23 @@
 		case "update":
 			var txnName string
 			d.ScanArgs(t, "txn", &txnName)
-			txnMeta, ok := txnsByName[txnName]
+			txn, ok := txnsByName[txnName]
 			if !ok {
 				d.Fatalf(t, "unknown txn %s", txnName)
 			}
 			ts := scanTimestamp(t, d)
 			var epoch int
 			d.ScanArgs(t, "epoch", &epoch)
-			txnMeta = &enginepb.TxnMeta{ID: txnMeta.ID, Sequence: txnMeta.Sequence}
-			txnMeta.Epoch = enginepb.TxnEpoch(epoch)
-			txnMeta.WriteTimestamp = ts
-			txnsByName[txnName] = txnMeta
+			txn = &roachpb.Transaction{
+				TxnMeta: enginepb.TxnMeta{
+					ID:             txn.ID,
+					Sequence:       txn.Sequence,
+					WriteTimestamp: ts,
+					Epoch:          enginepb.TxnEpoch(epoch),
+				},
+				ReadTimestamp: ts,
+			}
+			txnsByName[txnName] = txn
 			var s string
 			d.ScanArgs(t, "span", &s)
 			span := getSpan(t, d, s)
@@ -313,7 +354,7 @@ func TestLockTableBasic(t *testing.T) {
 			}
 			// TODO(sbhola): also test STAGING.
 			intent := &roachpb.LockUpdate{
-				Span: span, Txn: *txnMeta, Status: roachpb.PENDING, IgnoredSeqNums: ignored}
+				Span: span, Txn: txn.TxnMeta, Status: roachpb.PENDING, IgnoredSeqNums: ignored}
 			if err := lt.UpdateLocks(intent); err != nil {
 				return err.Error()
 			}
@@ -334,7 +375,7 @@ func TestLockTableBasic(t *testing.T) {
 			if !ok {
 				d.Fatalf(t, "unknown txn %s", txnName)
 			}
-			intent := roachpb.MakeIntent(txnMeta, roachpb.Key(key))
+			intent := roachpb.MakeIntent(&txnMeta.TxnMeta, roachpb.Key(key))
 			seq := int(1)
 			if d.HasArg("lease-seq") {
 				d.ScanArgs(t, "lease-seq", &seq)
diff --git a/pkg/kv/kvserver/concurrency/testdata/lock_table/child_txn b/pkg/kv/kvserver/concurrency/testdata/lock_table/child_txn
new file mode 100644
index 000000000000..f95fbb5448b8
--- /dev/null
+++ b/pkg/kv/kvserver/concurrency/testdata/lock_table/child_txn
@@ -0,0 +1,90 @@
+# Test that we do not block on a parent's intent.
+# Test that we do not block on a parent's read lock.
+# TODO(ajwerner): We need to test some of the more pernicious issues.
+# In particular, if there's a write-write conflict or an attempt by a child
+# to acquire a write lock over an existing read lock of the parent, then we
+# need to return an error that will end up being an assertion failure.
+
+new-lock-table maxlocks=10000
+----
+
+new-txn txn=txn1 ts=10,1 epoch=0
+----
+
+new-child-txn parent=txn1 txn=txn2 ts=10,1 epoch=0
+----
+
+new-request r=req1 txn=txn1 ts=10,1 spans=r@a,b+w@c,f
+----
+
+new-request r=req2 txn=txn2 ts=10,2 spans=r@a,b+w@c,f
+----
+
+scan r=req1
+----
+start-waiting: false
+
+guard-state r=req1
+----
+new: state=doneWaiting
+
+# Acquire the lock on c with both replicated and unreplicated durability, to
+# exercise corner cases, since uncontended replicated locks are not tracked
+# by the lockTable.
+acquire r=req1 k=c durability=r
+----
+global: num=0
+local: num=0
+
+acquire r=req1 k=c durability=u
+----
+global: num=1
+ lock: "c"
+  holder: txn: 00000000-0000-0000-0000-000000000001, ts: 0.000000010,1, info: unrepl epoch: 0, seqs: [0]
+local: num=0
+
+acquire r=req1 k=e durability=u
+----
+global: num=2
+ lock: "c"
+  holder: txn: 00000000-0000-0000-0000-000000000001, ts: 0.000000010,1, info: unrepl epoch: 0, seqs: [0]
+ lock: "e"
+  holder: txn: 00000000-0000-0000-0000-000000000001, ts: 0.000000010,1, info: unrepl epoch: 0, seqs: [0]
+local: num=0
+
+dequeue r=req1
+----
+global: num=2
+ lock: "c"
+  holder: txn: 00000000-0000-0000-0000-000000000001, ts: 0.000000010,1, info: unrepl epoch: 0, seqs: [0]
+ lock: "e"
+  holder: txn: 00000000-0000-0000-0000-000000000001, ts: 0.000000010,1, info: unrepl epoch: 0, seqs: [0]
+local: num=0
+
+# req2 is for txn2, a child of txn1, and will not wait for locks held by its
+# ancestor.
+
+scan r=req2
+----
+start-waiting: false
+
+acquire r=req2 k=b durability=u
+----
+global: num=3
+ lock: "b"
+  holder: txn: 00000000-0000-0000-0000-000000000002, ts: 0.000000010,2, info: unrepl epoch: 0, seqs: [0]
+ lock: "c"
+  holder: txn: 00000000-0000-0000-0000-000000000001, ts: 0.000000010,1, info: unrepl epoch: 0, seqs: [0]
+ lock: "e"
+  holder: txn: 00000000-0000-0000-0000-000000000001, ts: 0.000000010,1, info: unrepl epoch: 0, seqs: [0]
+local: num=0
+
+dequeue r=req2
+----
+global: num=3
+ lock: "b"
+  holder: txn: 00000000-0000-0000-0000-000000000002, ts: 0.000000010,2, info: unrepl epoch: 0, seqs: [0]
+ lock: "c"
+  holder: txn: 00000000-0000-0000-0000-000000000001, ts: 0.000000010,1, info: unrepl epoch: 0, seqs: [0]
+ lock: "e"
+  holder: txn: 00000000-0000-0000-0000-000000000001, ts: 0.000000010,1, info: unrepl epoch: 0, seqs: [0]
+local: num=0
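The pass-through behavior this testdata file exercises can be modeled in a few lines. Below is a toy Go sketch; the txn, lockTable, and mustWait names are invented stand-ins for the real lock table. It shows a scan on behalf of a transaction skipping locks held by itself or by any ancestor, which is exactly the walk isSameTxn performs above.

package main

import "fmt"

// txn is a simplified stand-in for a transaction with an ancestor chain.
type txn struct {
	id     string
	parent *txn
}

// lockTable maps a key to the transaction holding a lock on it.
type lockTable map[string]*txn

// mustWait reports whether a request on behalf of t must wait on the holder
// of key. Locks held by t itself or by any of its ancestors are passed
// through, mirroring isSameTxn's ancestor walk.
func (lt lockTable) mustWait(t *txn, key string) bool {
	holder, ok := lt[key]
	if !ok {
		return false
	}
	for cur := t; cur != nil; cur = cur.parent {
		if cur == holder {
			return false
		}
	}
	return true
}

func main() {
	txn1 := &txn{id: "txn1"}
	txn2 := &txn{id: "txn2", parent: txn1} // child of txn1, like req2 above
	txn3 := &txn{id: "txn3"}               // unrelated transaction

	lt := lockTable{"c": txn1, "e": txn1}

	fmt.Println(lt.mustWait(txn2, "c")) // false: held by an ancestor
	fmt.Println(lt.mustWait(txn3, "c")) // true: held by an unrelated txn
	fmt.Println(lt.mustWait(txn2, "x")) // false: no lock on x
}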