Skip to content

Commit

Permalink
concurrency: emit structured contention information to trace
Browse files Browse the repository at this point in the history
This change attaches a protobuf payload to the current Span whenever a
request conflicts with another transaction. The payload contains the
contending txn (i.e. the pushee) at the time at which it was first
encountered, the key on which the conflict occurred (note that this is
not necessarily the key at which the pushee is anchored) and the time
spent waiting on the conflict (excluding intent resolution).

This enables cockroachdb#57114.

Touches cockroachdb#55583. I am not closing that issue yet because we also want
to potentially track the outcome of the conflict.

Release note: None
  • Loading branch information
tbg committed Jan 7, 2021
1 parent 72cc245 commit d063a05
Show file tree
Hide file tree
Showing 21 changed files with 885 additions and 678 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/concurrency/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ go_library(
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/tracing",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
],
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/concurrency/concurrency_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type Config struct {
// Configs + Knobs.
MaxLockTableSize int64
DisableTxnPushing bool
OnContentionEvent func(*roachpb.ContentionEvent) // may be nil; allowed to mutate the event
TxnWaitKnobs txnwait.TestingKnobs
}

Expand Down Expand Up @@ -92,6 +93,7 @@ func NewManager(cfg Config) Manager {
ir: cfg.IntentResolver,
lt: lt,
disableTxnPushing: cfg.DisableTxnPushing,
onContentionEvent: cfg.OnContentionEvent,
},
// TODO(nvanbenschoten): move pkg/storage/txnwait to a new
// pkg/storage/concurrency/txnwait package.
Expand Down
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/concurrency/concurrency_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,9 @@ func (c *cluster) makeConfig() concurrency.Config {
RangeDesc: c.rangeDesc,
Settings: c.st,
IntentResolver: c,
OnContentionEvent: func(ev *roachpb.ContentionEvent) {
ev.Duration = 1234 * time.Millisecond // for determinism
},
TxnWaitMetrics: txnwait.NewMetrics(time.Minute),
}
}
Expand Down
64 changes: 64 additions & 0 deletions pkg/kv/kvserver/concurrency/lock_table_waiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
package concurrency

import (
"bytes"
"context"
"math"
"time"
Expand All @@ -26,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
)
Expand Down Expand Up @@ -99,6 +101,9 @@ type lockTableWaiterImpl struct {
// When set, WriteIntentError are propagated instead of pushing
// conflicting transactions.
disableTxnPushing bool
// When set, called just before each ContentionEvent is emitted.
// Is allowed to mutate the event.
onContentionEvent func(ev *roachpb.ContentionEvent)
}

// IntentResolver is an interface used by lockTableWaiterImpl to push
Expand All @@ -120,6 +125,52 @@ type IntentResolver interface {
ResolveIntents(context.Context, []roachpb.LockUpdate, intentresolver.ResolveOptions) *Error
}

// contentionEventHelper holds the state needed to build ContentionEvents one
// at a time and record each finished event on a tracing Span.
type contentionEventHelper struct {
	// sp is the Span that finalized events are logged to. When it is nil,
	// emitAndInit returns immediately without tracking anything.
	sp *tracing.Span
	// ev is the event currently being tracked, or nil if none is open.
	ev *roachpb.ContentionEvent
	// tBegin is the time at which ev was opened; the event's Duration is
	// measured from it when the event is finalized.
	tBegin time.Time
	// onEvent, if non-nil, is invoked with each finalized event before the
	// event is logged to sp.
	onEvent func(*roachpb.ContentionEvent)
}

