diff --git a/pkg/kv/kvserver/concurrency/concurrency_control.go b/pkg/kv/kvserver/concurrency/concurrency_control.go index 1e7efabff736..abcb1cf6d2c3 100644 --- a/pkg/kv/kvserver/concurrency/concurrency_control.go +++ b/pkg/kv/kvserver/concurrency/concurrency_control.go @@ -367,6 +367,14 @@ type Request struct { // transactions. WaitPolicy lock.WaitPolicy + // The maximum length of a lock wait-queue that the request is willing + // to enter and wait in. Used to provide a release valve and ensure some + // level of quality-of-service under severe per-key contention. If set + // to a non-zero value and an existing lock wait-queue is already equal + // to or exceeding this length, the request will be rejected eagerly + // with a WriteIntentError instead of entering the queue and waiting. + MaxLockWaitQueueLength int + // The individual requests in the batch. Requests []roachpb.RequestUnion diff --git a/pkg/kv/kvserver/concurrency/concurrency_manager.go b/pkg/kv/kvserver/concurrency/concurrency_manager.go index 706017bf751a..f6d10f98e0bf 100644 --- a/pkg/kv/kvserver/concurrency/concurrency_manager.go +++ b/pkg/kv/kvserver/concurrency/concurrency_manager.go @@ -32,6 +32,55 @@ import ( "github.com/cockroachdb/errors" ) +// MaxLockWaitQueueLength sets the maximum length of a lock wait-queue that the +// request is willing to enter and wait in. Used to provide a release valve and +// ensure some level of quality-of-service under severe per-key contention. If +// set to a non-zero value and an existing lock wait-queue is already equal to +// or exceeding this length, the request will be rejected eagerly instead of +// entering the queue and waiting. +// +// This is a fairly blunt mechanism to place an upper bound on resource +// utilization per lock wait-queue and ensure some reasonable level of +// quality-of-service for transactions that enter a lock wait-queue. More +// sophisticated queueing alternatives exist that account for queueing time and +// detect sustained queue growth before rejecting: +// - https://queue.acm.org/detail.cfm?id=2209336 +// - https://queue.acm.org/detail.cfm?id=2839461 +// +// We could explore these algorithms if this setting is too coarse grained and +// not serving its purpose well enough. +// +// Alternatively, we could implement the lock_timeout session variable that +// exists in Postgres (#67513) and use that to ensure quality-of-service for +// requests that wait for locks. With that configuration, this cluster setting +// would be relegated to a guardrail that protects against unbounded resource +// utilization and runaway queuing for misbehaving clients, a role it is well +// positioned to serve. +var MaxLockWaitQueueLength = settings.RegisterIntSetting( + "kv.lock_table.maximum_lock_wait_queue_length", + "the maximum length of a lock wait-queue that requests are willing to enter "+ + "and wait in. The setting can be used to ensure some level of quality-of-service "+ + "under severe per-key contention. If set to a non-zero value and an existing lock "+ + "wait-queue is already equal to or exceeding this length, requests will be rejected "+ + "eagerly instead of entering the queue and waiting. Set to 0 to disable.", + 0, + func(v int64) error { + if v < 0 { + return errors.Errorf("cannot be set to a negative value: %d", v) + } + if v == 0 { + return nil // disabled + } + // Don't let the setting be dropped below a reasonable value that we don't + // expect to impact internal transaction processing. 
+ const minSafeMaxLength = 3 + if v < minSafeMaxLength { + return errors.Errorf("cannot be set below %d: %d", minSafeMaxLength, v) + } + return nil + }, +) + // DiscoveredLocksThresholdToConsultFinalizedTxnCache sets a threshold as // mentioned in the description string. The default of 200 is somewhat // arbitrary but should suffice for small OLTP transactions. Given the default @@ -230,6 +279,12 @@ func (m *managerImpl) sequenceReqWithGuard(ctx context.Context, g *Guard) (Respo return nil, nil } + // Set the request's MaxWaitQueueLength based on the cluster setting, if not + // already set. + if g.Req.MaxLockWaitQueueLength == 0 { + g.Req.MaxLockWaitQueueLength = int(MaxLockWaitQueueLength.Get(&m.st.SV)) + } + if g.EvalKind == OptimisticEval { if g.ltg != nil { panic("Optimistic locking should not have a non-nil lockTableGuard") diff --git a/pkg/kv/kvserver/concurrency/concurrency_manager_test.go b/pkg/kv/kvserver/concurrency/concurrency_manager_test.go index 0f401148dae5..ecd7750961c3 100644 --- a/pkg/kv/kvserver/concurrency/concurrency_manager_test.go +++ b/pkg/kv/kvserver/concurrency/concurrency_manager_test.go @@ -54,7 +54,7 @@ import ( // The input files use the following DSL: // // new-txn name= ts=[,] epoch= [uncertainty-limit=[,]] -// new-request name= txn=|none ts=[,] [priority] [inconsistent] [wait-policy=] +// new-request name= txn=|none ts=[,] [priority] [inconsistent] [wait-policy=] [max-lock-wait-queue-length=] // [=...] (hint: see scanSingleRequest) // sequence req= [eval-kind= @@ -155,6 +155,11 @@ func TestConcurrencyManagerBasic(t *testing.T) { waitPolicy := scanWaitPolicy(t, d, false /* required */) + var maxLockWaitQueueLength int + if d.HasArg("max-lock-wait-queue-length") { + d.ScanArgs(t, "max-lock-wait-queue-length", &maxLockWaitQueueLength) + } + // Each roachpb.Request is provided on an indented line. reqs, reqUnions := scanRequests(t, d, c) latchSpans, lockSpans := c.collectSpans(t, txn, ts, reqs) @@ -163,11 +168,12 @@ func TestConcurrencyManagerBasic(t *testing.T) { Txn: txn, Timestamp: ts, // TODO(nvanbenschoten): test Priority - ReadConsistency: readConsistency, - WaitPolicy: waitPolicy, - Requests: reqUnions, - LatchSpans: latchSpans, - LockSpans: lockSpans, + ReadConsistency: readConsistency, + WaitPolicy: waitPolicy, + MaxLockWaitQueueLength: maxLockWaitQueueLength, + Requests: reqUnions, + LatchSpans: latchSpans, + LockSpans: lockSpans, } return "" diff --git a/pkg/kv/kvserver/concurrency/lock_table.go b/pkg/kv/kvserver/concurrency/lock_table.go index 9622f268aecb..9a86beee9753 100644 --- a/pkg/kv/kvserver/concurrency/lock_table.go +++ b/pkg/kv/kvserver/concurrency/lock_table.go @@ -64,6 +64,12 @@ const ( // without pushing anyone. waitSelf + // waitQueueMaxLengthExceeded indicates that the request attempted to enter a + // lock wait-queue as a writer and found that the queue's length was already + // equal to or exceeding the request's configured maximum. As a result, the + // request was rejected. + waitQueueMaxLengthExceeded + // doneWaiting indicates that the request is done waiting on this pass // through the lockTable and should make another call to ScanAndEnqueue. 
doneWaiting @@ -112,6 +118,9 @@ func (s waitingState) String() string { return "wait elsewhere by proceeding to evaluation" } return fmt.Sprintf("wait elsewhere for txn %s @ key %s", s.txn.ID.Short(), s.key) + case waitQueueMaxLengthExceeded: + return fmt.Sprintf("wait-queue maximum length exceeded @ key %s with length %d", + s.key, s.queuedWriters) case doneWaiting: return "done waiting" default: @@ -316,6 +325,11 @@ func newLockTable(maxLocks int64) *lockTableImpl { // transaction has a reservation. See the comment about "Reservations" in // lockState. // +// - The waitQueueMaxLengthExceeded state is used to indicate that the request +// was rejected because it attempted to enter a lock wait-queue as a writer +// and found that the queue's length was already equal to or exceeding the +// request's configured maximum. +// // - The doneWaiting state is used to indicate that the request should make // another call to ScanAndEnqueue() (that next call is more likely to return a // lockTableGuard that returns false from StartWaiting()). @@ -324,9 +338,10 @@ type lockTableGuardImpl struct { lt *lockTableImpl // Information about this request. - txn *enginepb.TxnMeta - ts hlc.Timestamp - spans *spanset.SpanSet + txn *enginepb.TxnMeta + ts hlc.Timestamp + spans *spanset.SpanSet + maxWaitQueueLength int // Snapshots of the trees for which this request has some spans. Note that // the lockStates in these snapshots may have been removed from @@ -1348,7 +1363,13 @@ func (l *lockState) tryActiveWait( } } - waitForState := waitingState{kind: waitFor, key: l.key} + waitForState := waitingState{ + kind: waitFor, + key: l.key, + queuedWriters: l.queuedWriters.Len(), + queuedReaders: l.waitingReaders.Len(), + guardAccess: sa, + } if lockHolderTxn != nil { waitForState.txn = lockHolderTxn waitForState.held = true @@ -1411,8 +1432,21 @@ func (l *lockState) tryActiveWait( guard: g, active: true, } - if l.queuedWriters.Len() == 0 { + if curLen := l.queuedWriters.Len(); curLen == 0 { l.queuedWriters.PushFront(qg) + } else if g.maxWaitQueueLength > 0 && curLen >= g.maxWaitQueueLength { + // The wait-queue is longer than the request is willing to wait for. + // Instead of entering the queue, immediately reject the request. + g.mu.startWait = true + state := waitForState + state.kind = waitQueueMaxLengthExceeded + g.mu.state = state + if notify { + g.notify() + } + // NOTE: we return wait=true not because the request is waiting, but + // because it should not continue scanning for conflicting locks. + return true, false } else { var e *list.Element for e = l.queuedWriters.Back(); e != nil; e = e.Prev() { @@ -1428,6 +1462,7 @@ func (l *lockState) tryActiveWait( } } g.mu.locks[l] = struct{}{} + waitForState.queuedWriters = l.queuedWriters.Len() } if replicatedLockFinalizedTxn != nil && l.queuedWriters.Front().Value.(*queuedGuard) == qg { // First waiter, so should not wait. NB: this inactive waiter can be @@ -1443,6 +1478,7 @@ func (l *lockState) tryActiveWait( } else { l.waitingReaders.PushFront(g) g.mu.locks[l] = struct{}{} + waitForState.queuedReaders = l.waitingReaders.Len() } } if !wait { @@ -1453,15 +1489,12 @@ func (l *lockState) tryActiveWait( // Make it an active waiter. 
g.key = l.key g.mu.startWait = true - waitForState.queuedWriters = l.queuedWriters.Len() - waitForState.queuedReaders = l.waitingReaders.Len() if g.isSameTxnAsReservation(waitForState) { state := waitForState state.kind = waitSelf g.mu.state = state } else { state := waitForState - state.guardAccess = sa if l.distinguishedWaiter == nil { l.distinguishedWaiter = g state.kind = waitForDistinguished @@ -2180,6 +2213,7 @@ func (t *lockTableImpl) newGuardForReq(req Request) *lockTableGuardImpl { g.txn = req.txnMeta() g.ts = req.Timestamp g.spans = req.LockSpans + g.maxWaitQueueLength = req.MaxLockWaitQueueLength g.sa = spanset.NumSpanAccess - 1 g.index = -1 return g diff --git a/pkg/kv/kvserver/concurrency/lock_table_test.go b/pkg/kv/kvserver/concurrency/lock_table_test.go index 9fca4622bab0..7c1692583c77 100644 --- a/pkg/kv/kvserver/concurrency/lock_table_test.go +++ b/pkg/kv/kvserver/concurrency/lock_table_test.go @@ -54,7 +54,7 @@ new-txn txn= ts=[,] epoch= [seq=] Creates a TxnMeta. -new-request r= txn=|none ts=[,] spans=r|w@[,]+... +new-request r= txn=|none ts=[,] spans=r|w@[,]+... [max-lock-wait-queue-length=] ---- Creates a Request. @@ -245,11 +245,16 @@ func TestLockTableBasic(t *testing.T) { d.Fatalf(t, "unknown txn %s", txnName) } ts := scanTimestamp(t, d) + var maxLockWaitQueueLength int + if d.HasArg("max-lock-wait-queue-length") { + d.ScanArgs(t, "max-lock-wait-queue-length", &maxLockWaitQueueLength) + } spans := scanSpans(t, d, ts) req := Request{ - Timestamp: ts, - LatchSpans: spans, - LockSpans: spans, + Timestamp: ts, + MaxLockWaitQueueLength: maxLockWaitQueueLength, + LatchSpans: spans, + LockSpans: spans, } if txnMeta != nil { // Update the transaction's timestamp, if necessary. The transaction @@ -474,12 +479,16 @@ func TestLockTableBasic(t *testing.T) { typeStr = "waitElsewhere" case waitSelf: return str + "state=waitSelf" + case waitQueueMaxLengthExceeded: + typeStr = "waitQueueMaxLengthExceeded" case doneWaiting: var toResolveStr string if stateTransition { toResolveStr = intentsToResolveToStr(g.ResolveBeforeScanning(), true) } return str + "state=doneWaiting" + toResolveStr + default: + d.Fatalf(t, "unexpected state: %v", state.kind) } id := state.txn.ID var txnS string diff --git a/pkg/kv/kvserver/concurrency/lock_table_waiter.go b/pkg/kv/kvserver/concurrency/lock_table_waiter.go index 03eca9534a6b..44182c14f31d 100644 --- a/pkg/kv/kvserver/concurrency/lock_table_waiter.go +++ b/pkg/kv/kvserver/concurrency/lock_table_waiter.go @@ -174,7 +174,7 @@ func (w *lockTableWaiterImpl) WaitOn( if state.held { err = w.pushLockTxn(ctx, req, state) } else { - err = newWriteIntentErr(state) + err = newWriteIntentErr(req, state) } if err != nil { return err @@ -284,6 +284,12 @@ func (w *lockTableWaiterImpl) WaitOn( // request's transaction is sending multiple requests concurrently. // Proceed with waiting without pushing anyone. + case waitQueueMaxLengthExceeded: + // The request attempted to wait in a lock wait-queue whose length was + // already equal to or exceeding the request's configured maximum. As a + // result, the request was rejected. + return newWriteIntentErr(req, state) + case doneWaiting: // The request has waited for all conflicting locks to be released // and is at the front of any lock wait-queues. 
It can now stop @@ -394,7 +400,7 @@ func (w *lockTableWaiterImpl) pushLockTxn( ctx context.Context, req Request, ws waitingState, ) *Error { if w.disableTxnPushing { - return newWriteIntentErr(ws) + return newWriteIntentErr(req, ws) } // Construct the request header and determine which form of push to use. @@ -436,7 +442,7 @@ func (w *lockTableWaiterImpl) pushLockTxn( // If pushing with an Error WaitPolicy and the push fails, then the lock // holder is still active. Transform the error into a WriteIntentError. if _, ok := err.GetDetail().(*roachpb.TransactionPushError); ok && req.WaitPolicy == lock.WaitPolicy_Error { - err = newWriteIntentErr(ws) + err = newWriteIntentErr(req, ws) } return err } @@ -786,7 +792,7 @@ func (h *contentionEventHelper) emitAndInit(s waitingState) { } h.tBegin = timeutil.Now() } - case waitElsewhere, doneWaiting: + case waitElsewhere, waitQueueMaxLengthExceeded, doneWaiting: // If we have an event, emit it now and that's it - the case we're in // does not give us a new transaction/key. if h.ev != nil { @@ -797,10 +803,22 @@ func (h *contentionEventHelper) emitAndInit(s waitingState) { } } -func newWriteIntentErr(ws waitingState) *Error { - return roachpb.NewError(&roachpb.WriteIntentError{ +func newWriteIntentErr(req Request, ws waitingState) *Error { + err := roachpb.NewError(&roachpb.WriteIntentError{ Intents: []roachpb.Intent{roachpb.MakeIntent(ws.txn, ws.key)}, }) + // TODO(nvanbenschoten): setting an error index can assist the KV client in + // understanding which request hit an error. This is not necessary, but can + // improve error handling, leading to better error messages and performance + // optimizations in some cases. We don't have an easy way to associate a given + // conflict with a specific request in a batch because we don't retain a + // mapping from lock span to request. However, as a best-effort optimization, + // we set the error index to 0 if this is the only request in the batch (that + // landed on this range, from the client's perspective). 
+ if len(req.Requests) == 1 { + err.SetErrorIndex(0) + } + return err } func hasMinPriority(txn *enginepb.TxnMeta) bool { diff --git a/pkg/kv/kvserver/concurrency/lock_table_waiter_test.go b/pkg/kv/kvserver/concurrency/lock_table_waiter_test.go index 1a407806cb21..aa5bf72e9774 100644 --- a/pkg/kv/kvserver/concurrency/lock_table_waiter_test.go +++ b/pkg/kv/kvserver/concurrency/lock_table_waiter_test.go @@ -176,6 +176,10 @@ func TestLockTableWaiterWithTxn(t *testing.T) { testWaitNoopUntilDone(t, waitSelf, makeReq) }) + t.Run("waitQueueMaxLengthExceeded", func(t *testing.T) { + testErrorWaitPush(t, waitQueueMaxLengthExceeded, makeReq, dontExpectPush) + }) + t.Run("doneWaiting", func(t *testing.T) { w, _, g := setupLockTableWaiterTest() defer w.stopper.Stop(ctx) @@ -248,6 +252,10 @@ func TestLockTableWaiterWithNonTxn(t *testing.T) { t.Log("waitSelf is not possible for non-transactional request") }) + t.Run("waitQueueMaxLengthExceeded", func(t *testing.T) { + testErrorWaitPush(t, waitQueueMaxLengthExceeded, makeReq, dontExpectPush) + }) + t.Run("doneWaiting", func(t *testing.T) { w, _, g := setupLockTableWaiterTest() defer w.stopper.Stop(ctx) @@ -441,6 +449,10 @@ func TestLockTableWaiterWithErrorWaitPolicy(t *testing.T) { testWaitNoopUntilDone(t, waitSelf, makeReq) }) + t.Run("waitQueueMaxLengthExceeded", func(t *testing.T) { + testErrorWaitPush(t, waitQueueMaxLengthExceeded, makeReq, dontExpectPush) + }) + t.Run("doneWaiting", func(t *testing.T) { w, _, g := setupLockTableWaiterTest() defer w.stopper.Stop(ctx) @@ -454,6 +466,8 @@ func TestLockTableWaiterWithErrorWaitPolicy(t *testing.T) { }) } +var dontExpectPush = hlc.Timestamp{} + func testErrorWaitPush(t *testing.T, k waitKind, makeReq func() Request, expPushTS hlc.Timestamp) { ctx := context.Background() keyA := roachpb.Key("keyA") @@ -472,9 +486,10 @@ func testErrorWaitPush(t *testing.T, k waitKind, makeReq func() Request, expPush } g.notify() - // If the lock is not held, expect an error immediately. The one - // exception to this is waitElsewhere, which expects no error. - if !lockHeld { + // If the lock is not held or expPushTS is empty, expect an error + // immediately. The one exception to this is waitElsewhere, which + // expects no error. + if !lockHeld || expPushTS == dontExpectPush { err := w.WaitOn(ctx, req, g) if k == waitElsewhere { require.Nil(t, err) diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/queue_length_exceeded b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/queue_length_exceeded new file mode 100644 index 000000000000..b96e91fa76aa --- /dev/null +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/queue_length_exceeded @@ -0,0 +1,223 @@ +new-txn name=txn1 ts=10,1 epoch=0 +---- + +new-txn name=txn2 ts=11,1 epoch=0 +---- + +new-txn name=txn3 ts=12,1 epoch=0 +---- + +new-txn name=txn4 ts=13,1 epoch=0 +---- + +# NOTE: txn5.ts < txn2.ts so that txn5's reads don't conflict with +# txn2's writes. This helps make the test deterministic when txn1 +# releases its lock. 
+new-txn name=txn5 ts=10,1 epoch=0 +---- + +# ------------------------------------------------------------- +# Prep: Txn 1 acquire locks at key k +# Txns 2, 3, and 4 begin waiting in k's wait-queue +# ------------------------------------------------------------- + +new-request name=req1 txn=txn1 ts=10,0 + put key=k value=v +---- + +sequence req=req1 +---- +[1] sequence req1: sequencing request +[1] sequence req1: acquiring latches +[1] sequence req1: scanning lock table for conflicting locks +[1] sequence req1: sequencing complete, returned guard + +on-lock-acquired req=req1 key=k +---- +[-] acquire lock: txn 00000001 @ k + +finish req=req1 +---- +[-] finish req1: finishing request + +new-request name=req2 txn=txn2 ts=11,0 + put key=k value=v2 +---- + +sequence req=req2 +---- +[2] sequence req2: sequencing request +[2] sequence req2: acquiring latches +[2] sequence req2: scanning lock table for conflicting locks +[2] sequence req2: waiting in lock wait-queues +[2] sequence req2: lock wait-queue event: wait for (distinguished) txn 00000001 holding lock @ key "k" (queuedWriters: 1, queuedReaders: 0) +[2] sequence req2: pushing txn 00000001 to abort +[2] sequence req2: blocked on select in concurrency_test.(*cluster).PushTransaction + +new-request name=req3 txn=txn3 ts=12,0 + put key=k value=v3 +---- + +sequence req=req3 +---- +[3] sequence req3: sequencing request +[3] sequence req3: acquiring latches +[3] sequence req3: scanning lock table for conflicting locks +[3] sequence req3: waiting in lock wait-queues +[3] sequence req3: lock wait-queue event: wait for txn 00000001 holding lock @ key "k" (queuedWriters: 2, queuedReaders: 0) +[3] sequence req3: pushing txn 00000001 to abort +[3] sequence req3: blocked on select in concurrency_test.(*cluster).PushTransaction + +new-request name=req4 txn=txn4 ts=13,0 + put key=k value=v4 +---- + +sequence req=req4 +---- +[4] sequence req4: sequencing request +[4] sequence req4: acquiring latches +[4] sequence req4: scanning lock table for conflicting locks +[4] sequence req4: waiting in lock wait-queues +[4] sequence req4: lock wait-queue event: wait for txn 00000001 holding lock @ key "k" (queuedWriters: 3, queuedReaders: 0) +[4] sequence req4: pushing txn 00000001 to abort +[4] sequence req4: blocked on select in concurrency_test.(*cluster).PushTransaction + +debug-lock-table +---- +global: num=1 + lock: "k" + holder: txn: 00000001-0000-0000-0000-000000000000, ts: 10.000000000,0, info: unrepl epoch: 0, seqs: [0] + queued writers: + active: true req: 2, txn: 00000002-0000-0000-0000-000000000000 + active: true req: 3, txn: 00000003-0000-0000-0000-000000000000 + active: true req: 4, txn: 00000004-0000-0000-0000-000000000000 + distinguished req: 2 +local: num=0 + +# ------------------------------------------------------------- +# Read-only request runs into long lock wait-queue. Waits for +# lock to be released, but not in queue. Proceeds as soon as +# lock is released. 
+# ------------------------------------------------------------- + +new-request name=req5r txn=txn5 ts=10,0 max-lock-wait-queue-length=2 + get key=k +---- + +sequence req=req5r +---- +[5] sequence req5r: sequencing request +[5] sequence req5r: acquiring latches +[5] sequence req5r: scanning lock table for conflicting locks +[5] sequence req5r: waiting in lock wait-queues +[5] sequence req5r: lock wait-queue event: wait for txn 00000001 holding lock @ key "k" (queuedWriters: 3, queuedReaders: 1) +[5] sequence req5r: pushing timestamp of txn 00000001 above 10.000000000,1 +[5] sequence req5r: blocked on select in concurrency_test.(*cluster).PushTransaction + +on-txn-updated txn=txn1 status=committed +---- +[-] update txn: committing txn1 +[2] sequence req2: resolving intent "k" for txn 00000001 with COMMITTED status +[2] sequence req2: lock wait-queue event: done waiting +[2] sequence req2: conflicted with 00000001-0000-0000-0000-000000000000 on "k" for 1.234s +[2] sequence req2: acquiring latches +[2] sequence req2: scanning lock table for conflicting locks +[2] sequence req2: sequencing complete, returned guard +[3] sequence req3: resolving intent "k" for txn 00000001 with COMMITTED status +[3] sequence req3: lock wait-queue event: wait for (distinguished) txn 00000002 running request @ key "k" (queuedWriters: 2, queuedReaders: 0) +[3] sequence req3: conflicted with 00000001-0000-0000-0000-000000000000 on "k" for 1.234s +[3] sequence req3: pushing txn 00000002 to detect request deadlock +[3] sequence req3: blocked on select in concurrency_test.(*cluster).PushTransaction +[4] sequence req4: resolving intent "k" for txn 00000001 with COMMITTED status +[4] sequence req4: lock wait-queue event: wait for txn 00000002 running request @ key "k" (queuedWriters: 2, queuedReaders: 0) +[4] sequence req4: conflicted with 00000001-0000-0000-0000-000000000000 on "k" for 1.234s +[4] sequence req4: pushing txn 00000002 to detect request deadlock +[4] sequence req4: blocked on select in concurrency_test.(*cluster).PushTransaction +[5] sequence req5r: resolving intent "k" for txn 00000001 with COMMITTED status +[5] sequence req5r: lock wait-queue event: done waiting +[5] sequence req5r: conflicted with 00000001-0000-0000-0000-000000000000 on "k" for 1.234s +[5] sequence req5r: acquiring latches +[5] sequence req5r: scanning lock table for conflicting locks +[5] sequence req5r: sequencing complete, returned guard + +finish req=req5r +---- +[-] finish req5r: finishing request + +on-lock-acquired req=req2 key=k +---- +[-] acquire lock: txn 00000002 @ k +[3] sequence req3: lock wait-queue event: wait for (distinguished) txn 00000002 holding lock @ key "k" (queuedWriters: 2, queuedReaders: 0) +[3] sequence req3: pushing txn 00000002 to abort +[3] sequence req3: blocked on select in concurrency_test.(*cluster).PushTransaction +[4] sequence req4: lock wait-queue event: wait for txn 00000002 holding lock @ key "k" (queuedWriters: 2, queuedReaders: 0) +[4] sequence req4: pushing txn 00000002 to abort +[4] sequence req4: blocked on select in concurrency_test.(*cluster).PushTransaction + +finish req=req2 +---- +[-] finish req2: finishing request + +debug-lock-table +---- +global: num=1 + lock: "k" + holder: txn: 00000002-0000-0000-0000-000000000000, ts: 11.000000000,0, info: unrepl epoch: 0, seqs: [0] + queued writers: + active: true req: 3, txn: 00000003-0000-0000-0000-000000000000 + active: true req: 4, txn: 00000004-0000-0000-0000-000000000000 + distinguished req: 3 +local: num=0 + +# 
------------------------------------------------------------- +# Read-write request runs into long lock wait-queue. Instead of +# waiting in the queue, the request raises an error. +# ------------------------------------------------------------- + +new-request name=req5w txn=txn5 ts=10,0 max-lock-wait-queue-length=2 + put key=k value=v5 +---- + +sequence req=req5w +---- +[6] sequence req5w: sequencing request +[6] sequence req5w: acquiring latches +[6] sequence req5w: scanning lock table for conflicting locks +[6] sequence req5w: waiting in lock wait-queues +[6] sequence req5w: lock wait-queue event: wait-queue maximum length exceeded @ key "k" with length 2 +[6] sequence req5w: sequencing complete, returned error: conflicting intents on "k" + +# ------------------------------------------------------------- +# Cleanup. +# ------------------------------------------------------------- + +on-txn-updated txn=txn2 status=aborted +---- +[-] update txn: aborting txn2 +[3] sequence req3: resolving intent "k" for txn 00000002 with ABORTED status +[3] sequence req3: lock wait-queue event: done waiting +[3] sequence req3: conflicted with 00000002-0000-0000-0000-000000000000 on "k" for 1.234s +[3] sequence req3: acquiring latches +[3] sequence req3: scanning lock table for conflicting locks +[3] sequence req3: sequencing complete, returned guard +[4] sequence req4: resolving intent "k" for txn 00000002 with ABORTED status +[4] sequence req4: lock wait-queue event: wait for (distinguished) txn 00000003 running request @ key "k" (queuedWriters: 1, queuedReaders: 0) +[4] sequence req4: conflicted with 00000002-0000-0000-0000-000000000000 on "k" for 1.234s +[4] sequence req4: pushing txn 00000003 to detect request deadlock +[4] sequence req4: blocked on select in concurrency_test.(*cluster).PushTransaction + +finish req=req3 +---- +[-] finish req3: finishing request +[4] sequence req4: lock wait-queue event: done waiting +[4] sequence req4: conflicted with 00000003-0000-0000-0000-000000000000 on "k" for 1.234s +[4] sequence req4: acquiring latches +[4] sequence req4: scanning lock table for conflicting locks +[4] sequence req4: sequencing complete, returned guard + +finish req=req4 +---- +[-] finish req4: finishing request + +reset +---- diff --git a/pkg/kv/kvserver/concurrency/testdata/lock_table/queue_length_exceeded b/pkg/kv/kvserver/concurrency/testdata/lock_table/queue_length_exceeded new file mode 100644 index 000000000000..8ffa6f876fb4 --- /dev/null +++ b/pkg/kv/kvserver/concurrency/testdata/lock_table/queue_length_exceeded @@ -0,0 +1,186 @@ +new-lock-table maxlocks=10000 +---- + +new-txn txn=txn1 ts=10 epoch=0 +---- + +new-txn txn=txn2 ts=10 epoch=0 +---- + +new-txn txn=txn3 ts=10 epoch=0 +---- + +new-request r=req1 txn=txn1 ts=10 spans=w@a +---- + +new-request r=req2 txn=txn2 ts=10 spans=w@a +---- + +new-request r=req3 txn=txn3 ts=10 spans=w@a +---- + +scan r=req1 +---- +start-waiting: false + +acquire r=req1 k=a durability=u +---- +global: num=1 + lock: "a" + holder: txn: 00000000-0000-0000-0000-000000000001, ts: 10.000000000,0, info: unrepl epoch: 0, seqs: [0] +local: num=0 + +scan r=req2 +---- +start-waiting: true + +scan r=req3 +---- +start-waiting: true + +print +---- +global: num=1 + lock: "a" + holder: txn: 00000000-0000-0000-0000-000000000001, ts: 10.000000000,0, info: unrepl epoch: 0, seqs: [0] + queued writers: + active: true req: 2, txn: 00000000-0000-0000-0000-000000000002 + active: true req: 3, txn: 00000000-0000-0000-0000-000000000003 + distinguished req: 2 +local: num=0 + +# 
--------------------------------------------------------------------------------- +# Read requests do not observe a queue length limit, because they don't wait in the +# queuedWriters list. +# --------------------------------------------------------------------------------- + +new-txn txn=txn4 ts=10 epoch=0 +---- + +new-request r=req4 txn=txn4 ts=10 spans=r@a max-lock-wait-queue-length=2 +---- + +scan r=req4 +---- +start-waiting: true + +guard-state r=req4 +---- +new: state=waitFor txn=txn1 key="a" held=true guard-access=read + +dequeue r=req4 +---- +global: num=1 + lock: "a" + holder: txn: 00000000-0000-0000-0000-000000000001, ts: 10.000000000,0, info: unrepl epoch: 0, seqs: [0] + queued writers: + active: true req: 2, txn: 00000000-0000-0000-0000-000000000002 + active: true req: 3, txn: 00000000-0000-0000-0000-000000000003 + distinguished req: 2 +local: num=0 + +# --------------------------------------------------------------------------------- +# Write requests with a large enough MaxLockWaitQueueLength do not throw an error. +# --------------------------------------------------------------------------------- + +new-txn txn=txn5 ts=10 epoch=0 +---- + +new-request r=req5 txn=txn5 ts=10 spans=w@a max-lock-wait-queue-length=3 +---- + +scan r=req5 +---- +start-waiting: true + +guard-state r=req5 +---- +new: state=waitFor txn=txn1 key="a" held=true guard-access=write + +dequeue r=req5 +---- +global: num=1 + lock: "a" + holder: txn: 00000000-0000-0000-0000-000000000001, ts: 10.000000000,0, info: unrepl epoch: 0, seqs: [0] + queued writers: + active: true req: 2, txn: 00000000-0000-0000-0000-000000000002 + active: true req: 3, txn: 00000000-0000-0000-0000-000000000003 + distinguished req: 2 +local: num=0 + +# --------------------------------------------------------------------------------- +# Write requests with a sufficiently low MaxLockWaitQueueLength throw an error. +# --------------------------------------------------------------------------------- + +new-txn txn=txn6 ts=10 epoch=0 +---- + +new-request r=req6 txn=txn6 ts=10 spans=w@a max-lock-wait-queue-length=2 +---- + +scan r=req6 +---- +start-waiting: true + +guard-state r=req6 +---- +new: state=waitQueueMaxLengthExceeded txn=txn1 key="a" held=true guard-access=write + +dequeue r=req6 +---- +global: num=1 + lock: "a" + holder: txn: 00000000-0000-0000-0000-000000000001, ts: 10.000000000,0, info: unrepl epoch: 0, seqs: [0] + queued writers: + active: true req: 2, txn: 00000000-0000-0000-0000-000000000002 + active: true req: 3, txn: 00000000-0000-0000-0000-000000000003 + distinguished req: 2 +local: num=0 + +# --------------------------------------------------------------------------------- +# Same as previous two cases, but for non-transactional writes. 
+# --------------------------------------------------------------------------------- + +new-request r=req7 txn=none ts=10 spans=w@a max-lock-wait-queue-length=3 +---- + +scan r=req7 +---- +start-waiting: true + +guard-state r=req7 +---- +new: state=waitFor txn=txn1 key="a" held=true guard-access=write + +dequeue r=req7 +---- +global: num=1 + lock: "a" + holder: txn: 00000000-0000-0000-0000-000000000001, ts: 10.000000000,0, info: unrepl epoch: 0, seqs: [0] + queued writers: + active: true req: 2, txn: 00000000-0000-0000-0000-000000000002 + active: true req: 3, txn: 00000000-0000-0000-0000-000000000003 + distinguished req: 2 +local: num=0 + +new-request r=req8 txn=none ts=10 spans=w@a max-lock-wait-queue-length=2 +---- + +scan r=req8 +---- +start-waiting: true + +guard-state r=req8 +---- +new: state=waitQueueMaxLengthExceeded txn=txn1 key="a" held=true guard-access=write + +dequeue r=req8 +---- +global: num=1 + lock: "a" + holder: txn: 00000000-0000-0000-0000-000000000001, ts: 10.000000000,0, info: unrepl epoch: 0, seqs: [0] + queued writers: + active: true req: 2, txn: 00000000-0000-0000-0000-000000000002 + active: true req: 3, txn: 00000000-0000-0000-0000-000000000003 + distinguished req: 2 +local: num=0