Skip to content

Commit

Permalink
concurrency: emit structured contention information to trace
Browse files Browse the repository at this point in the history
This change attaches a protobuf payload to the current Span whenever a
request conflicts with another transaction. The payload contains the
contending txn (i.e. the pushee) at the time at which it was first
encountered, the key on which the conflict occurred (note that this is
not necessarily the key at which the pushee is anchored) and the time
spent waiting on the conflict (excluding intent resolution).

This enables cockroachdb#57114.

Touches cockroachdb#55583. I am not closing that issue yet because we also want
to potentially track the outcome of the conflict.

Release note: None
  • Loading branch information
tbg committed Jan 12, 2021
1 parent 3f5eebd commit 5265ba8
Show file tree
Hide file tree
Showing 22 changed files with 906 additions and 683 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/concurrency/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ go_library(
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/tracing",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
],
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/concurrency/concurrency_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type Config struct {
// Configs + Knobs.
MaxLockTableSize int64
DisableTxnPushing bool
OnContentionEvent func(*roachpb.ContentionEvent) // may be nil; allowed to mutate the event
TxnWaitKnobs txnwait.TestingKnobs
}

Expand Down Expand Up @@ -92,6 +93,7 @@ func NewManager(cfg Config) Manager {
ir: cfg.IntentResolver,
lt: lt,
disableTxnPushing: cfg.DisableTxnPushing,
onContentionEvent: cfg.OnContentionEvent,
},
// TODO(nvanbenschoten): move pkg/storage/txnwait to a new
// pkg/storage/concurrency/txnwait package.
Expand Down
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/concurrency/concurrency_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,9 @@ func (c *cluster) makeConfig() concurrency.Config {
RangeDesc: c.rangeDesc,
Settings: c.st,
IntentResolver: c,
OnContentionEvent: func(ev *roachpb.ContentionEvent) {
ev.Duration = 1234 * time.Millisecond // for determinism
},
TxnWaitMetrics: txnwait.NewMetrics(time.Minute),
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/concurrency/lock_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ type waitingState struct {
// Represents who the request is waiting for. The conflicting
// transaction may be a lock holder of a conflicting lock or a
// conflicting request being sequenced through the same lockTable.
txn *enginepb.TxnMeta // always non-nil
txn *enginepb.TxnMeta // always non-nil in waitFor{,Distinguished}
key roachpb.Key // the key of the conflict
held bool // is the conflict a held lock?

Expand Down
80 changes: 80 additions & 0 deletions pkg/kv/kvserver/concurrency/lock_table_waiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
package concurrency

import (
"bytes"
"context"
"math"
"time"
Expand All @@ -26,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
)
Expand Down Expand Up @@ -99,6 +101,9 @@ type lockTableWaiterImpl struct {
// When set, WriteIntentError are propagated instead of pushing
// conflicting transactions.
disableTxnPushing bool
// When set, called just before each ContentionEvent is emitted.
// Is allowed to mutate the event.
onContentionEvent func(ev *roachpb.ContentionEvent)
}

// IntentResolver is an interface used by lockTableWaiterImpl to push
Expand Down Expand Up @@ -131,11 +136,24 @@ func (w *lockTableWaiterImpl) WaitOn(
var timer *timeutil.Timer
var timerC <-chan time.Time
var timerWaitingState waitingState

ice := contentionEventHelper{
sp: tracing.SpanFromContext(ctx),
onEvent: w.onContentionEvent,
}
defer ice.emit()

for {
select {
// newStateC will be signaled for the transaction we are currently
// contending on. We will continue to receive updates about this
// transaction until it no longer contends with us, at which point
// either one of the other channels fires or we receive state
// about another contending transaction on newStateC.
case <-newStateC:
timerC = nil
state := guard.CurState()
ice.emitAndInit(state)
switch state.kind {
case waitFor, waitForDistinguished:
if req.WaitPolicy == lock.WaitPolicy_Error {
Expand Down Expand Up @@ -648,6 +666,68 @@ func (c *txnCache) insertFrontLocked(txn *roachpb.Transaction) {
c.txns[0] = txn
}

// contentionEventHelper tracks and emits ContentionEvents.
type contentionEventHelper struct {
sp *tracing.Span
ev *roachpb.ContentionEvent
tBegin time.Time
onEvent func(event *roachpb.ContentionEvent) // may be nil
}

// emit emits the open contention event, if any.
func (h *contentionEventHelper) emit() {
if h.ev == nil {
return
}
h.ev.Duration = timeutil.Since(h.tBegin)
if h.onEvent != nil {
// NB: this is intentionally above the call to LogStructured so that
// this interceptor gets to mutate the event (used for test determinism).
h.onEvent(h.ev)
}
h.sp.LogStructured(h.ev)
h.ev = nil
}

// emitAndInit compares the waitingState's active txn (if any) against the open
// ContentionEvent (if any). If the they match, we are continuing to handle the
// same event and no action is taken. If they differ, the open event (if any) is
// finalized and added to the Span, and a new event initialized from the inputs.
func (h *contentionEventHelper) emitAndInit(s waitingState) {
if h.sp == nil {
// No span to attach payloads to - don't do any work.
//
// TODO(tbg): we could special case the noop span here too, but the plan is for
// nobody to use noop spans any more (trace.mode=background).
return
}

var finalize bool
switch s.kind {
case waitFor, waitForDistinguished:
finalize = h.ev != nil &&
(!h.ev.TxnMeta.ID.Equal(s.txn.ID) || !bytes.Equal(h.ev.Key, s.key))
case waitSelf, waitElsewhere, doneWaiting:
// TODO(tbg): fix waitSelf to come with an attached txn (ours) and
// handle it like waitFor above.
finalize = true
default:
panic("unhandled waitingState.kind")
}

if finalize {
h.emit()
}
if s.txn == nil {
return
}
h.ev = &roachpb.ContentionEvent{
Key: s.key,
TxnMeta: *s.txn,
}
h.tBegin = timeutil.Now()
}

func newWriteIntentErr(ws waitingState) *Error {
return roachpb.NewError(&roachpb.WriteIntentError{
Intents: []roachpb.Intent{roachpb.MakeIntent(ws.txn, ws.key)},
Expand Down
6 changes: 5 additions & 1 deletion pkg/kv/kvserver/concurrency/lock_table_waiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,11 @@ func testWaitNoopUntilDone(t *testing.T, k waitKind, makeReq func() Request) {
w, _, g := setupLockTableWaiterTest()
defer w.stopper.Stop(ctx)

g.state = waitingState{kind: k}
txn := makeTxnProto("noop-wait-txn")
g.state = waitingState{
kind: k,
txn: &txn.TxnMeta,
}
g.notify()
defer notifyUntilDone(t, g)()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ on-txn-updated txn=txn2 status=committed
----
[-] update txn: committing txn2
[2] sequence req3: resolving intent "k" for txn 00000002 with COMMITTED status
[2] sequence req3: conflicted with 00000002-0000-0000-0000-000000000000 on "k" for 1.23s
[2] sequence req3: acquiring latches
[2] sequence req3: scanning lock table for conflicting locks
[2] sequence req3: sequencing complete, returned guard
Expand Down Expand Up @@ -215,9 +216,11 @@ on-txn-updated txn=txn2 status=pending ts=18,1
----
[-] update txn: increasing timestamp of txn2
[2] sequence req5: resolving intent "k" for txn 00000002 with PENDING status
[2] sequence req5: conflicted with 00000002-0000-0000-0000-000000000000 on "k" for 1.23s
[2] sequence req5: acquiring latches
[2] sequence req5: scanning lock table for conflicting locks
[2] sequence req5: sequencing complete, returned guard
[3] sequence req6: conflicted with 00000002-0000-0000-0000-000000000000 on "k" for 1.23s
[3] sequence req6: acquiring latches
[3] sequence req6: scanning lock table for conflicting locks
[3] sequence req6: sequencing complete, returned guard
Expand Down Expand Up @@ -248,6 +251,7 @@ on-txn-updated txn=txn2 status=committed
----
[-] update txn: committing txn2
[4] sequence req7: resolving intent "k" for txn 00000002 with COMMITTED status
[4] sequence req7: conflicted with 00000002-0000-0000-0000-000000000000 on "k" for 1.23s
[4] sequence req7: acquiring latches
[4] sequence req7: scanning lock table for conflicting locks
[4] sequence req7: sequencing complete, returned guard
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ on-txn-updated txn=txn2 status=aborted
----
[-] update txn: aborting txn2
[3] sequence req1: resolving intent "a" for txn 00000002 with ABORTED status
[3] sequence req1: conflicted with 00000002-0000-0000-0000-000000000000 on "a" for 1.23s
[3] sequence req1: resolving a batch of 9 intent(s)
[3] sequence req1: resolving intent "b" for txn 00000002 with ABORTED status
[3] sequence req1: resolving intent "c" for txn 00000002 with ABORTED status
Expand Down Expand Up @@ -178,6 +179,7 @@ on-txn-updated txn=txn2 status=committed
----
[-] update txn: committing txn2
[3] sequence req1: resolving intent "a" for txn 00000002 with COMMITTED status
[3] sequence req1: conflicted with 00000002-0000-0000-0000-000000000000 on "a" for 1.23s
[3] sequence req1: acquiring latches
[3] sequence req1: scanning lock table for conflicting locks
[3] sequence req1: sequencing complete, returned guard
Expand Down Expand Up @@ -362,6 +364,7 @@ on-txn-updated txn=txn2 status=aborted
----
[-] update txn: aborting txn2
[4] sequence req1: resolving intent "a" for txn 00000002 with ABORTED status
[4] sequence req1: conflicted with 00000002-0000-0000-0000-000000000000 on "a" for 1.23s
[4] sequence req1: resolving a batch of 1 intent(s)
[4] sequence req1: resolving intent "b" for txn 00000002 with ABORTED status
[4] sequence req1: acquiring latches
Expand Down Expand Up @@ -576,9 +579,11 @@ on-txn-updated txn=txn3 status=aborted
----
[-] update txn: aborting txn3
[3] sequence req1: resolving intent "c" for txn 00000003 with ABORTED status
[3] sequence req1: conflicted with 00000003-0000-0000-0000-000000000000 on "c" for 1.23s
[3] sequence req1: pushing txn 00000005 to abort
[3] sequence req1: blocked on select in concurrency_test.(*cluster).PushTransaction
[6] sequence req2: resolving intent "a" for txn 00000003 with ABORTED status
[6] sequence req2: conflicted with 00000003-0000-0000-0000-000000000000 on "a" for 1.23s
[6] sequence req2: pushing timestamp of txn 00000004 above 11.000000000,1
[6] sequence req2: blocked on select in concurrency_test.(*cluster).PushTransaction

Expand Down Expand Up @@ -609,6 +614,7 @@ on-txn-updated txn=txn4 status=aborted
----
[-] update txn: aborting txn4
[6] sequence req2: resolving intent "b" for txn 00000004 with ABORTED status
[6] sequence req2: conflicted with 00000004-0000-0000-0000-000000000000 on "b" for 1.23s
[6] sequence req2: resolving a batch of 1 intent(s)
[6] sequence req2: resolving intent "d" for txn 00000003 with ABORTED status
[6] sequence req2: acquiring latches
Expand Down Expand Up @@ -639,6 +645,7 @@ on-txn-updated txn=txn5 status=aborted
----
[-] update txn: aborting txn5
[3] sequence req1: resolving intent "e" for txn 00000005 with ABORTED status
[3] sequence req1: conflicted with 00000005-0000-0000-0000-000000000000 on "e" for 1.23s
[3] sequence req1: acquiring latches
[3] sequence req1: scanning lock table for conflicting locks
[3] sequence req1: sequencing complete, returned guard
Expand Down
22 changes: 22 additions & 0 deletions pkg/kv/kvserver/concurrency/testdata/concurrency_manager/deadlocks
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,10 @@ on-txn-updated txn=txn1 status=aborted
----
[-] update txn: aborting txn1
[4] sequence req1r: detected pusher aborted
[4] sequence req1r: conflicted with 00000002-0000-0000-0000-000000000000 on "b" for 1.23s
[4] sequence req1r: sequencing complete, returned error: TransactionAbortedError(ABORT_REASON_PUSHER_ABORTED): <nil>
[6] sequence req3r: resolving intent "a" for txn 00000001 with ABORTED status
[6] sequence req3r: conflicted with 00000001-0000-0000-0000-000000000000 on "a" for 1.23s
[6] sequence req3r: acquiring latches
[6] sequence req3r: scanning lock table for conflicting locks
[6] sequence req3r: sequencing complete, returned guard
Expand All @@ -176,6 +178,7 @@ on-txn-updated txn=txn3 status=committed
----
[-] update txn: committing txn3
[5] sequence req2r: resolving intent "c" for txn 00000003 with COMMITTED status
[5] sequence req2r: conflicted with 00000003-0000-0000-0000-000000000000 on "c" for 1.23s
[5] sequence req2r: acquiring latches
[5] sequence req2r: scanning lock table for conflicting locks
[5] sequence req2r: sequencing complete, returned guard
Expand Down Expand Up @@ -376,19 +379,23 @@ on-txn-updated txn=txn1 status=aborted
----
[-] update txn: aborting txn1
[4] sequence req4w: resolving intent "a" for txn 00000001 with ABORTED status
[4] sequence req4w: conflicted with 00000001-0000-0000-0000-000000000000 on "a" for 1.23s
[4] sequence req4w: acquiring latches
[4] sequence req4w: scanning lock table for conflicting locks
[4] sequence req4w: sequencing complete, returned guard
[5] sequence req1w2: detected pusher aborted
[5] sequence req1w2: conflicted with 00000002-0000-0000-0000-000000000000 on "b" for 1.23s
[5] sequence req1w2: sequencing complete, returned error: TransactionAbortedError(ABORT_REASON_PUSHER_ABORTED): <nil>
[7] sequence req3w2: resolving intent "a" for txn 00000001 with ABORTED status
[7] sequence req3w2: conflicted with 00000001-0000-0000-0000-000000000000 on "a" for 1.23s
[7] sequence req3w2: pushing txn 00000004 to detect request deadlock
[7] sequence req3w2: blocked on select in concurrency_test.(*cluster).PushTransaction

# Txn4 can proceed.
finish req=req4w
----
[-] finish req4w: finishing request
[7] sequence req3w2: conflicted with 00000004-0000-0000-0000-000000000000 on "a" for 1.23s
[7] sequence req3w2: acquiring latches
[7] sequence req3w2: scanning lock table for conflicting locks
[7] sequence req3w2: sequencing complete, returned guard
Expand All @@ -402,6 +409,7 @@ on-txn-updated txn=txn3 status=committed
----
[-] update txn: committing txn3
[6] sequence req2w2: resolving intent "c" for txn 00000003 with COMMITTED status
[6] sequence req2w2: conflicted with 00000003-0000-0000-0000-000000000000 on "c" for 1.23s
[6] sequence req2w2: acquiring latches
[6] sequence req2w2: scanning lock table for conflicting locks
[6] sequence req2w2: sequencing complete, returned guard
Expand Down Expand Up @@ -537,6 +545,7 @@ on-txn-updated txn=txn2 status=committed
----
[-] update txn: committing txn2
[4] sequence req4w: resolving intent "b" for txn 00000002 with COMMITTED status
[4] sequence req4w: conflicted with 00000002-0000-0000-0000-000000000000 on "b" for 1.23s
[4] sequence req4w: pushing txn 00000003 to abort
[4] sequence req4w: blocked on select in concurrency_test.(*cluster).PushTransaction

Expand Down Expand Up @@ -612,7 +621,9 @@ on-txn-updated txn=txn4 status=aborted
----
[-] update txn: aborting txn4
[4] sequence req4w: detected pusher aborted
[4] sequence req4w: conflicted with 00000003-0000-0000-0000-000000000000 on "c" for 1.23s
[4] sequence req4w: sequencing complete, returned error: TransactionAbortedError(ABORT_REASON_PUSHER_ABORTED): <nil>
[5] sequence req1w2: conflicted with 00000004-0000-0000-0000-000000000000 on "b" for 1.23s
[5] sequence req1w2: acquiring latches
[5] sequence req1w2: scanning lock table for conflicting locks
[5] sequence req1w2: sequencing complete, returned guard
Expand All @@ -626,6 +637,7 @@ on-txn-updated txn=txn1 status=committed
----
[-] update txn: committing txn1
[6] sequence req3w2: resolving intent "a" for txn 00000001 with COMMITTED status
[6] sequence req3w2: conflicted with 00000001-0000-0000-0000-000000000000 on "a" for 1.23s
[6] sequence req3w2: acquiring latches
[6] sequence req3w2: scanning lock table for conflicting locks
[6] sequence req3w2: sequencing complete, returned guard
Expand Down Expand Up @@ -761,6 +773,7 @@ on-txn-updated txn=txn2 status=committed
----
[-] update txn: committing txn2
[4] sequence req4w: resolving intent "b" for txn 00000002 with COMMITTED status
[4] sequence req4w: conflicted with 00000002-0000-0000-0000-000000000000 on "b" for 1.23s
[4] sequence req4w: pushing txn 00000003 to abort
[4] sequence req4w: blocked on select in concurrency_test.(*cluster).PushTransaction

Expand Down Expand Up @@ -836,8 +849,10 @@ on-txn-updated txn=txn1 status=aborted
----
[-] update txn: aborting txn1
[5] sequence req1w2: detected pusher aborted
[5] sequence req1w2: conflicted with 00000004-0000-0000-0000-000000000000 on "b" for 1.23s
[5] sequence req1w2: sequencing complete, returned error: TransactionAbortedError(ABORT_REASON_PUSHER_ABORTED): <nil>
[6] sequence req3w2: resolving intent "a" for txn 00000001 with ABORTED status
[6] sequence req3w2: conflicted with 00000001-0000-0000-0000-000000000000 on "a" for 1.23s
[6] sequence req3w2: acquiring latches
[6] sequence req3w2: scanning lock table for conflicting locks
[6] sequence req3w2: sequencing complete, returned guard
Expand All @@ -851,6 +866,7 @@ on-txn-updated txn=txn3 status=committed
----
[-] update txn: committing txn3
[4] sequence req4w: resolving intent "c" for txn 00000003 with COMMITTED status
[4] sequence req4w: conflicted with 00000003-0000-0000-0000-000000000000 on "c" for 1.23s
[4] sequence req4w: acquiring latches
[4] sequence req4w: scanning lock table for conflicting locks
[4] sequence req4w: sequencing complete, returned guard
Expand Down Expand Up @@ -1006,16 +1022,19 @@ on-txn-updated txn=txn1 status=committed
----
[-] update txn: committing txn1
[5] sequence req4w: resolving intent "a" for txn 00000001 with COMMITTED status
[5] sequence req4w: conflicted with 00000001-0000-0000-0000-000000000000 on "a" for 1.23s
[5] sequence req4w: pushing txn 00000002 to abort
[5] sequence req4w: blocked on select in concurrency_test.(*cluster).PushTransaction

on-txn-updated txn=txn2 status=committed
----
[-] update txn: committing txn2
[4] sequence req5w: resolving intent "b" for txn 00000002 with COMMITTED status
[4] sequence req5w: conflicted with 00000002-0000-0000-0000-000000000000 on "b" for 1.23s
[4] sequence req5w: pushing txn 00000003 to abort
[4] sequence req5w: blocked on select in concurrency_test.(*cluster).PushTransaction
[5] sequence req4w: resolving intent "b" for txn 00000002 with COMMITTED status
[5] sequence req4w: conflicted with 00000002-0000-0000-0000-000000000000 on "b" for 1.23s
[5] sequence req4w: pushing txn 00000005 to detect request deadlock
[5] sequence req4w: blocked on select in concurrency_test.(*cluster).PushTransaction

Expand Down Expand Up @@ -1081,7 +1100,9 @@ on-txn-updated txn=txn4 status=aborted
----
[-] update txn: aborting txn4
[5] sequence req4w: detected pusher aborted
[5] sequence req4w: conflicted with 00000005-0000-0000-0000-000000000000 on "b" for 1.23s
[5] sequence req4w: sequencing complete, returned error: TransactionAbortedError(ABORT_REASON_PUSHER_ABORTED): <nil>
[6] sequence req3w2: conflicted with 00000004-0000-0000-0000-000000000000 on "a" for 1.23s
[6] sequence req3w2: acquiring latches
[6] sequence req3w2: scanning lock table for conflicting locks
[6] sequence req3w2: sequencing complete, returned guard
Expand All @@ -1095,6 +1116,7 @@ on-txn-updated txn=txn3 status=committed
----
[-] update txn: committing txn3
[4] sequence req5w: resolving intent "c" for txn 00000003 with COMMITTED status
[4] sequence req5w: conflicted with 00000003-0000-0000-0000-000000000000 on "c" for 1.23s
[4] sequence req5w: acquiring latches
[4] sequence req5w: scanning lock table for conflicting locks
[4] sequence req5w: sequencing complete, returned guard
Expand Down
Loading

0 comments on commit 5265ba8

Please sign in to comment.