// emitAndInit advances the helper to the contention described by newKey and
// newTxn.
//
// If an event is already open and it refers to the same transaction ID and the
// same key, the call is a no-op: the existing event simply keeps accumulating
// time. Otherwise the open event (if there is one) has its Duration set, is
// passed to the onEvent hook (if configured) and is logged to the Span; after
// that a fresh event is opened for the new key and transaction.
//
// Passing a nil key together with a zero transaction ID means "nothing
// follows": the open event is flushed and no new event is allocated.
func (h *contentionEventHelper) emitAndInit(newKey []byte, newTxn enginepb.TxnMeta) {
	if h.sp == nil {
		// Without a span there is nowhere to attach payloads, so skip all work.
		//
		// TODO(tbg): we could special case the noop span here too, but the plan is for
		// nobody to use noop spans any more (trace.mode=background).
		return
	}

	if open := h.ev; open != nil {
		sameTxn := open.TxnMeta.ID.Equal(newTxn.ID)
		if sameTxn && bytes.Equal(open.Key, newKey) {
			// Still the same conflict; keep the current event open.
			return
		}

		// The conflict changed (or ended): finalize and publish the open event.
		open.Duration = timeutil.Since(h.tBegin)
		if hook := h.onEvent; hook != nil {
			// NB: the hook runs before LogStructured on purpose so that it can
			// mutate the event first (used for test determinism).
			hook(open)
		}
		h.sp.LogStructured(open)
		h.ev = nil
	}

	if newKey == nil && newTxn.ID == uuid.Nil {
		// Zero-valued arguments: there is no next event to start.
		return
	}

	h.tBegin = timeutil.Now()
	h.ev = &roachpb.ContentionEvent{Key: newKey, TxnMeta: newTxn}
}

