Skip to content

Commit

Permalink
kvserver: support child transactions in the lock table
Browse files Browse the repository at this point in the history
This commit adds support for child transactions to the lockTable and
deals with unexpected WriteIntentErrors due to write-write conflicts
between the parent and child transaction.

Release note: None
  • Loading branch information
ajwerner committed Dec 1, 2020
1 parent e269861 commit 30037f1
Show file tree
Hide file tree
Showing 4 changed files with 229 additions and 35 deletions.
29 changes: 21 additions & 8 deletions pkg/kv/kvserver/concurrency/concurrency_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
)

// managerImpl implements the Manager interface.
Expand Down Expand Up @@ -256,12 +256,23 @@ func (m *managerImpl) HandleWriterIntentError(
// wait-queue. If the lock-table is disabled and one or more of the intents
// are ignored then we immediately wait on all intents.
wait := false
// While iterating the intents, check whether any of them belongs to an
// ancestor transaction. Such a conflict is illegal and must be reported.
var parentIntentError error
for i := range t.Intents {
intent := &t.Intents[i]
added, err := m.lt.AddDiscoveredLock(intent, seq, g.ltg)
if err != nil {
log.Fatalf(ctx, "%v", err)
}
for cur := g.Req.Txn; cur != nil && parentIntentError == nil; cur = cur.Parent {
if cur.ID == intent.Txn.ID {
parentIntentError = errors.AssertionFailedf(
"child transaction %s attempted to write over parent %s at key %s",
g.Req.Txn.Short(), cur.Short(), intent.Key)
break
}
}
if !added {
wait = true
}
Expand All @@ -273,6 +284,15 @@ func (m *managerImpl) HandleWriterIntentError(
// Guard. This is analogous to iterating through the loop in SequenceReq.
m.lm.Release(g.moveLatchGuard())

// Check whether a discovered write intent belongs to an ancestor transaction.
// Such a conflict is illegal, so surface it to the caller as an error.
//
// TODO(ajwerner): Consider having this return a specific roachpb.Error that
// will be converted to an assertion failure on the client.
if parentIntentError != nil {
return nil, roachpb.NewError(parentIntentError)
}

// If the lockTable was disabled then we need to immediately wait on the
// intents to ensure that they are resolved and moved out of the request's
// way.
Expand Down Expand Up @@ -403,13 +423,6 @@ func (m *managerImpl) TxnWaitQueue() *txnwait.Queue {
return m.twq.(*txnwait.Queue)
}

// txnMeta returns a pointer to the TxnMeta embedded in the request's
// transaction, or nil when the request carries no transaction.
func (r *Request) txnMeta() *enginepb.TxnMeta {
	if txn := r.Txn; txn != nil {
		return &txn.TxnMeta
	}
	return nil
}

// readConflictTimestamp returns the maximum timestamp at which the request
// conflicts with locks acquired by other transaction. The request must wait
// for all locks acquired by other transactions at or below this timestamp
Expand Down
64 changes: 57 additions & 7 deletions pkg/kv/kvserver/concurrency/lock_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ type lockTableGuardImpl struct {
seqNum uint64

// Information about this request.
txn *enginepb.TxnMeta
txn *roachpb.Transaction
spans *spanset.SpanSet
readTS hlc.Timestamp
writeTS hlc.Timestamp
Expand Down Expand Up @@ -421,8 +421,51 @@ func (g *lockTableGuardImpl) doneWaitingAtLock(hasReservation bool, l *lockState
g.mu.Unlock()
}

// isSameTxn returns true if txn corresponds to the txn in g or to any of its
// ancestors. We allow requests due to children to pass through the lock table
// without observing any locks held by the parent. This is seemingly hazardous
// in the scenario that the child is attempting to write (acquire a replicated
// lock) or to acquire an unreplicated lock. However, each of the cases ends up
// working out either due to logic at a different level or the guarantees of
// the system.
//
//  case | parent | child
// ------+--------+-------
//  (1)  |   r    |   r
//  (2)  |   r    |   u
//  (3)  |   u    |   r
//  (4)  |   u    |   u
//
// (1) Parent holds a replicated exclusive lock and the child is trying to
//     write. This case is handled below the lock table by detecting the
//     WriteIntentError for a parent and converting it to an assertion failure.
//     Child transactions should *never* write over any of their ancestor's
//     intents.
//
// (2) Parent holds a replicated exclusive lock and the child is trying to
//     acquire an unreplicated read lock. In this case we'll end up no-oping
//     because we'll believe that there already is a lock. This case is okay.
//
// (3) Parent holds an unreplicated lock and the child writes. When the child
//     commits, it will invalidate the parent's read. Children should not be
//     writing over the parent's read set. Child transactions push their
//     parents above their commit timestamp and the timestamp cache will push
//     the child above the parent's read timestamp so the parent is doomed
//     to fail. That being said, this is safe from a correctness perspective.
//     A more informative error might be better but it's hard to plumb that
//     through.
//
// (4) Parent holds an unreplicated lock and the child would like to acquire
//     one. This case ends up just working as we'll pass through the parent's
//     lock and not acquire anything.
func (g *lockTableGuardImpl) isSameTxn(txn *enginepb.TxnMeta) bool {
	// Walk from the guard's own transaction up through its chain of parents,
	// comparing IDs at each level. A nil g.txn never enters the loop, so the
	// result is false in that case.
	for cur := g.txn; cur != nil; cur = cur.Parent {
		if cur.ID == txn.ID {
			return true
		}
	}
	return false
}

func (g *lockTableGuardImpl) isSameTxnAsReservation(ws waitingState) bool {
Expand Down Expand Up @@ -483,6 +526,13 @@ func (g *lockTableGuardImpl) findNextLockAfter(notify bool) {
}
}

// txnMeta returns a pointer to the TxnMeta embedded in the guard's
// transaction, or nil when the guard has no transaction.
func (g *lockTableGuardImpl) txnMeta() *enginepb.TxnMeta {
	if txn := g.txn; txn != nil {
		return &txn.TxnMeta
	}
	return nil
}

// Waiting writers in a lockState are wrapped in a queuedGuard. A waiting
// writer is typically waiting in an active state, i.e., the
// lockTableGuardImpl.key refers to this lockState. However, breaking of
Expand Down Expand Up @@ -784,7 +834,7 @@ func (l *lockState) Format(buf *strings.Builder) {
txn, ts := l.getLockHolder()
if txn == nil {
fmt.Fprintf(buf, " res: req: %d, ", l.reservation.seqNum)
writeResInfo(buf, l.reservation.txn, l.reservation.writeTS)
writeResInfo(buf, l.reservation.txnMeta(), l.reservation.writeTS)
} else {
writeHolderInfo(buf, txn, ts)
}
Expand Down Expand Up @@ -849,7 +899,7 @@ func (l *lockState) informActiveWaiters() {
waitForState.txn = lockHolderTxn
waitForState.held = true
} else {
waitForState.txn = l.reservation.txn
waitForState.txn = l.reservation.txnMeta()
if !findDistinguished && l.distinguishedWaiter.isSameTxnAsReservation(waitForState) {
findDistinguished = true
l.distinguishedWaiter = nil
Expand Down Expand Up @@ -938,7 +988,7 @@ func (l *lockState) tryMakeNewDistinguished() {
} else if l.queuedWriters.Len() > 0 {
for e := l.queuedWriters.Front(); e != nil; e = e.Next() {
qg := e.Value.(*queuedGuard)
if qg.active && (l.reservation == nil || !qg.guard.isSameTxn(l.reservation.txn)) {
if qg.active && (l.reservation == nil || !qg.guard.isSameTxn(l.reservation.txnMeta())) {
g = qg.guard
break
}
Expand Down Expand Up @@ -1095,7 +1145,7 @@ func (l *lockState) tryActiveWait(g *lockTableGuardImpl, sa spanset.SpanAccess,
// non-transactional request. Ignore the reservation.
return false
}
waitForState.txn = l.reservation.txn
waitForState.txn = l.reservation.txnMeta()
}

// Incompatible with whoever is holding lock or reservation.
Expand Down Expand Up @@ -1779,7 +1829,7 @@ func (t *lockTableImpl) ScanAndEnqueue(req Request, guard lockTableGuard) lockTa
if guard == nil {
g = newLockTableGuardImpl()
g.seqNum = atomic.AddUint64(&t.seqNum, 1)
g.txn = req.txnMeta()
g.txn = req.Txn
g.spans = req.LockSpans
g.readTS = req.readConflictTimestamp()
g.writeTS = req.writeConflictTimestamp()
Expand Down
81 changes: 61 additions & 20 deletions pkg/kv/kvserver/concurrency/lock_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func TestLockTableBasic(t *testing.T) {

datadriven.Walk(t, "testdata/lock_table", func(t *testing.T, path string) {
var lt lockTable
var txnsByName map[string]*enginepb.TxnMeta
var txnsByName map[string]*roachpb.Transaction
var txnCounter uint128.Uint128
var requestsByName map[string]Request
var guardsByReqName map[string]lockTableGuard
Expand All @@ -146,11 +146,45 @@ func TestLockTableBasic(t *testing.T) {
enabledSeq: 1,
maxLocks: int64(maxLocks),
}
txnsByName = make(map[string]*enginepb.TxnMeta)
txnsByName = make(map[string]*roachpb.Transaction)
txnCounter = uint128.FromInts(0, 0)
requestsByName = make(map[string]Request)
guardsByReqName = make(map[string]lockTableGuard)
return ""
case "new-child-txn":
var parentName string
d.ScanArgs(t, "parent", &parentName)
parent, ok := txnsByName[parentName]
if !ok {
d.Fatalf(t, "unknown parent transaction %q", parentName)
}
var txnName string
d.ScanArgs(t, "txn", &txnName)
ts := scanTimestamp(t, d)
var epoch int
d.ScanArgs(t, "epoch", &epoch)
var seq int
if d.HasArg("seq") {
d.ScanArgs(t, "seq", &seq)
}
txnMeta, ok := txnsByName[txnName]
var id uuid.UUID
if ok {
id = txnMeta.ID
} else {
id = nextUUID(&txnCounter)
}
txnsByName[txnName] = &roachpb.Transaction{
TxnMeta: enginepb.TxnMeta{
ID: id,
Epoch: enginepb.TxnEpoch(epoch),
Sequence: enginepb.TxnSeq(seq),
WriteTimestamp: ts,
},
ReadTimestamp: ts,
Parent: parent.Clone(),
}
return ""

case "new-txn":
// UUIDs for transactions are numbered from 1 by this test code and
Expand All @@ -173,11 +207,14 @@ func TestLockTableBasic(t *testing.T) {
} else {
id = nextUUID(&txnCounter)
}
txnsByName[txnName] = &enginepb.TxnMeta{
ID: id,
Epoch: enginepb.TxnEpoch(epoch),
Sequence: enginepb.TxnSeq(seq),
WriteTimestamp: ts,
txnsByName[txnName] = &roachpb.Transaction{
TxnMeta: enginepb.TxnMeta{
ID: id,
Epoch: enginepb.TxnEpoch(epoch),
Sequence: enginepb.TxnSeq(seq),
WriteTimestamp: ts,
},
ReadTimestamp: ts,
}
return ""

Expand Down Expand Up @@ -208,10 +245,8 @@ func TestLockTableBasic(t *testing.T) {
// Update the transaction's timestamp, if necessary. The transaction
// may have needed to move its timestamp for any number of reasons.
txnMeta.WriteTimestamp = ts
req.Txn = &roachpb.Transaction{
TxnMeta: *txnMeta,
ReadTimestamp: ts,
}
req.Txn = txnMeta.Clone()
req.Txn.ReadTimestamp = ts
}
requestsByName[reqName] = req
return ""
Expand Down Expand Up @@ -254,15 +289,15 @@ func TestLockTableBasic(t *testing.T) {
case "release":
var txnName string
d.ScanArgs(t, "txn", &txnName)
txnMeta, ok := txnsByName[txnName]
txn, ok := txnsByName[txnName]
if !ok {
d.Fatalf(t, "unknown txn %s", txnName)
}
var s string
d.ScanArgs(t, "span", &s)
span := getSpan(t, d, s)
// TODO(sbhola): also test ABORTED.
intent := &roachpb.LockUpdate{Span: span, Txn: *txnMeta, Status: roachpb.COMMITTED}
intent := &roachpb.LockUpdate{Span: span, Txn: txn.TxnMeta, Status: roachpb.COMMITTED}
if err := lt.UpdateLocks(intent); err != nil {
return err.Error()
}
Expand All @@ -271,17 +306,23 @@ func TestLockTableBasic(t *testing.T) {
case "update":
var txnName string
d.ScanArgs(t, "txn", &txnName)
txnMeta, ok := txnsByName[txnName]
txn, ok := txnsByName[txnName]
if !ok {
d.Fatalf(t, "unknown txn %s", txnName)
}
ts := scanTimestamp(t, d)
var epoch int
d.ScanArgs(t, "epoch", &epoch)
txnMeta = &enginepb.TxnMeta{ID: txnMeta.ID, Sequence: txnMeta.Sequence}
txnMeta.Epoch = enginepb.TxnEpoch(epoch)
txnMeta.WriteTimestamp = ts
txnsByName[txnName] = txnMeta
txn = &roachpb.Transaction{
TxnMeta: enginepb.TxnMeta{
ID: txn.ID,
Sequence: txn.Sequence,
WriteTimestamp: ts,
Epoch: enginepb.TxnEpoch(epoch),
},
ReadTimestamp: ts,
}
txnsByName[txnName] = txn
var s string
d.ScanArgs(t, "span", &s)
span := getSpan(t, d, s)
Expand Down Expand Up @@ -313,7 +354,7 @@ func TestLockTableBasic(t *testing.T) {
}
// TODO(sbhola): also test STAGING.
intent := &roachpb.LockUpdate{
Span: span, Txn: *txnMeta, Status: roachpb.PENDING, IgnoredSeqNums: ignored}
Span: span, Txn: txn.TxnMeta, Status: roachpb.PENDING, IgnoredSeqNums: ignored}
if err := lt.UpdateLocks(intent); err != nil {
return err.Error()
}
Expand All @@ -334,7 +375,7 @@ func TestLockTableBasic(t *testing.T) {
if !ok {
d.Fatalf(t, "unknown txn %s", txnName)
}
intent := roachpb.MakeIntent(txnMeta, roachpb.Key(key))
intent := roachpb.MakeIntent(&txnMeta.TxnMeta, roachpb.Key(key))
seq := int(1)
if d.HasArg("lease-seq") {
d.ScanArgs(t, "lease-seq", &seq)
Expand Down
Loading

0 comments on commit 30037f1

Please sign in to comment.