Skip to content

Commit

Permalink
kvclient: add metrics for txns with condensed intents
Browse files Browse the repository at this point in the history
This commit adds two new metrics - txn.condensed_intent_spans and
txn.condensed_intent_spans_gauge - that track transactions whose intent
spans have been collapsed with a loss of fidelity because the respective
transaction has exceeded kv.transaction.max_intents_bytes. Such
transactions are a potential source of instability because resolving
their intents might cost significant CPU (and also latency => contention
footprint).

Also adds new logging aiming to provide the IDs of some such
transactions.

Release note: None
  • Loading branch information
andreimatei committed Apr 2, 2021
1 parent 9958d48 commit 1bfd8cc
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 25 deletions.
8 changes: 5 additions & 3 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func newRootTxnCoordSender(
// Various interceptors below rely on sequence number allocation,
// so the sequence number allocator is near the top of the stack.
&tcs.interceptorAlloc.txnSeqNumAllocator,
// The pipelinger sits above the span refresher because it will
// The pipeliner sits above the span refresher because it will
// never generate transaction retry errors that could be avoided
// with a refresh.
&tcs.interceptorAlloc.txnPipeliner,
Expand Down Expand Up @@ -280,8 +280,10 @@ func (tc *TxnCoordSender) initCommonInterceptors(
riGen.ds = ds
}
tc.interceptorAlloc.txnPipeliner = txnPipeliner{
st: tcf.st,
riGen: riGen,
st: tcf.st,
riGen: riGen,
txnMetrics: &tc.metrics,
condensedIntentsEveryN: &tc.TxnCoordSenderFactory.condensedIntentsEveryN,
}
tc.interceptorAlloc.txnSpanRefresher = txnSpanRefresher{
st: tcf.st,
Expand Down
34 changes: 18 additions & 16 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@ import (
type TxnCoordSenderFactory struct {
log.AmbientContext

st *cluster.Settings
wrapped kv.Sender
clock *hlc.Clock
heartbeatInterval time.Duration
linearizable bool // enables linearizable behavior
stopper *stop.Stopper
metrics TxnMetrics
st *cluster.Settings
wrapped kv.Sender
clock *hlc.Clock
heartbeatInterval time.Duration
linearizable bool // enables linearizable behavior
stopper *stop.Stopper
metrics TxnMetrics
condensedIntentsEveryN log.EveryN

testingKnobs ClientTestingKnobs
}
Expand Down Expand Up @@ -62,15 +63,16 @@ func NewTxnCoordSenderFactory(
cfg TxnCoordSenderFactoryConfig, wrapped kv.Sender,
) *TxnCoordSenderFactory {
tcf := &TxnCoordSenderFactory{
AmbientContext: cfg.AmbientCtx,
st: cfg.Settings,
wrapped: wrapped,
clock: cfg.Clock,
stopper: cfg.Stopper,
linearizable: cfg.Linearizable,
heartbeatInterval: cfg.HeartbeatInterval,
metrics: cfg.Metrics,
testingKnobs: cfg.TestingKnobs,
AmbientContext: cfg.AmbientCtx,
st: cfg.Settings,
wrapped: wrapped,
clock: cfg.Clock,
stopper: cfg.Stopper,
linearizable: cfg.Linearizable,
heartbeatInterval: cfg.HeartbeatInterval,
metrics: cfg.Metrics,
condensedIntentsEveryN: log.Every(time.Second),
testingKnobs: cfg.TestingKnobs,
}
if tcf.st == nil {
tcf.st = cluster.MakeTestingClusterSettings()
Expand Down
31 changes: 25 additions & 6 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,10 +177,12 @@ var trackedWritesMaxSize = settings.RegisterIntSetting(
// attached to any end transaction request that is passed through the pipeliner
// to ensure that the locks within them are released.
type txnPipeliner struct {
st *cluster.Settings
riGen rangeIteratorFactory // used to condense lock spans, if provided
wrapped lockedSender
disabled bool
st *cluster.Settings
riGen rangeIteratorFactory // used to condense lock spans, if provided
wrapped lockedSender
disabled bool
txnMetrics *TxnMetrics
condensedIntentsEveryN *log.EveryN

// In-flight writes are intent point writes that have not yet been proved
// to have succeeded. They will need to be proven before the transaction
Expand Down Expand Up @@ -488,7 +490,20 @@ func (tp *txnPipeliner) updateLockTracking(
) {
// After adding new writes to the lock footprint, check whether we need to
// condense the set to stay below memory limits.
defer tp.lockFootprint.maybeCondense(ctx, tp.riGen, trackedWritesMaxSize.Get(&tp.st.SV))
defer func() {
alreadyCondensed := tp.lockFootprint.condensed
condensed := tp.lockFootprint.maybeCondense(ctx, tp.riGen, trackedWritesMaxSize.Get(&tp.st.SV))
if condensed && !alreadyCondensed {
if tp.condensedIntentsEveryN.ShouldLog() || log.ExpensiveLogEnabled(ctx, 2) {
log.Warningf(ctx,
"a transaction has hit the intent tracking limit (kv.transaction.max_intents_bytes); "+
"is it a bulk operation? Intent cleanup will be slower. txn: %s ba: %s",
ba.Txn, ba.Summary())
}
tp.txnMetrics.TxnsWithCondensedIntents.Inc(1)
tp.txnMetrics.TxnsWithCondensedIntentsGauge.Inc(1)
}
}()

// If the request failed, add all lock acquisitions attempts directly to the
// lock footprint. This reduces the likelihood of dangling locks blocking
Expand Down Expand Up @@ -682,7 +697,11 @@ func (tp *txnPipeliner) rollbackToSavepointLocked(ctx context.Context, s savepoi
}

// closeLocked implements the txnInterceptor interface.
func (tp *txnPipeliner) closeLocked() {}
func (tp *txnPipeliner) closeLocked() {
if tp.lockFootprint.condensed {
tp.txnMetrics.TxnsWithCondensedIntentsGauge.Dec(1)
}
}

// hasAcquiredLocks returns whether the interceptor has made an attempt to
// acquire any locks, whether doing so was known to be successful or not.
Expand Down
6 changes: 6 additions & 0 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1524,7 +1524,13 @@ func TestTxnPipelinerCondenseLockSpans(t *testing.T) {
t.Errorf("%d: keys size expected %d; got %d", i, e, a)
}
}

metrics := txn.Sender().(*TxnCoordSender).Metrics()
require.Equal(t, int64(1), metrics.TxnsWithCondensedIntents.Count())
require.Equal(t, int64(1), metrics.TxnsWithCondensedIntentsGauge.Value())

if err := txn.Commit(ctx); err != nil {
t.Fatal(err)
}
require.Zero(t, metrics.TxnsWithCondensedIntentsGauge.Value())
}
21 changes: 21 additions & 0 deletions pkg/kv/kvclient/kvcoord/txn_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ type TxnMetrics struct {

Durations *metric.Histogram

TxnsWithCondensedIntents *metric.Counter
TxnsWithCondensedIntentsGauge *metric.Gauge

// Restarts is the number of times we had to restart the transaction.
Restarts *metric.Histogram

Expand Down Expand Up @@ -125,6 +128,22 @@ var (
Measurement: "KV Txn Duration",
Unit: metric.Unit_NANOSECONDS,
}
metaTxnsWithCondensedIntentSpans = metric.Metadata{
Name: "txn.condensed_intent_spans",
Help: "KV transactions that have exceeded their intent tracking " +
"memory budget (kv.transaction.max_intents_bytes). See also " +
"txn.condensed_intent_spans_gauge for a gauge of such transactions currently running.",
Measurement: "KV Transactions",
Unit: metric.Unit_COUNT,
}
metaTxnsWithCondensedIntentSpansGauge = metric.Metadata{
Name: "txn.condensed_intent_spans_gauge",
Help: "KV transactions currently running that have exceeded their intent tracking " +
"memory budget (kv.transaction.max_intents_bytes). See also txn.condensed_intent_spans " +
"for a perpetual counter/rate.",
Measurement: "KV Transactions",
Unit: metric.Unit_COUNT,
}
metaRestartsHistogram = metric.Metadata{
Name: "txn.restarts",
Help: "Number of restarted KV transactions",
Expand Down Expand Up @@ -223,6 +242,8 @@ func MakeTxnMetrics(histogramWindow time.Duration) TxnMetrics {
RefreshMemoryLimitExceeded: metric.NewCounter(metaRefreshMemoryLimitExceeded),
RefreshAutoRetries: metric.NewCounter(metaRefreshAutoRetries),
Durations: metric.NewLatency(metaDurationsHistograms, histogramWindow),
TxnsWithCondensedIntents: metric.NewCounter(metaTxnsWithCondensedIntentSpans),
TxnsWithCondensedIntentsGauge: metric.NewGauge(metaTxnsWithCondensedIntentSpansGauge),
Restarts: metric.NewHistogram(metaRestartsHistogram, histogramWindow, 100, 3),
RestartsWriteTooOld: telemetry.NewCounterWithMetric(metaRestartsWriteTooOld),
RestartsWriteTooOldMulti: telemetry.NewCounterWithMetric(metaRestartsWriteTooOldMulti),
Expand Down
14 changes: 14 additions & 0 deletions pkg/ts/catalog/chart_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -992,6 +992,20 @@ var charts = []sectionDescription{
Percentiles: true,
Metrics: []string{"txn.restarts"},
},
{
Title: "Intents condensing - historical",
Downsampler: DescribeAggregator_MAX,
Metrics: []string{
"txn.condensed_intent_spans",
},
},
{
Title: "Intents condensing - current",
Downsampler: DescribeAggregator_MAX,
Metrics: []string{
"txn.condensed_intent_spans_gauge",
},
},
},
},
{
Expand Down

0 comments on commit 1bfd8cc

Please sign in to comment.