// WaitOn implements the lockTableWaiter interface.
func (w *lockTableWaiterImpl) WaitOn(
ctx context.Context, req Request, guard lockTableGuard,
Expand All @@ -131,13 +182,26 @@ func (w *lockTableWaiterImpl) WaitOn(
var timer *timeutil.Timer
var timerC <-chan time.Time
var timerWaitingState waitingState

ice := contentionEventHelper{
sp: tracing.SpanFromContext(ctx),
onEvent: w.onContentionEvent,
}
defer ice.emitAndInit(nil, enginepb.TxnMeta{}) // emit last open payload, if any

for {
select {
// newStateC will be signaled for the transaction we are currently
// contending on. We will continue to receive updates about this
// transaction until it no longer contends with us, at which point
// either one of the other channels fires or we receive state
// about another contending transaction on newStateC.
case <-newStateC:
timerC = nil
state := guard.CurState()
switch state.kind {
case waitFor, waitForDistinguished:
ice.emitAndInit(state.key, *state.txn)
if req.WaitPolicy == lock.WaitPolicy_Error {
// If the waiter has an Error wait policy, resolve the conflict
// immediately without waiting. If the conflict is a lock then
Expand Down
6 changes: 5 additions & 1 deletion pkg/kv/kvserver/concurrency/lock_table_waiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,11 @@ func testWaitNoopUntilDone(t *testing.T, k waitKind, makeReq func() Request) {
w, _, g := setupLockTableWaiterTest()
defer w.stopper.Stop(ctx)

g.state = waitingState{kind: k}
txn := makeTxnProto("noop-wait-txn")
g.state = waitingState{
kind: k,
txn: &txn.TxnMeta,
}
g.notify()
defer notifyUntilDone(t, g)()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ on-txn-updated txn=txn2 status=committed
----
[-] update txn: committing txn2
[2] sequence req3: resolving intent "k" for txn 00000002 with COMMITTED status
[2] sequence req3: conflicted with 00000002-0000-0000-0000-000000000000 on "k" for 1.23s
[2] sequence req3: acquiring latches
[2] sequence req3: scanning lock table for conflicting locks
[2] sequence req3: sequencing complete, returned guard
Expand Down Expand Up @@ -215,9 +216,11 @@ on-txn-updated txn=txn2 status=pending ts=18,1
----
[-] update txn: increasing timestamp of txn2
[2] sequence req5: resolving intent "k" for txn 00000002 with PENDING status
[2] sequence req5: conflicted with 00000002-0000-0000-0000-000000000000 on "k" for 1.23s
[2] sequence req5: acquiring latches
[2] sequence req5: scanning lock table for conflicting locks
[2] sequence req5: sequencing complete, returned guard
[3] sequence req6: conflicted with 00000002-0000-0000-0000-000000000000 on "k" for 1.23s
[3] sequence req6: acquiring latches
[3] sequence req6: scanning lock table for conflicting locks
[3] sequence req6: sequencing complete, returned guard
Expand Down Expand Up @@ -248,6 +251,7 @@ on-txn-updated txn=txn2 status=committed
----
[-] update txn: committing txn2
[4] sequence req7: resolving intent "k" for txn 00000002 with COMMITTED status
[4] sequence req7: conflicted with 00000002-0000-0000-0000-000000000000 on "k" for 1.23s
[4] sequence req7: acquiring latches
[4] sequence req7: scanning lock table for conflicting locks
[4] sequence req7: sequencing complete, returned guard
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ on-txn-updated txn=txn2 status=aborted
[3] sequence req1: resolving intent "h" for txn 00000002 with ABORTED status
[3] sequence req1: resolving intent "i" for txn 00000002 with ABORTED status
[3] sequence req1: resolving intent "j" for txn 00000002 with ABORTED status
[3] sequence req1: conflicted with 00000002-0000-0000-0000-000000000000 on "a" for 1.23s
[3] sequence req1: acquiring latches
[3] sequence req1: scanning lock table for conflicting locks
[3] sequence req1: sequencing complete, returned guard
Expand Down Expand Up @@ -178,6 +179,7 @@ on-txn-updated txn=txn2 status=committed
----
[-] update txn: committing txn2
[3] sequence req1: resolving intent "a" for txn 00000002 with COMMITTED status
[3] sequence req1: conflicted with 00000002-0000-0000-0000-000000000000 on "a" for 1.23s
[3] sequence req1: acquiring latches
[3] sequence req1: scanning lock table for conflicting locks
[3] sequence req1: sequencing complete, returned guard
Expand Down Expand Up @@ -364,6 +366,7 @@ on-txn-updated txn=txn2 status=aborted
[4] sequence req1: resolving intent "a" for txn 00000002 with ABORTED status
[4] sequence req1: resolving a batch of 1 intent(s)
[4] sequence req1: resolving intent "b" for txn 00000002 with ABORTED status
[4] sequence req1: conflicted with 00000002-0000-0000-0000-000000000000 on "a" for 1.23s
[4] sequence req1: acquiring latches
[4] sequence req1: scanning lock table for conflicting locks
[4] sequence req1: sequencing complete, returned guard
Expand Down Expand Up @@ -576,9 +579,11 @@ on-txn-updated txn=txn3 status=aborted
----
[-] update txn: aborting txn3
[3] sequence req1: resolving intent "c" for txn 00000003 with ABORTED status
[3] sequence req1: conflicted with 00000003-0000-0000-0000-000000000000 on "c" for 1.23s
[3] sequence req1: pushing txn 00000005 to abort
[3] sequence req1: blocked on select in concurrency_test.(*cluster).PushTransaction
[6] sequence req2: resolving intent "a" for txn 00000003 with ABORTED status
[6] sequence req2: conflicted with 00000003-0000-0000-0000-000000000000 on "a" for 1.23s
[6] sequence req2: pushing timestamp of txn 00000004 above 11.000000000,1
[6] sequence req2: blocked on select in concurrency_test.(*cluster).PushTransaction

Expand Down Expand Up @@ -611,6 +616,7 @@ on-txn-updated txn=txn4 status=aborted
[6] sequence req2: resolving intent "b" for txn 00000004 with ABORTED status
[6] sequence req2: resolving a batch of 1 intent(s)
[6] sequence req2: resolving intent "d" for txn 00000003 with ABORTED status
[6] sequence req2: conflicted with 00000004-0000-0000-0000-000000000000 on "b" for 1.23s
[6] sequence req2: acquiring latches
[6] sequence req2: scanning lock table for conflicting locks
[6] sequence req2: sequencing complete, returned guard
Expand Down Expand Up @@ -639,6 +645,7 @@ on-txn-updated txn=txn5 status=aborted
----
[-] update txn: aborting txn5
[3] sequence req1: resolving intent "e" for txn 00000005 with ABORTED status
[3] sequence req1: conflicted with 00000005-0000-0000-0000-000000000000 on "e" for 1.23s
[3] sequence req1: acquiring latches
[3] sequence req1: scanning lock table for conflicting locks
[3] sequence req1: sequencing complete, returned guard
Expand Down
22 changes: 22 additions & 0 deletions pkg/kv/kvserver/concurrency/testdata/concurrency_manager/deadlocks
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,10 @@ on-txn-updated txn=txn1 status=aborted
----
[-] update txn: aborting txn1
[4] sequence req1r: detected pusher aborted
[4] sequence req1r: conflicted with 00000002-0000-0000-0000-000000000000 on "b" for 1.23s
[4] sequence req1r: sequencing complete, returned error: TransactionAbortedError(ABORT_REASON_PUSHER_ABORTED): <nil>
[6] sequence req3r: resolving intent "a" for txn 00000001 with ABORTED status
[6] sequence req3r: conflicted with 00000001-0000-0000-0000-000000000000 on "a" for 1.23s
[6] sequence req3r: acquiring latches
[6] sequence req3r: scanning lock table for conflicting locks
[6] sequence req3r: sequencing complete, returned guard
Expand All @@ -176,6 +178,7 @@ on-txn-updated txn=txn3 status=committed
----
[-] update txn: committing txn3
[5] sequence req2r: resolving intent "c" for txn 00000003 with COMMITTED status
[5] sequence req2r: conflicted with 00000003-0000-0000-0000-000000000000 on "c" for 1.23s
[5] sequence req2r: acquiring latches
[5] sequence req2r: scanning lock table for conflicting locks
[5] sequence req2r: sequencing complete, returned guard
Expand Down Expand Up @@ -376,19 +379,23 @@ on-txn-updated txn=txn1 status=aborted
----
[-] update txn: aborting txn1
[4] sequence req4w: resolving intent "a" for txn 00000001 with ABORTED status
[4] sequence req4w: conflicted with 00000001-0000-0000-0000-000000000000 on "a" for 1.23s
[4] sequence req4w: acquiring latches
[4] sequence req4w: scanning lock table for conflicting locks
[4] sequence req4w: sequencing complete, returned guard
[5] sequence req1w2: detected pusher aborted
[5] sequence req1w2: conflicted with 00000002-0000-0000-0000-000000000000 on "b" for 1.23s
[5] sequence req1w2: sequencing complete, returned error: TransactionAbortedError(ABORT_REASON_PUSHER_ABORTED): <nil>
[7] sequence req3w2: resolving intent "a" for txn 00000001 with ABORTED status
[7] sequence req3w2: conflicted with 00000001-0000-0000-0000-000000000000 on "a" for 1.23s
[7] sequence req3w2: pushing txn 00000004 to detect request deadlock
[7] sequence req3w2: blocked on select in concurrency_test.(*cluster).PushTransaction

# Txn4 can proceed.
finish req=req4w
----
[-] finish req4w: finishing request
[7] sequence req3w2: conflicted with 00000004-0000-0000-0000-000000000000 on "a" for 1.23s
[7] sequence req3w2: acquiring latches
[7] sequence req3w2: scanning lock table for conflicting locks
[7] sequence req3w2: sequencing complete, returned guard
Expand All @@ -402,6 +409,7 @@ on-txn-updated txn=txn3 status=committed
----
[-] update txn: committing txn3
[6] sequence req2w2: resolving intent "c" for txn 00000003 with COMMITTED status
[6] sequence req2w2: conflicted with 00000003-0000-0000-0000-000000000000 on "c" for 1.23s
[6] sequence req2w2: acquiring latches
[6] sequence req2w2: scanning lock table for conflicting locks
[6] sequence req2w2: sequencing complete, returned guard
Expand Down Expand Up @@ -537,6 +545,7 @@ on-txn-updated txn=txn2 status=committed
----
[-] update txn: committing txn2
[4] sequence req4w: resolving intent "b" for txn 00000002 with COMMITTED status
[4] sequence req4w: conflicted with 00000002-0000-0000-0000-000000000000 on "b" for 1.23s
[4] sequence req4w: pushing txn 00000003 to abort
[4] sequence req4w: blocked on select in concurrency_test.(*cluster).PushTransaction

Expand Down Expand Up @@ -612,7 +621,9 @@ on-txn-updated txn=txn4 status=aborted
----
[-] update txn: aborting txn4
[4] sequence req4w: detected pusher aborted
[4] sequence req4w: conflicted with 00000003-0000-0000-0000-000000000000 on "c" for 1.23s
[4] sequence req4w: sequencing complete, returned error: TransactionAbortedError(ABORT_REASON_PUSHER_ABORTED): <nil>
[5] sequence req1w2: conflicted with 00000004-0000-0000-0000-000000000000 on "b" for 1.23s
[5] sequence req1w2: acquiring latches
[5] sequence req1w2: scanning lock table for conflicting locks
[5] sequence req1w2: sequencing complete, returned guard
Expand All @@ -626,6 +637,7 @@ on-txn-updated txn=txn1 status=committed
----
[-] update txn: committing txn1
[6] sequence req3w2: resolving intent "a" for txn 00000001 with COMMITTED status
[6] sequence req3w2: conflicted with 00000001-0000-0000-0000-000000000000 on "a" for 1.23s
[6] sequence req3w2: acquiring latches
[6] sequence req3w2: scanning lock table for conflicting locks
[6] sequence req3w2: sequencing complete, returned guard
Expand Down Expand Up @@ -761,6 +773,7 @@ on-txn-updated txn=txn2 status=committed
----
[-] update txn: committing txn2
[4] sequence req4w: resolving intent "b" for txn 00000002 with COMMITTED status
[4] sequence req4w: conflicted with 00000002-0000-0000-0000-000000000000 on "b" for 1.23s
[4] sequence req4w: pushing txn 00000003 to abort
[4] sequence req4w: blocked on select in concurrency_test.(*cluster).PushTransaction

Expand Down Expand Up @@ -836,8 +849,10 @@ on-txn-updated txn=txn1 status=aborted
----
[-] update txn: aborting txn1
[5] sequence req1w2: detected pusher aborted
[5] sequence req1w2: conflicted with 00000004-0000-0000-0000-000000000000 on "b" for 1.23s
[5] sequence req1w2: sequencing complete, returned error: TransactionAbortedError(ABORT_REASON_PUSHER_ABORTED): <nil>
[6] sequence req3w2: resolving intent "a" for txn 00000001 with ABORTED status
[6] sequence req3w2: conflicted with 00000001-0000-0000-0000-000000000000 on "a" for 1.23s
[6] sequence req3w2: acquiring latches
[6] sequence req3w2: scanning lock table for conflicting locks
[6] sequence req3w2: sequencing complete, returned guard
Expand All @@ -851,6 +866,7 @@ on-txn-updated txn=txn3 status=committed
----
[-] update txn: committing txn3
[4] sequence req4w: resolving intent "c" for txn 00000003 with COMMITTED status
[4] sequence req4w: conflicted with 00000003-0000-0000-0000-000000000000 on "c" for 1.23s
[4] sequence req4w: acquiring latches
[4] sequence req4w: scanning lock table for conflicting locks
[4] sequence req4w: sequencing complete, returned guard
Expand Down Expand Up @@ -1006,16 +1022,19 @@ on-txn-updated txn=txn1 status=committed
----
[-] update txn: committing txn1
[5] sequence req4w: resolving intent "a" for txn 00000001 with COMMITTED status
[5] sequence req4w: conflicted with 00000001-0000-0000-0000-000000000000 on "a" for 1.23s
[5] sequence req4w: pushing txn 00000002 to abort
[5] sequence req4w: blocked on select in concurrency_test.(*cluster).PushTransaction

on-txn-updated txn=txn2 status=committed
----
[-] update txn: committing txn2
[4] sequence req5w: resolving intent "b" for txn 00000002 with COMMITTED status
[4] sequence req5w: conflicted with 00000002-0000-0000-0000-000000000000 on "b" for 1.23s
[4] sequence req5w: pushing txn 00000003 to abort
[4] sequence req5w: blocked on select in concurrency_test.(*cluster).PushTransaction
[5] sequence req4w: resolving intent "b" for txn 00000002 with COMMITTED status
[5] sequence req4w: conflicted with 00000002-0000-0000-0000-000000000000 on "b" for 1.23s
[5] sequence req4w: pushing txn 00000005 to detect request deadlock
[5] sequence req4w: blocked on select in concurrency_test.(*cluster).PushTransaction

Expand Down Expand Up @@ -1081,7 +1100,9 @@ on-txn-updated txn=txn4 status=aborted
----
[-] update txn: aborting txn4
[5] sequence req4w: detected pusher aborted
[5] sequence req4w: conflicted with 00000005-0000-0000-0000-000000000000 on "b" for 1.23s
[5] sequence req4w: sequencing complete, returned error: TransactionAbortedError(ABORT_REASON_PUSHER_ABORTED): <nil>
[6] sequence req3w2: conflicted with 00000004-0000-0000-0000-000000000000 on "a" for 1.23s
[6] sequence req3w2: acquiring latches
[6] sequence req3w2: scanning lock table for conflicting locks
[6] sequence req3w2: sequencing complete, returned guard
Expand All @@ -1095,6 +1116,7 @@ on-txn-updated txn=txn3 status=committed
----
[-] update txn: committing txn3
[4] sequence req5w: resolving intent "c" for txn 00000003 with COMMITTED status
[4] sequence req5w: conflicted with 00000003-0000-0000-0000-000000000000 on "c" for 1.23s
[4] sequence req5w: acquiring latches
[4] sequence req5w: scanning lock table for conflicting locks
[4] sequence req5w: sequencing complete, returned guard
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,10 +180,12 @@ on-txn-updated txn=txn3 status=committed
----
[-] update txn: committing txn3
[5] sequence req4: resolving intent "k" for txn 00000003 with COMMITTED status
[5] sequence req4: conflicted with 00000003-0000-0000-0000-000000000000 on "k" for 1.23s
[5] sequence req4: acquiring latches
[5] sequence req4: scanning lock table for conflicting locks
[5] sequence req4: sequencing complete, returned guard
[7] sequence req2: resolving intent "k" for txn 00000003 with COMMITTED status
[7] sequence req2: conflicted with 00000003-0000-0000-0000-000000000000 on "k" for 1.23s
[7] sequence req2: acquiring latches
[7] sequence req2: scanning lock table for conflicting locks
[7] sequence req2: sequencing complete, returned guard
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ on-txn-updated txn=txn1 status=aborted
----
[-] update txn: aborting txn1
[3] sequence req1: resolving intent "k" for txn 00000001 with ABORTED status
[3] sequence req1: conflicted with 00000001-0000-0000-0000-000000000000 on "k" for 1.23s
[3] sequence req1: acquiring latches
[3] sequence req1: scanning lock table for conflicting locks
[3] sequence req1: sequencing complete, returned guard
Expand Down
Loading

0 comments on commit d063a05

Please sign in to comment.