Skip to content

Commit

Permalink
concurrency: emit structured contention information to trace
Browse files Browse the repository at this point in the history
This change attaches a protobuf payload to the current Span whenever a
request conflicts with another transaction. The payload contains the
contending txn (i.e. the pushee) at the time at which it was first
encountered, the key on which the conflict occurred (note that this is
not necessarily the key at which the pushee is anchored) and the time
spent waiting on the conflict (excluding intent resolution).

This enables cockroachdb#57114.

Touches cockroachdb#55583. I am not closing that issue yet because we also want
to potentially track the outcome of the conflict.

Release note: None
  • Loading branch information
tbg committed Jan 20, 2021
1 parent ff7d7f4 commit dc689ae
Show file tree
Hide file tree
Showing 23 changed files with 1,151 additions and 706 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/concurrency/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ go_library(
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/tracing",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
],
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/concurrency/concurrency_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type Config struct {
// Configs + Knobs.
MaxLockTableSize int64
DisableTxnPushing bool
OnContentionEvent func(*roachpb.ContentionEvent) // may be nil; allowed to mutate the event
TxnWaitKnobs txnwait.TestingKnobs
}

Expand Down Expand Up @@ -92,6 +93,7 @@ func NewManager(cfg Config) Manager {
ir: cfg.IntentResolver,
lt: lt,
disableTxnPushing: cfg.DisableTxnPushing,
onContentionEvent: cfg.OnContentionEvent,
},
// TODO(nvanbenschoten): move pkg/storage/txnwait to a new
// pkg/storage/concurrency/txnwait package.
Expand Down
68 changes: 49 additions & 19 deletions pkg/kv/kvserver/concurrency/concurrency_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,16 +164,17 @@ func TestConcurrencyManagerBasic(t *testing.T) {
}
latchSpans, lockSpans := c.collectSpans(t, txn, ts, reqs)

c.requestsByName[reqName] = concurrency.Request{
Txn: txn,
Timestamp: ts,
// TODO(nvanbenschoten): test Priority
ReadConsistency: readConsistency,
WaitPolicy: waitPolicy,
Requests: reqUnions,
LatchSpans: latchSpans,
LockSpans: lockSpans,
}
c.requestsByName[reqName] = testReq{
Request: concurrency.Request{
Txn: txn,
Timestamp: ts,
// TODO(nvanbenschoten): test Priority
ReadConsistency: readConsistency,
WaitPolicy: waitPolicy,
Requests: reqUnions,
LatchSpans: latchSpans,
LockSpans: lockSpans,
}}
return ""

