Skip to content

Commit

Permalink
kv/concurrency: bind on-lock-{acquired,updated} commands to requests
Browse files Browse the repository at this point in the history
This commit updates the TestConcurrencyManagerBasic framework to bind
all `on-lock-acquired` and `on-lock-updated` commands to specific
requests. This prevents misuse of these hooks. For instance, it ensures
that latches are properly held before allowing the hooks to be executed
(by nature of the guard being in guardsByReqName) and it ensures that
the keys provided to the hooks agree with those in the request
definition.

Release justification: testing only
  • Loading branch information
nvanbenschoten authored and dt committed Mar 23, 2020
1 parent 773c9b5 commit 69b878a
Show file tree
Hide file tree
Showing 5 changed files with 259 additions and 127 deletions.
64 changes: 52 additions & 12 deletions pkg/kv/kvserver/concurrency/concurrency_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,15 @@ import (
//
// new-txn name=<txn-name> ts=<int>[,<int>] epoch=<int> [maxts=<int>[,<int>]]
// new-request name=<req-name> txn=<txn-name>|none ts=<int>[,<int>] [priority] [consistency]
// <proto-name> [<field-name>=<field-value>...]
// <proto-name> [<field-name>=<field-value>...] (hint: see scanSingleRequest)
// sequence req=<req-name>
// finish req=<req-name>
//
// handle-write-intent-error req=<req-name> txn=<txn-name> key=<key>
// handle-txn-push-error req=<req-name> txn=<txn-name> key=<key> TODO(nvanbenschoten): implement this
//
// on-lock-acquired txn=<txn-name> key=<key>
// on-lock-updated txn=<txn-name> key=<key> status=[committed|aborted|pending] [ts=<int>[,<int>]]
// on-lock-acquired req=<req-name> key=<key>
// on-lock-updated req=<req-name> txn=<txn-name> key=<key> status=[committed|aborted|pending] [ts=<int>[,<int>]]
// on-txn-updated txn=<txn-name> status=[committed|aborted|pending] [ts=<int>[,<int>]]
//
// on-lease-updated leaseholder=<bool>
Expand Down Expand Up @@ -136,8 +136,10 @@ func TestConcurrencyManagerBasic(t *testing.T) {
}

ts := scanTimestamp(t, d)
if txn != nil && txn.ReadTimestamp != ts {
d.Fatalf(t, "txn read timestamp != timestamp")
if txn != nil {
txn = txn.Clone()
txn.ReadTimestamp = ts
txn.WriteTimestamp = ts
}

readConsistency := roachpb.CONSISTENT
Expand All @@ -149,7 +151,7 @@ func TestConcurrencyManagerBasic(t *testing.T) {
var reqs []roachpb.Request
singleReqLines := strings.Split(d.Input, "\n")
for _, line := range singleReqLines {
req := scanSingleRequest(t, d, line)
req := scanSingleRequest(t, d, line, c.txnsByName)
reqs = append(reqs, req)
}
reqUnions := make([]roachpb.RequestUnion, len(reqs))
Expand Down Expand Up @@ -257,25 +259,48 @@ func TestConcurrencyManagerBasic(t *testing.T) {
return c.waitAndCollect(t, mon)

case "on-lock-acquired":
var txnName string
d.ScanArgs(t, "txn", &txnName)
txn, ok := c.txnsByName[txnName]
var reqName string
d.ScanArgs(t, "req", &reqName)
guard, ok := c.guardsByReqName[reqName]
if !ok {
d.Fatalf(t, "unknown txn %s", txnName)
d.Fatalf(t, "unknown request: %s", reqName)
}
txn := guard.Req.Txn

var key string
d.ScanArgs(t, "key", &key)

// Confirm that the request has a corresponding write request.
found := false
for _, ru := range guard.Req.Requests {
req := ru.GetInner()
keySpan := roachpb.Span{Key: roachpb.Key(key)}
if roachpb.IsLocking(req) &&
req.Header().Span().Contains(keySpan) {
found = true
break
}
}
if !found {
d.Fatalf(t, "missing corresponding write request")
}

mon.runSync("acquire lock", func(ctx context.Context) {
log.Eventf(ctx, "%s @ %s", txnName, key)
log.Eventf(ctx, "txn %s @ %s", txn.ID.Short(), key)
span := roachpb.Span{Key: roachpb.Key(key)}
up := roachpb.MakeLockUpdateWithDur(txn, span, lock.Unreplicated)
m.OnLockAcquired(ctx, &up)
})
return c.waitAndCollect(t, mon)

case "on-lock-updated":
var reqName string
d.ScanArgs(t, "req", &reqName)
guard, ok := c.guardsByReqName[reqName]
if !ok {
d.Fatalf(t, "unknown request: %s", reqName)
}

var txnName string
d.ScanArgs(t, "txn", &txnName)
txn, ok := c.txnsByName[txnName]
Expand All @@ -292,12 +317,27 @@ func TestConcurrencyManagerBasic(t *testing.T) {
ts = scanTimestamp(t, d)
}

// Confirm that the request has a corresponding ResolveIntent.
found := false
for _, ru := range guard.Req.Requests {
if riReq := ru.GetResolveIntent(); riReq != nil &&
riReq.IntentTxn.ID == txn.ID &&
riReq.Key.Equal(roachpb.Key(key)) &&
riReq.Status == status {
found = true
break
}
}
if !found {
d.Fatalf(t, "missing corresponding resolve intent request")
}

txnUpdate := txn.Clone()
txnUpdate.Status = status
txnUpdate.WriteTimestamp.Forward(ts)

mon.runSync("update lock", func(ctx context.Context) {
log.Eventf(ctx, "%s %s @ %s", verb, txnName, key)
log.Eventf(ctx, "%s txn %s @ %s", verb, txn.ID.Short(), key)
span := roachpb.Span{Key: roachpb.Key(key)}
up := roachpb.MakeLockUpdate(txnUpdate, span)
m.OnLockUpdated(ctx, &up)
Expand Down
39 changes: 32 additions & 7 deletions pkg/kv/kvserver/concurrency/datadriven_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ func scanTimestampWithName(t *testing.T, d *datadriven.TestData, name string) hl
return ts
}

func scanSingleRequest(t *testing.T, d *datadriven.TestData, line string) roachpb.Request {
func scanSingleRequest(
t *testing.T, d *datadriven.TestData, line string, txns map[string]*roachpb.Transaction,
) roachpb.Request {
cmd, cmdArgs, err := datadriven.ParseLine(line)
if err != nil {
d.Fatalf(t, "error parsing single request: %v", err)
Expand Down Expand Up @@ -100,6 +102,13 @@ func scanSingleRequest(t *testing.T, d *datadriven.TestData, line string) roachp
r.Value.SetString(mustGetField("value"))
return &r

case "resolve-intent":
var r roachpb.ResolveIntentRequest
r.IntentTxn = txns[mustGetField("txn")].TxnMeta
r.Key = roachpb.Key(mustGetField("key"))
r.Status = parseTxnStatus(t, d, mustGetField("status"))
return &r

case "request-lease":
var r roachpb.RequestLeaseRequest
return &r
Expand All @@ -113,15 +122,31 @@ func scanSingleRequest(t *testing.T, d *datadriven.TestData, line string) roachp
func scanTxnStatus(t *testing.T, d *datadriven.TestData) (roachpb.TransactionStatus, string) {
var statusStr string
d.ScanArgs(t, "status", &statusStr)
switch statusStr {
status := parseTxnStatus(t, d, statusStr)
var verb string
switch status {
case roachpb.COMMITTED:
verb = "committing"
case roachpb.ABORTED:
verb = "aborting"
case roachpb.PENDING:
verb = "increasing timestamp of"
default:
d.Fatalf(t, "unknown txn status: %s", status)
}
return status, verb
}

func parseTxnStatus(t *testing.T, d *datadriven.TestData, s string) roachpb.TransactionStatus {
switch s {
case "committed":
return roachpb.COMMITTED, "committing"
return roachpb.COMMITTED
case "aborted":
return roachpb.ABORTED, "aborting"
return roachpb.ABORTED
case "pending":
return roachpb.PENDING, "increasing timestamp of"
return roachpb.PENDING
default:
d.Fatalf(t, "unknown txn statusStr: %s", statusStr)
return 0, ""
d.Fatalf(t, "unknown txn status: %s", s)
return 0
}
}
122 changes: 72 additions & 50 deletions pkg/kv/kvserver/concurrency/testdata/concurrency_manager/basic
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ sequence req=req2
[1] sequence req2: scanning lock table for conflicting locks
[1] sequence req2: sequencing complete, returned guard

on-lock-acquired txn=txn2 key=k
on-lock-acquired req=req2 key=k
----
[-] acquire lock: txn2 @ k
[-] acquire lock: txn 00000002 @ k

debug-lock-table
----
Expand Down Expand Up @@ -90,9 +90,20 @@ local: num=0
# 6. Requests proceed in order
# -------------------------------------------------------------

on-lock-acquired txn=txn2 key=k
sequence req=req2
----
[-] acquire lock: txn2 @ k
[1] sequence req2: sequencing request
[1] sequence req2: acquiring latches
[1] sequence req2: scanning lock table for conflicting locks
[1] sequence req2: sequencing complete, returned guard

on-lock-acquired req=req2 key=k
----
[-] acquire lock: txn 00000002 @ k

finish req=req2
----
[-] finish req2: finishing request

new-request name=req3 txn=txn3 ts=14,1
get key=k
Expand All @@ -101,20 +112,20 @@ new-request name=req3 txn=txn3 ts=14,1

sequence req=req3
----
[1] sequence req3: sequencing request
[1] sequence req3: acquiring latches
[1] sequence req3: scanning lock table for conflicting locks
[1] sequence req3: waiting in lock wait-queues
[1] sequence req3: pushing timestamp of txn 00000002 above 0.000000014,1
[1] sequence req3: blocked on select in concurrency_test.(*cluster).PushTransaction
[2] sequence req3: sequencing request
[2] sequence req3: acquiring latches
[2] sequence req3: scanning lock table for conflicting locks
[2] sequence req3: waiting in lock wait-queues
[2] sequence req3: pushing timestamp of txn 00000002 above 0.000000014,1
[2] sequence req3: blocked on select in concurrency_test.(*cluster).PushTransaction

on-txn-updated txn=txn2 status=committed
----
[-] update txn: committing txn2
[1] sequence req3: resolving intent "k" for txn 00000002 with COMMITTED status
[1] sequence req3: acquiring latches
[1] sequence req3: scanning lock table for conflicting locks
[1] sequence req3: sequencing complete, returned guard
[2] sequence req3: resolving intent "k" for txn 00000002 with COMMITTED status
[2] sequence req3: acquiring latches
[2] sequence req3: scanning lock table for conflicting locks
[2] sequence req3: sequencing complete, returned guard

debug-lock-table
----
Expand All @@ -127,9 +138,9 @@ new-request name=req4 txn=txn1 ts=10,1

sequence req=req4
----
[2] sequence req4: sequencing request
[2] sequence req4: acquiring latches
[2] sequence req4: blocked on select in spanlatch.(*Manager).waitForSignal
[3] sequence req4: sequencing request
[3] sequence req4: acquiring latches
[3] sequence req4: blocked on select in spanlatch.(*Manager).waitForSignal

debug-latch-manager
----
Expand All @@ -139,8 +150,8 @@ write count: 1
finish req=req3
----
[-] finish req3: finishing request
[2] sequence req4: scanning lock table for conflicting locks
[2] sequence req4: sequencing complete, returned guard
[3] sequence req4: scanning lock table for conflicting locks
[3] sequence req4: sequencing complete, returned guard

finish req=req4
----
Expand All @@ -160,55 +171,66 @@ reset
# 8. Requests proceed in order
# -------------------------------------------------------------

on-lock-acquired txn=txn2 key=k
sequence req=req2
----
[-] acquire lock: txn2 @ k
[1] sequence req2: sequencing request
[1] sequence req2: acquiring latches
[1] sequence req2: scanning lock table for conflicting locks
[1] sequence req2: sequencing complete, returned guard

on-lock-acquired req=req2 key=k
----
[-] acquire lock: txn 00000002 @ k

finish req=req2
----
[-] finish req2: finishing request

new-request name=req5 txn=none ts=14,1
scan key=a endkey=m
----

sequence req=req5
----
[1] sequence req5: sequencing request
[1] sequence req5: acquiring latches
[1] sequence req5: scanning lock table for conflicting locks
[1] sequence req5: waiting in lock wait-queues
[1] sequence req5: pushing timestamp of txn 00000002 above 0.000000014,1
[1] sequence req5: blocked on select in concurrency_test.(*cluster).PushTransaction
[2] sequence req5: sequencing request
[2] sequence req5: acquiring latches
[2] sequence req5: scanning lock table for conflicting locks
[2] sequence req5: waiting in lock wait-queues
[2] sequence req5: pushing timestamp of txn 00000002 above 0.000000014,1
[2] sequence req5: blocked on select in concurrency_test.(*cluster).PushTransaction

new-request name=req6 txn=none ts=16,1
scan key=c endkey=z
----

sequence req=req6
----
[2] sequence req6: sequencing request
[2] sequence req6: acquiring latches
[2] sequence req6: scanning lock table for conflicting locks
[2] sequence req6: waiting in lock wait-queues
[2] sequence req6: blocked on select in concurrency.(*lockTableWaiterImpl).WaitOn
[3] sequence req6: sequencing request
[3] sequence req6: acquiring latches
[3] sequence req6: scanning lock table for conflicting locks
[3] sequence req6: waiting in lock wait-queues
[3] sequence req6: blocked on select in concurrency.(*lockTableWaiterImpl).WaitOn

on-txn-updated txn=txn2 status=pending ts=18,1
----
[-] update txn: increasing timestamp of txn2
[1] sequence req5: resolving intent "k" for txn 00000002 with PENDING status
[1] sequence req5: acquiring latches
[1] sequence req5: scanning lock table for conflicting locks
[1] sequence req5: sequencing complete, returned guard
[2] sequence req6: acquiring latches
[2] sequence req6: scanning lock table for conflicting locks
[2] sequence req6: sequencing complete, returned guard
[2] sequence req5: resolving intent "k" for txn 00000002 with PENDING status
[2] sequence req5: acquiring latches
[2] sequence req5: scanning lock table for conflicting locks
[2] sequence req5: sequencing complete, returned guard
[3] sequence req6: acquiring latches
[3] sequence req6: scanning lock table for conflicting locks
[3] sequence req6: sequencing complete, returned guard

new-request name=req7 txn=none ts=12,1
put key=k value=v
----

sequence req=req7
----
[3] sequence req7: sequencing request
[3] sequence req7: acquiring latches
[3] sequence req7: blocked on select in spanlatch.(*Manager).waitForSignal
[4] sequence req7: sequencing request
[4] sequence req7: acquiring latches
[4] sequence req7: blocked on select in spanlatch.(*Manager).waitForSignal

finish req=req5
----
Expand All @@ -217,18 +239,18 @@ finish req=req5
finish req=req6
----
[-] finish req6: finishing request
[3] sequence req7: scanning lock table for conflicting locks
[3] sequence req7: waiting in lock wait-queues
[3] sequence req7: pushing txn 00000002 to abort
[3] sequence req7: blocked on select in concurrency_test.(*cluster).PushTransaction
[4] sequence req7: scanning lock table for conflicting locks
[4] sequence req7: waiting in lock wait-queues
[4] sequence req7: pushing txn 00000002 to abort
[4] sequence req7: blocked on select in concurrency_test.(*cluster).PushTransaction

on-txn-updated txn=txn2 status=committed
----
[-] update txn: committing txn2
[3] sequence req7: resolving intent "k" for txn 00000002 with COMMITTED status
[3] sequence req7: acquiring latches
[3] sequence req7: scanning lock table for conflicting locks
[3] sequence req7: sequencing complete, returned guard
[4] sequence req7: resolving intent "k" for txn 00000002 with COMMITTED status
[4] sequence req7: acquiring latches
[4] sequence req7: scanning lock table for conflicting locks
[4] sequence req7: sequencing complete, returned guard

finish req=req7
----
Expand Down
Loading

0 comments on commit 69b878a

Please sign in to comment.