Skip to content

Commit

Permalink
concurrency: emit structured contention information to trace
Browse files Browse the repository at this point in the history
This change attaches a protobuf payload to the current Span whenever a
request conflicts with another transaction. The payload contains the
contending txn (i.e. the pushee) at the time at which it was first
encountered, the key on which the conflict occurred (note that this is
not necessarily the key at which the pushee is anchored) and the time
spent waiting on the conflict (excluding intent resolution).

This enables cockroachdb#57114.

Touches cockroachdb#55583. I am not closing that issue yet because we also want
to potentially track the outcome of the conflict.

Release note: None
  • Loading branch information
tbg committed Jan 6, 2021
1 parent 9fa3e82 commit d1daac5
Show file tree
Hide file tree
Showing 14 changed files with 812 additions and 662 deletions.
13 changes: 13 additions & 0 deletions pkg/kv/kvserver/concurrency/concurrency_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/datadriven"
"github.com/cockroachdb/errors"
"github.com/gogo/protobuf/types"
"github.com/maruel/panicparse/stack"
"github.com/petermattis/goid"
)
Expand Down Expand Up @@ -920,6 +921,18 @@ func (m *monitor) collectRecordings() string {
g.prevEvents++
}
}
var ev roachpb.ContentionEvent
for _, item := range span.InternalStructured {
if types.Is(item, &ev) {
_ = types.UnmarshalAny(item, &ev)
if ev.Duration != 0 {
ev.Duration = 123 // for determinism
}
logs = append(logs, logRecord{
g: g, value: fmt.Sprintf("contention metadata: %s @ %s", ev.TxnMeta.ID.Short(), ev.Key),
})
}
}
}
if atomic.LoadInt32(&g.finished) == 1 {
g.cancel()
Expand Down
53 changes: 53 additions & 0 deletions pkg/kv/kvserver/concurrency/lock_table_waiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
package concurrency

import (
"bytes"
"context"
"math"
"time"
Expand All @@ -26,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
)
Expand Down Expand Up @@ -120,6 +122,47 @@ type IntentResolver interface {
ResolveIntents(context.Context, []roachpb.LockUpdate, intentresolver.ResolveOptions) *Error
}

// contentionEventHelper tracks at most one in-progress ("open")
// ContentionEvent and emits it to a tracing Span once the conflict it
// describes is over. See emitAndInit for the state transitions.
type contentionEventHelper struct {
	// sp is the Span that finalized events are logged to. When nil,
	// emitAndInit does no work at all.
	sp *tracing.Span
	// ev is the currently open event, or nil if no event is open.
	ev *roachpb.ContentionEvent
	// tBegin is the time at which ev was opened. It is used to compute
	// ev.Duration when the event is finalized.
	tBegin time.Time
}

// emitAndInit reconciles the open ContentionEvent (if any) with the conflict
// described by newKey and newTxn:
//
//   - If an event is open and it has the same transaction ID and key as the
//     arguments, the same conflict is still ongoing and nothing happens.
//   - Otherwise the open event, if any, has its Duration set to the time
//     elapsed since it was opened and is logged to the Span. A new event is
//     then opened from the arguments.
//
// Passing the zero values (a nil key and a TxnMeta with a nil ID) signals that
// there is no follow-up conflict: the open event is still finalized, but no
// new event is allocated.
//
// The method is a no-op when the helper has no Span.
func (h *contentionEventHelper) emitAndInit(newKey []byte, newTxn enginepb.TxnMeta) {
	if h.sp == nil {
		// No span to attach payloads to - don't do any work.
		//
		// TODO(tbg): we could special case the noop span here too, but the plan is for
		// nobody to use noop spans any more (trace.mode=background).
		return
	}

	if open := h.ev; open != nil {
		sameConflict := open.TxnMeta.ID.Equal(newTxn.ID) && bytes.Equal(open.Key, newKey)
		if sameConflict {
			// Still waiting on the same txn at the same key.
			return
		}
		// The conflict changed (or ended): finalize and emit the open event.
		open.Duration = timeutil.Since(h.tBegin)
		h.sp.LogStructured(open)
		h.ev = nil
	}

	if newKey == nil && newTxn.ID == uuid.Nil {
		// Zero-valued arguments: nothing new to track.
		return
	}

	h.tBegin = timeutil.Now()
	h.ev = &roachpb.ContentionEvent{
		Key:     newKey,
		TxnMeta: newTxn,
	}
}

// WaitOn implements the lockTableWaiter interface.
func (w *lockTableWaiterImpl) WaitOn(
ctx context.Context, req Request, guard lockTableGuard,
Expand All @@ -131,13 +174,23 @@ func (w *lockTableWaiterImpl) WaitOn(
var timer *timeutil.Timer
var timerC <-chan time.Time
var timerWaitingState waitingState

ice := contentionEventHelper{sp: tracing.SpanFromContext(ctx)}
defer ice.emitAndInit(nil, enginepb.TxnMeta{}) // emit last open payload, if any

for {
select {
// newStateC will be signaled for the transaction we are currently
// contending on. We will continue to receive updates about this
// transaction until it no longer contends with us, at which point
// either one of the other channels fires or we receive state
// about another contending transaction on newStateC.
case <-newStateC:
timerC = nil
state := guard.CurState()
switch state.kind {
case waitFor, waitForDistinguished:
ice.emitAndInit(state.key, *state.txn)
if req.WaitPolicy == lock.WaitPolicy_Error {
// If the waiter has an Error wait policy, resolve the conflict
// immediately without waiting. If the conflict is a lock then
Expand Down
6 changes: 5 additions & 1 deletion pkg/kv/kvserver/concurrency/lock_table_waiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,11 @@ func testWaitNoopUntilDone(t *testing.T, k waitKind, makeReq func() Request) {
w, _, g := setupLockTableWaiterTest()
defer w.stopper.Stop(ctx)

g.state = waitingState{kind: k}
txn := makeTxnProto("noop-wait-txn")
g.state = waitingState{
kind: k,
txn: &txn.TxnMeta,
}
g.notify()
defer notifyUntilDone(t, g)()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ on-txn-updated txn=txn2 status=committed
[2] sequence req3: acquiring latches
[2] sequence req3: scanning lock table for conflicting locks
[2] sequence req3: sequencing complete, returned guard
[2] sequence req3: contention metadata: 00000002 @ "k"

debug-lock-table
----
Expand Down Expand Up @@ -218,9 +219,11 @@ on-txn-updated txn=txn2 status=pending ts=18,1
[2] sequence req5: acquiring latches
[2] sequence req5: scanning lock table for conflicting locks
[2] sequence req5: sequencing complete, returned guard
[2] sequence req5: contention metadata: 00000002 @ "k"
[3] sequence req6: acquiring latches
[3] sequence req6: scanning lock table for conflicting locks
[3] sequence req6: sequencing complete, returned guard
[3] sequence req6: contention metadata: 00000002 @ "k"

new-request name=req7 txn=none ts=12,1
put key=k value=v
Expand Down Expand Up @@ -251,6 +254,7 @@ on-txn-updated txn=txn2 status=committed
[4] sequence req7: acquiring latches
[4] sequence req7: scanning lock table for conflicting locks
[4] sequence req7: sequencing complete, returned guard
[4] sequence req7: contention metadata: 00000002 @ "k"

finish req=req7
----
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ on-txn-updated txn=txn2 status=aborted
[3] sequence req1: acquiring latches
[3] sequence req1: scanning lock table for conflicting locks
[3] sequence req1: sequencing complete, returned guard
[3] sequence req1: contention metadata: 00000002 @ "a"

debug-lock-table
----
Expand Down Expand Up @@ -181,6 +182,7 @@ on-txn-updated txn=txn2 status=committed
[3] sequence req1: acquiring latches
[3] sequence req1: scanning lock table for conflicting locks
[3] sequence req1: sequencing complete, returned guard
[3] sequence req1: contention metadata: 00000002 @ "a"

handle-write-intent-error req=req1 lease-seq=1
intent txn=txn2 key=b
Expand Down Expand Up @@ -367,6 +369,7 @@ on-txn-updated txn=txn2 status=aborted
[4] sequence req1: acquiring latches
[4] sequence req1: scanning lock table for conflicting locks
[4] sequence req1: sequencing complete, returned guard
[4] sequence req1: contention metadata: 00000002 @ "a"

debug-lock-table
----
Expand Down Expand Up @@ -578,9 +581,11 @@ on-txn-updated txn=txn3 status=aborted
[3] sequence req1: resolving intent "c" for txn 00000003 with ABORTED status
[3] sequence req1: pushing txn 00000005 to abort
[3] sequence req1: blocked on select in concurrency_test.(*cluster).PushTransaction
[3] sequence req1: contention metadata: 00000003 @ "c"
[6] sequence req2: resolving intent "a" for txn 00000003 with ABORTED status
[6] sequence req2: pushing timestamp of txn 00000004 above 11.000000000,1
[6] sequence req2: blocked on select in concurrency_test.(*cluster).PushTransaction
[6] sequence req2: contention metadata: 00000003 @ "a"

debug-lock-table
----
Expand Down Expand Up @@ -608,12 +613,15 @@ local: num=0
on-txn-updated txn=txn4 status=aborted
----
[-] update txn: aborting txn4
[3] sequence req1: contention metadata: 00000003 @ "c"
[6] sequence req2: resolving intent "b" for txn 00000004 with ABORTED status
[6] sequence req2: resolving a batch of 1 intent(s)
[6] sequence req2: resolving intent "d" for txn 00000003 with ABORTED status
[6] sequence req2: acquiring latches
[6] sequence req2: scanning lock table for conflicting locks
[6] sequence req2: sequencing complete, returned guard
[6] sequence req2: contention metadata: 00000003 @ "a"
[6] sequence req2: contention metadata: 00000004 @ "b"

debug-lock-table
----
Expand All @@ -632,6 +640,7 @@ local: num=0
finish req=req2
----
[-] finish req2: finishing request
[3] sequence req1: contention metadata: 00000003 @ "c"

# The txn holding e is aborted, so req1 can resolve e and proceed to
# evaluation.
Expand All @@ -642,6 +651,8 @@ on-txn-updated txn=txn5 status=aborted
[3] sequence req1: acquiring latches
[3] sequence req1: scanning lock table for conflicting locks
[3] sequence req1: sequencing complete, returned guard
[3] sequence req1: contention metadata: 00000003 @ "c"
[3] sequence req1: contention metadata: 00000005 @ "e"

finish req=req1
----
Expand Down
Loading

0 comments on commit d1daac5

Please sign in to comment.