kv/concurrency: never regress timestamp in lockTable on acquisition
Fixes #46290.
Fixes #43735.
Fixes #41425.

This change adjusts the lockTable to be more lenient about the
timestamps of new lock acquisitions. Specifically, it lifts the
restriction that all calls to AcquireLock use monotonically increasing
timestamps. Instead, it handles apparent timestamp regressions by
ratcheting lock timestamps forward rather than replacing them outright.

This matches the corresponding behavior in MVCC:
  https://github.com/cockroachdb/cockroach/blob/92107b551bbafe54fddb496442c590cb6feb5d65/pkg/storage/mvcc.go#L1631

This leniency is needed for sequences of events like the following:
- txn A acquires lock at epoch 1, ts 10
- txn B pushes txn A to ts 20
- txn B updates lock to ts 20
- txn A restarts at ts 15 without noticing that it has been pushed
- txn A re-acquires lock at epoch 2, ts 15
- we hit the lock timestamp regression assertion

We see this frequently in CDC roachtests because the rangefeed
processor performs high-priority timestamp pushes on long-running
transactions. Outside of CDC, this is rare in our system.
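
To illustrate the ratcheting rule, here is a minimal, self-contained Go
sketch. The Timestamp type below is a simplified stand-in for an HLC
timestamp, not the real hlc or lockTable code:

  package main

  import "fmt"

  // Timestamp is a simplified stand-in for an HLC timestamp.
  type Timestamp struct{ Wall int64 }

  func (t Timestamp) Less(o Timestamp) bool { return t.Wall < o.Wall }

  // Forward ratchets t up to o if o is newer; it never regresses t.
  func (t *Timestamp) Forward(o Timestamp) {
      if t.Less(o) {
          *t = o
      }
  }

  func main() {
      lockTS := Timestamp{Wall: 10}       // txn A acquires lock at ts 10
      lockTS.Forward(Timestamp{Wall: 20}) // txn B pushes the lock to ts 20
      lockTS.Forward(Timestamp{Wall: 15}) // txn A re-acquires at ts 15: no regression
      fmt.Println(lockTS)                 // {20}
  }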

Release note (bug fix): CDC no longer combines with long-running
transactions to trigger an assertion failure.

Release justification: fixes a high-priority bug in existing
functionality. The bug could crash a server if the right sequence
of events occurred. This was typically rare, but was much more
common when CDC was in use.
nvanbenschoten authored and dt committed Mar 23, 2020
1 parent 69b878a commit fcd74cd
Showing 5 changed files with 343 additions and 22 deletions.
16 changes: 13 additions & 3 deletions pkg/kv/kvserver/concurrency/concurrency_manager_test.go
@@ -60,7 +60,7 @@ import (
// handle-write-intent-error req=<req-name> txn=<txn-name> key=<key>
// handle-txn-push-error req=<req-name> txn=<txn-name> key=<key> TODO(nvanbenschoten): implement this
//
// on-lock-acquired req=<req-name> key=<key>
// on-lock-acquired req=<req-name> key=<key> [seq=<seq>]
// on-lock-updated req=<req-name> txn=<txn-name> key=<key> status=[committed|aborted|pending] [ts=<int>[,<int>]]
// on-txn-updated txn=<txn-name> status=[committed|aborted|pending] [ts=<int>[,<int>]]
//
@@ -270,13 +270,20 @@ func TestConcurrencyManagerBasic(t *testing.T) {
var key string
d.ScanArgs(t, "key", &key)

var seq int
if d.HasArg("seq") {
d.ScanArgs(t, "seq", &seq)
}
seqNum := enginepb.TxnSeq(seq)

// Confirm that the request has a corresponding write request.
found := false
for _, ru := range guard.Req.Requests {
req := ru.GetInner()
keySpan := roachpb.Span{Key: roachpb.Key(key)}
if roachpb.IsLocking(req) &&
req.Header().Span().Contains(keySpan) {
req.Header().Span().Contains(keySpan) &&
req.Header().Sequence == seqNum {
found = true
break
}
@@ -285,10 +292,13 @@ func TestConcurrencyManagerBasic(t *testing.T) {
d.Fatalf(t, "missing corresponding write request")
}

txnAcquire := txn.Clone()
txnAcquire.Sequence = seqNum

mon.runSync("acquire lock", func(ctx context.Context) {
log.Eventf(ctx, "txn %s @ %s", txn.ID.Short(), key)
span := roachpb.Span{Key: roachpb.Key(key)}
up := roachpb.MakeLockUpdateWithDur(txn, span, lock.Unreplicated)
up := roachpb.MakeLockUpdateWithDur(txnAcquire, span, lock.Unreplicated)
m.OnLockAcquired(ctx, &up)
})
return c.waitAndCollect(t, mon)
15 changes: 15 additions & 0 deletions pkg/kv/kvserver/concurrency/datadriven_util_test.go
@@ -16,6 +16,7 @@ import (
"testing"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/uint128"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
@@ -81,15 +82,28 @@ func scanSingleRequest(
}
return v
}
maybeGetSeq := func() enginepb.TxnSeq {
s, ok := fields["seq"]
if !ok {
return 0
}
n, err := strconv.ParseInt(s, 10, 64)
if err != nil {
d.Fatalf(t, "could not parse seq num: %v", err)
}
return enginepb.TxnSeq(n)
}

switch cmd {
case "get":
var r roachpb.GetRequest
r.Sequence = maybeGetSeq()
r.Key = roachpb.Key(mustGetField("key"))
return &r

case "scan":
var r roachpb.ScanRequest
r.Sequence = maybeGetSeq()
r.Key = roachpb.Key(mustGetField("key"))
if v, ok := fields["endkey"]; ok {
r.EndKey = roachpb.Key(v)
@@ -98,6 +112,7 @@

case "put":
var r roachpb.PutRequest
r.Sequence = maybeGetSeq()
r.Key = roachpb.Key(mustGetField("key"))
r.Value.SetString(mustGetField("value"))
return &r
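
For reference, a self-contained sketch of the seq-parsing fallback
above: an absent seq field maps to sequence 0, a present one is parsed
into the request's header. TxnSeq here is a local stand-in for
enginepb.TxnSeq, and the datadriven harness plumbing is omitted.

  package main

  import (
      "fmt"
      "strconv"
  )

  // TxnSeq stands in for enginepb.TxnSeq in this sketch.
  type TxnSeq int32

  func maybeGetSeq(fields map[string]string) TxnSeq {
      s, ok := fields["seq"]
      if !ok {
          return 0 // default: requests without seq= use sequence 0
      }
      n, err := strconv.ParseInt(s, 10, 64)
      if err != nil {
          panic(err)
      }
      return TxnSeq(n)
  }

  func main() {
      fmt.Println(maybeGetSeq(map[string]string{"key": "k"}))             // 0
      fmt.Println(maybeGetSeq(map[string]string{"key": "k", "seq": "1"})) // 1
  }
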
28 changes: 23 additions & 5 deletions pkg/kv/kvserver/concurrency/lock_table.go
@@ -1140,11 +1140,11 @@ func (l *lockState) acquireLock(
if txn.ID != beforeTxn.ID {
return errors.Errorf("caller violated contract: existing lock cannot be acquired by different transaction")
}
seqs := l.holder.holder[durability].seqs
if l.holder.holder[durability].txn != nil && l.holder.holder[durability].txn.Epoch < txn.Epoch {
// Clear the sequences for the older epoch.
l.holder.holder[durability].seqs = l.holder.holder[durability].seqs[:0]
seqs = seqs[:0]
}
seqs := l.holder.holder[durability].seqs
if len(seqs) > 0 && seqs[len(seqs)-1] >= txn.Sequence {
// Idempotent lock acquisition. In this case, we simply ignore the lock
// acquisition as long as it corresponds to an existing sequence number.
@@ -1164,11 +1164,29 @@
return nil
}
l.holder.holder[durability].txn = txn
l.holder.holder[durability].ts = ts
l.holder.holder[durability].seqs = append(l.holder.holder[durability].seqs, txn.Sequence)
// Forward the lock's timestamp instead of assigning to it blindly.
// While lock acquisition uses monotonically increasing timestamps
// from the perspective of the transaction's coordinator, this does
// not guarantee that a lock will never be acquired at a higher
// epoch and/or sequence number but with a lower timestamp when in
// the presence of transaction pushes. Consider the following
// sequence of events:
//
// - txn A acquires lock at sequence 1, ts 10
// - txn B pushes txn A to ts 20
// - txn B updates lock to ts 20
// - txn A's coordinator does not immediately learn of the push
// - txn A re-acquires lock at sequence 2, ts 15
//
// A lock's timestamp cannot be allowed to regress, so by forwarding
// its timestamp during the second acquisition instead of assigning
// to it blindly, it remains at 20.
l.holder.holder[durability].ts.Forward(ts)
l.holder.holder[durability].seqs = append(seqs, txn.Sequence)

_, afterTs, _ := l.getLockerInfo()
if afterTs.Less(beforeTs) {
return errors.Errorf("caller violated contract: lock timestamp regression")
panic("lockTable bug - lock timestamp regression")
} else if beforeTs.Less(afterTs) {
l.increasedLockTs(afterTs)
}
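
To summarize the new acquisition behavior, here is a simplified,
self-contained Go sketch. heldLock below collapses the real lockState's
holder, epoch, and durability bookkeeping into one struct and is not
the actual implementation; it only shows the three rules the diff
encodes: an epoch bump clears the tracked sequence numbers, a replayed
sequence number is treated as an idempotent re-acquisition, and the
lock's timestamp is forwarded rather than overwritten.

  package main

  import "fmt"

  // ts is a simplified stand-in for an HLC timestamp.
  type ts struct{ wall int64 }

  // forward ratchets t up to o; it never moves t backwards.
  func (t *ts) forward(o ts) {
      if t.wall < o.wall {
          *t = o
      }
  }

  // heldLock collapses the lockState holder bookkeeping into one struct.
  type heldLock struct {
      epoch uint32
      t     ts
      seqs  []int32
  }

  func (l *heldLock) acquire(epoch uint32, seq int32, acqTS ts) {
      if epoch > l.epoch {
          l.epoch = epoch
          l.seqs = l.seqs[:0] // a new epoch invalidates earlier sequence numbers
      }
      if n := len(l.seqs); n > 0 && l.seqs[n-1] >= seq {
          return // idempotent re-acquisition at an existing sequence number
      }
      l.t.forward(acqTS) // never regress the lock's timestamp
      l.seqs = append(l.seqs, seq)
  }

  func main() {
      l := heldLock{epoch: 0, t: ts{wall: 10}, seqs: []int32{0}} // acquired at epoch 0, seq 0, ts 10
      l.t.forward(ts{wall: 20})                                  // pushed to ts 20 by a reader
      l.acquire(1, 0, ts{wall: 15})                              // re-acquired at epoch 1, lower ts
      fmt.Println(l.t.wall, l.seqs)                              // 20 [0]
  }
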
240 changes: 240 additions & 0 deletions pkg/kv/kvserver/concurrency/testdata/concurrency_manager/update
@@ -0,0 +1,240 @@
# -------------------------------------------------------------
# A transaction writes an intent. The intent is pushed to a
# higher timestamp by a second transaction. The transaction then
# returns to re-acquire the intent at a new sequence number but
# still at the original timestamp. This is permitted but the
# lock's timestamp should not regress.
#
# Setup: txn1 acquire lock k
# txn2 reads k and waits
# txn2 pushes txn1
#
# Test: txn2 succeeds in pushing txn1's ts forward
# txn2 proceeds
# txn1 re-acquires lock k at new seq num, lower ts
# -------------------------------------------------------------

new-txn name=txn1 ts=10,1 epoch=0
----

new-txn name=txn2 ts=12,1 epoch=0
----

new-request name=req1 txn=txn1 ts=10,1
put key=k value=v
----

new-request name=req2 txn=txn2 ts=12,1
get key=k
----

sequence req=req1
----
[1] sequence req1: sequencing request
[1] sequence req1: acquiring latches
[1] sequence req1: scanning lock table for conflicting locks
[1] sequence req1: sequencing complete, returned guard

on-lock-acquired req=req1 key=k
----
[-] acquire lock: txn 00000001 @ k

finish req=req1
----
[-] finish req1: finishing request

sequence req=req2
----
[2] sequence req2: sequencing request
[2] sequence req2: acquiring latches
[2] sequence req2: scanning lock table for conflicting locks
[2] sequence req2: waiting in lock wait-queues
[2] sequence req2: pushing timestamp of txn 00000001 above 0.000000012,1
[2] sequence req2: blocked on select in concurrency_test.(*cluster).PushTransaction

debug-lock-table
----
global: num=1
lock: "k"
holder: txn: 00000001-0000-0000-0000-000000000000, ts: 0.000000010,1, info: unrepl epoch: 0, seqs: [0]
waiting readers:
req: 2, txn: 00000002-0000-0000-0000-000000000000
distinguished req: 2
local: num=0

# --------------------------------
# Setup complete, test starts here
# --------------------------------

on-txn-updated txn=txn1 status=pending ts=12,2
----
[-] update txn: increasing timestamp of txn1
[2] sequence req2: resolving intent "k" for txn 00000001 with PENDING status
[2] sequence req2: acquiring latches
[2] sequence req2: scanning lock table for conflicting locks
[2] sequence req2: sequencing complete, returned guard

finish req=req2
----
[-] finish req2: finishing request

debug-lock-table
----
global: num=1
lock: "k"
holder: txn: 00000001-0000-0000-0000-000000000000, ts: 0.000000012,2, info: unrepl epoch: 0, seqs: [0]
local: num=0

# Issue another write to the same key for txn1 at its initial
# timestamp. The timestamp in the lock table does not regress.

new-request name=req3 txn=txn1 ts=10,1
put key=k value=v2 seq=1
----

sequence req=req3
----
[3] sequence req3: sequencing request
[3] sequence req3: acquiring latches
[3] sequence req3: scanning lock table for conflicting locks
[3] sequence req3: sequencing complete, returned guard

on-lock-acquired req=req3 key=k seq=1
----
[-] acquire lock: txn 00000001 @ k

finish req=req3
----
[-] finish req3: finishing request

debug-lock-table
----
global: num=1
lock: "k"
holder: txn: 00000001-0000-0000-0000-000000000000, ts: 0.000000012,2, info: unrepl epoch: 0, seqs: [0, 1]
local: num=0

reset namespace
----

# -------------------------------------------------------------
# A transaction writes an intent. The intent is pushed to a
# higher timestamp by a second transaction. The transaction then
# returns to re-acquire the intent at a new epoch but still at
# the original timestamp. This is permitted but the lock's
# timestamp should not regress.
#
# Setup: txn1 acquire lock k
# txn2 reads k and waits
#
# Test: txn2 pushes txn1's timestamp forward
# txn2 proceeds
# txn1 re-acquires lock k at new epoch, lower ts
# -------------------------------------------------------------

new-txn name=txn1 ts=10,1 epoch=0
----

new-txn name=txn2 ts=12,1 epoch=0
----

new-request name=req1 txn=txn1 ts=10,1
put key=k value=v
----

new-request name=req2 txn=txn2 ts=12,1
get key=k
----

sequence req=req1
----
[1] sequence req1: sequencing request
[1] sequence req1: acquiring latches
[1] sequence req1: scanning lock table for conflicting locks
[1] sequence req1: sequencing complete, returned guard

on-lock-acquired req=req1 key=k
----
[-] acquire lock: txn 00000001 @ k

finish req=req1
----
[-] finish req1: finishing request

sequence req=req2
----
[2] sequence req2: sequencing request
[2] sequence req2: acquiring latches
[2] sequence req2: scanning lock table for conflicting locks
[2] sequence req2: waiting in lock wait-queues
[2] sequence req2: pushing timestamp of txn 00000001 above 0.000000012,1
[2] sequence req2: blocked on select in concurrency_test.(*cluster).PushTransaction

debug-lock-table
----
global: num=1
lock: "k"
holder: txn: 00000001-0000-0000-0000-000000000000, ts: 0.000000010,1, info: unrepl epoch: 0, seqs: [0]
waiting readers:
req: 5, txn: 00000002-0000-0000-0000-000000000000
distinguished req: 5
local: num=0

# --------------------------------
# Setup complete, test starts here
# --------------------------------

on-txn-updated txn=txn1 status=pending ts=12,2
----
[-] update txn: increasing timestamp of txn1
[2] sequence req2: resolving intent "k" for txn 00000001 with PENDING status
[2] sequence req2: acquiring latches
[2] sequence req2: scanning lock table for conflicting locks
[2] sequence req2: sequencing complete, returned guard

finish req=req2
----
[-] finish req2: finishing request

debug-lock-table
----
global: num=1
lock: "k"
holder: txn: 00000001-0000-0000-0000-000000000000, ts: 0.000000012,2, info: unrepl epoch: 0, seqs: [0]
local: num=0

# The txn restarts at a new timestamp, but below the pushed
# timestamp. It re-issues the same write at the new epoch. The
# timestamp in the lock table does not regress.

new-txn name=txn1 ts=11,1 epoch=1
----

new-request name=req3 txn=txn1 ts=11,1
put key=k value=v2
----

sequence req=req3
----
[3] sequence req3: sequencing request
[3] sequence req3: acquiring latches
[3] sequence req3: scanning lock table for conflicting locks
[3] sequence req3: sequencing complete, returned guard

on-lock-acquired req=req3 key=k
----
[-] acquire lock: txn 00000001 @ k

finish req=req3
----
[-] finish req3: finishing request

debug-lock-table
----
global: num=1
lock: "k"
holder: txn: 00000001-0000-0000-0000-000000000000, ts: 0.000000012,2, info: unrepl epoch: 1, seqs: [0]
local: num=0

reset namespace
----