case "sequence":
Expand All @@ -190,8 +191,8 @@ func TestConcurrencyManagerBasic(t *testing.T) {
c.mu.Unlock()

opName := fmt.Sprintf("sequence %s", reqName)
mon.runAsync(opName, func(ctx context.Context) {
guard, resp, err := m.SequenceReq(ctx, prev, req)
cancel := mon.runAsync(opName, func(ctx context.Context) {
guard, resp, err := m.SequenceReq(ctx, prev, req.Request)
if err != nil {
log.Eventf(ctx, "sequencing complete, returned error: %v", err)
} else if resp != nil {
Expand All @@ -205,6 +206,8 @@ func TestConcurrencyManagerBasic(t *testing.T) {
log.Event(ctx, "sequencing complete, returned no guard")
}
})
req.cancel = cancel
c.requestsByName[reqName] = req
return c.waitAndCollect(t, mon)

case "finish":
Expand Down Expand Up @@ -477,6 +480,11 @@ func TestConcurrencyManagerBasic(t *testing.T) {
})
}

type testReq struct {
cancel func()
concurrency.Request
}

// cluster encapsulates the state of a running cluster and a set of requests.
// It serves as the test harness in TestConcurrencyManagerBasic - maintaining
// transaction and request declarations, recording the state of in-flight
Expand All @@ -491,7 +499,7 @@ type cluster struct {
// Definitions.
txnCounter uint32
txnsByName map[string]*roachpb.Transaction
requestsByName map[string]concurrency.Request
requestsByName map[string]testReq

// Request state. Cleared on reset.
mu syncutil.Mutex
Expand All @@ -511,6 +519,7 @@ type txnRecord struct {
type txnPush struct {
ctx context.Context
pusher, pushee uuid.UUID
count int
}

func newCluster() *cluster {
Expand All @@ -520,7 +529,7 @@ func newCluster() *cluster {
rangeDesc: &roachpb.RangeDescriptor{RangeID: 1},

txnsByName: make(map[string]*roachpb.Transaction),
requestsByName: make(map[string]concurrency.Request),
requestsByName: make(map[string]testReq),
guardsByReqName: make(map[string]*concurrency.Guard),
txnRecords: make(map[uuid.UUID]*txnRecord),
txnPushes: make(map[uuid.UUID]*txnPush),
Expand All @@ -533,6 +542,9 @@ func (c *cluster) makeConfig() concurrency.Config {
RangeDesc: c.rangeDesc,
Settings: c.st,
IntentResolver: c,
OnContentionEvent: func(ev *roachpb.ContentionEvent) {
ev.Duration = 1234 * time.Millisecond // for determinism
},
TxnWaitMetrics: txnwait.NewMetrics(time.Minute),
}
}
Expand Down Expand Up @@ -680,11 +692,18 @@ func (r *txnRecord) asTxn() (*roachpb.Transaction, chan struct{}) {
func (c *cluster) registerPush(ctx context.Context, pusher, pushee uuid.UUID) (*txnPush, error) {
c.mu.Lock()
defer c.mu.Unlock()
if _, ok := c.txnPushes[pusher]; ok {
return nil, errors.Errorf("txn %v already pushing", pusher)
if p, ok := c.txnPushes[pusher]; ok {
if pushee != p.pushee {
return nil, errors.Errorf("pusher %s can't push two txns %s and %s at the same time",
pusher.Short(), pushee.Short(), p.pushee.Short(),
)
}
p.count++
return p, nil
}
p := &txnPush{
ctx: ctx,
count: 1,
pusher: pusher,
pushee: pushee,
}
Expand All @@ -695,7 +714,17 @@ func (c *cluster) registerPush(ctx context.Context, pusher, pushee uuid.UUID) (*
func (c *cluster) unregisterPush(push *txnPush) {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.txnPushes, push.pusher)
p, ok := c.txnPushes[push.pusher]
if !ok {
return
}
p.count--
if p.count == 0 {
delete(c.txnPushes, push.pusher)
}
if p.count < 0 {
panic(fmt.Sprintf("negative count: %+v", p))
}
}

// detectDeadlocks looks at all in-flight transaction pushes and determines
Expand Down Expand Up @@ -792,7 +821,7 @@ func (c *cluster) resetNamespace() {
defer c.mu.Unlock()
c.txnCounter = 0
c.txnsByName = make(map[string]*roachpb.Transaction)
c.requestsByName = make(map[string]concurrency.Request)
c.requestsByName = make(map[string]testReq)
c.txnRecords = make(map[uuid.UUID]*txnRecord)
}

Expand Down Expand Up @@ -871,7 +900,7 @@ func (m *monitor) runSync(opName string, fn func(context.Context)) {
atomic.StoreInt32(&g.finished, 1)
}

func (m *monitor) runAsync(opName string, fn func(context.Context)) {
func (m *monitor) runAsync(opName string, fn func(context.Context)) (cancel func()) {
m.seq++
ctx, collect, cancel := tracing.ContextWithRecordingSpan(context.Background(), opName)
g := &monitoredGoroutine{
Expand All @@ -887,6 +916,7 @@ func (m *monitor) runAsync(opName string, fn func(context.Context)) {
fn(ctx)
atomic.StoreInt32(&g.finished, 1)
}()
return cancel
}

func (m *monitor) numMonitored() int {
Expand Down
16 changes: 13 additions & 3 deletions pkg/kv/kvserver/concurrency/lock_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ type waitingState struct {
// Represents who the request is waiting for. The conflicting
// transaction may be a lock holder of a conflicting lock or a
// conflicting request being sequenced through the same lockTable.
txn *enginepb.TxnMeta // always non-nil
txn *enginepb.TxnMeta // always non-nil in waitFor{,Distinguished,Self}
key roachpb.Key // the key of the conflict
held bool // is the conflict a held lock?

Expand Down Expand Up @@ -968,7 +968,12 @@ func (l *lockState) informActiveWaiters() {
g := qg.guard
var state waitingState
if g.isSameTxnAsReservation(waitForState) {
state = waitingState{kind: waitSelf}
state = waitingState{
kind: waitSelf,
key: waitForState.key,
txn: waitForState.txn,
held: waitForState.held, // false
}
} else {
state = waitForState
state.guardAccess = spanset.SpanReadWrite
Expand Down Expand Up @@ -1339,7 +1344,12 @@ func (l *lockState) tryActiveWait(
g.key = l.key
g.mu.startWait = true
if g.isSameTxnAsReservation(waitForState) {
g.mu.state = waitingState{kind: waitSelf}
g.mu.state = waitingState{
kind: waitSelf,
key: waitForState.key,
txn: waitForState.txn,
held: waitForState.held, // false
}
} else {
state := waitForState
state.guardAccess = sa
Expand Down
87 changes: 87 additions & 0 deletions pkg/kv/kvserver/concurrency/lock_table_waiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
package concurrency

import (
"bytes"
"context"
"math"
"time"
Expand All @@ -26,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
)
Expand Down Expand Up @@ -99,6 +101,9 @@ type lockTableWaiterImpl struct {
// When set, WriteIntentError are propagated instead of pushing
// conflicting transactions.
disableTxnPushing bool
// When set, called just before each ContentionEvent is emitted.
// Is allowed to mutate the event.
onContentionEvent func(ev *roachpb.ContentionEvent)
}

// IntentResolver is an interface used by lockTableWaiterImpl to push
Expand Down Expand Up @@ -131,11 +136,24 @@ func (w *lockTableWaiterImpl) WaitOn(
var timer *timeutil.Timer
var timerC <-chan time.Time
var timerWaitingState waitingState

h := contentionEventHelper{
sp: tracing.SpanFromContext(ctx),
onEvent: w.onContentionEvent,
}
defer h.emit()

for {
select {
// newStateC will be signaled for the transaction we are currently
// contending on. We will continue to receive updates about this
// transaction until it no longer contends with us, at which point
// either one of the other channels fires or we receive state
// about another contending transaction on newStateC.
case <-newStateC:
timerC = nil
state := guard.CurState()
h.emitAndInit(state)
switch state.kind {
case waitFor, waitForDistinguished:
if req.WaitPolicy == lock.WaitPolicy_Error {
Expand Down Expand Up @@ -654,6 +672,75 @@ func (c *txnCache) insertFrontLocked(txn *roachpb.Transaction) {
c.txns[0] = txn
}

// contentionEventHelper tracks and emits ContentionEvents.
type contentionEventHelper struct {
sp *tracing.Span
onEvent func(event *roachpb.ContentionEvent) // may be nil

// Internal.
ev *roachpb.ContentionEvent
tBegin time.Time
}

// emit emits the open contention event, if any.
func (h *contentionEventHelper) emit() {
if h.ev == nil {
return
}
h.ev.Duration = timeutil.Since(h.tBegin)
if h.onEvent != nil {
// NB: this is intentionally above the call to LogStructured so that
// this interceptor gets to mutate the event (used for test determinism).
h.onEvent(h.ev)
}
h.sp.LogStructured(h.ev)
h.ev = nil
}

// emitAndInit compares the waitingState's active txn (if any) against the current
// ContentionEvent (if any). If the they match, we are continuing to handle the
// same event and no action is taken. If they differ, the open event (if any) is
// finalized and added to the Span, and a new event initialized from the inputs.
func (h *contentionEventHelper) emitAndInit(s waitingState) {
if h.sp == nil {
// No span to attach payloads to - don't do any work.
//
// TODO(tbg): we could special case the noop span here too, but the plan is for
// nobody to use noop spans any more (trace.mode=background).
return
}

// If true, we want to emit the current event and possibly start a new one.
// Otherwise,
switch s.kind {
case waitFor, waitForDistinguished, waitSelf:
// If we're tracking an event and see a different txn/key, the event is
// done and we initialize the new event tracking the new txn/key.
//
// NB: we're guaranteed to have `s.{txn,key}` populated here.
if h.ev != nil &&
(!h.ev.TxnMeta.ID.Equal(s.txn.ID) || !bytes.Equal(h.ev.Key, s.key)) {
h.emit() // h.ev is now nil
}

if h.ev == nil {
h.ev = &roachpb.ContentionEvent{
Key: s.key,
TxnMeta: *s.txn,
}
h.tBegin = timeutil.Now()
}
case waitElsewhere, doneWaiting:
// If we have an event, emit it now and that's it - the case we're in
// does not give us a new transaction/key.
if h.ev != nil {
h.emit()
}
default:
panic("unhandled waitingState.kind")
}
}

func newWriteIntentErr(ws waitingState) *Error {
return roachpb.NewError(&roachpb.WriteIntentError{
Intents: []roachpb.Intent{roachpb.MakeIntent(ws.txn, ws.key)},
Expand Down
Loading

0 comments on commit dc689ae

Please sign in to comment.