Skip to content

Commit

Permalink
kv: give txnSpanRefresher pointer to TxnMetrics
Browse files Browse the repository at this point in the history
One pointer is better than six.

Epic: None
Release note: None
  • Loading branch information
nvanbenschoten committed Apr 30, 2023
1 parent 0d206a9 commit 6faa7fc
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 83 deletions.
15 changes: 5 additions & 10 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,20 +312,15 @@ func (tc *TxnCoordSender) initCommonInterceptors(
condensedIntentsEveryN: &tc.TxnCoordSenderFactory.condensedIntentsEveryN,
}
tc.interceptorAlloc.txnSpanRefresher = txnSpanRefresher{
st: tcf.st,
knobs: &tcf.testingKnobs,
riGen: riGen,
st: tcf.st,
knobs: &tcf.testingKnobs,
riGen: riGen,
metrics: &tc.metrics,
// We can only allow refresh span retries on root transactions
// because those are the only places where we have all of the
// refresh spans. If this is a leaf, as in a distributed sql flow,
// we need to propagate the error to the root for an epoch restart.
canAutoRetry: typ == kv.RootTxn,
clientRefreshSuccess: tc.metrics.ClientRefreshSuccess,
clientRefreshFail: tc.metrics.ClientRefreshFail,
clientRefreshFailWithCondensedSpans: tc.metrics.ClientRefreshFailWithCondensedSpans,
clientRefreshMemoryLimitExceeded: tc.metrics.ClientRefreshMemoryLimitExceeded,
clientRefreshAutoRetries: tc.metrics.ClientRefreshAutoRetries,
serverRefreshSuccess: tc.metrics.ServerRefreshSuccess,
canAutoRetry: typ == kv.RootTxn,
}
tc.interceptorAlloc.txnLockGatekeeper = txnLockGatekeeper{
wrapped: tc.wrapped,
Expand Down
21 changes: 7 additions & 14 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
)
Expand Down Expand Up @@ -109,6 +108,7 @@ type txnSpanRefresher struct {
knobs *ClientTestingKnobs
riGen rangeIteratorFactory
wrapped lockedSender
metrics *TxnMetrics

// refreshFootprint contains key spans which were read during the
// transaction. In case the transaction's timestamp needs to be pushed, we can
Expand All @@ -129,13 +129,6 @@ type txnSpanRefresher struct {

// canAutoRetry is set if the txnSpanRefresher is allowed to auto-retry.
canAutoRetry bool

clientRefreshSuccess *metric.Counter
clientRefreshFail *metric.Counter
clientRefreshFailWithCondensedSpans *metric.Counter
clientRefreshMemoryLimitExceeded *metric.Counter
clientRefreshAutoRetries *metric.Counter
serverRefreshSuccess *metric.Counter
}

// SendLocked implements the lockedSender interface.
Expand Down Expand Up @@ -207,7 +200,7 @@ func (sr *txnSpanRefresher) maybeCondenseRefreshSpans(
sr.refreshFootprint.clear()
}
if sr.refreshFootprint.condensed && !condensedBefore {
sr.clientRefreshMemoryLimitExceeded.Inc(1)
sr.metrics.ClientRefreshMemoryLimitExceeded.Inc(1)
}
}
}
Expand Down Expand Up @@ -314,7 +307,7 @@ func (sr *txnSpanRefresher) maybeRefreshAndRetrySend(
log.Eventf(ctx, "refresh succeeded; retrying original request")
ba = ba.ShallowCopy()
ba.UpdateTxn(refreshToTxn)
sr.clientRefreshAutoRetries.Inc(1)
sr.metrics.ClientRefreshAutoRetries.Inc(1)

// To prevent starvation of batches that are trying to commit, split off the
// EndTxn request into its own batch on auto-retries. This avoids starvation
Expand Down Expand Up @@ -511,11 +504,11 @@ func (sr *txnSpanRefresher) tryRefreshTxnSpans(
// Track the result of the refresh in metrics.
defer func() {
if err == nil {
sr.clientRefreshSuccess.Inc(1)
sr.metrics.ClientRefreshSuccess.Inc(1)
} else {
sr.clientRefreshFail.Inc(1)
sr.metrics.ClientRefreshFail.Inc(1)
if sr.refreshFootprint.condensed {
sr.clientRefreshFailWithCondensedSpans.Inc(1)
sr.metrics.ClientRefreshFailWithCondensedSpans.Inc(1)
}
}
}()
Expand Down Expand Up @@ -645,7 +638,7 @@ func (sr *txnSpanRefresher) forwardRefreshTimestampOnResponse(
"CanForwardReadTimestamp set. ba: %s, ba.Txn: %s, br.Txn: %s", ba.Summary(), baTxn, brTxn)
}
sr.refreshedTimestamp.Forward(brTxn.ReadTimestamp)
sr.serverRefreshSuccess.Inc(1)
sr.metrics.ServerRefreshSuccess.Inc(1)
} else if brTxn.ReadTimestamp.Less(baTxn.ReadTimestamp) {
return errors.AssertionFailedf("received transaction in response with "+
"earlier read timestamp than in the request. ba.Txn: %s, br.Txn: %s", baTxn, brTxn)
Expand Down
114 changes: 55 additions & 59 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,13 @@ import (

func makeMockTxnSpanRefresher() (txnSpanRefresher, *mockLockedSender) {
mockSender := &mockLockedSender{}
metrics := MakeTxnMetrics(metric.TestSampleInterval)
return txnSpanRefresher{
st: cluster.MakeTestingClusterSettings(),
knobs: new(ClientTestingKnobs),
wrapped: mockSender,
canAutoRetry: true,
clientRefreshSuccess: metric.NewCounter(metaClientRefreshSuccess),
clientRefreshFail: metric.NewCounter(metaClientRefreshFail),
clientRefreshFailWithCondensedSpans: metric.NewCounter(metaClientRefreshFailWithCondensedSpans),
clientRefreshMemoryLimitExceeded: metric.NewCounter(metaClientRefreshMemoryLimitExceeded),
clientRefreshAutoRetries: metric.NewCounter(metaClientRefreshAutoRetries),
serverRefreshSuccess: metric.NewCounter(metaServerRefreshSuccess),
st: cluster.MakeTestingClusterSettings(),
knobs: new(ClientTestingKnobs),
wrapped: mockSender,
metrics: &metrics,
canAutoRetry: true,
}, mockSender
}

Expand Down Expand Up @@ -286,18 +282,18 @@ func TestTxnSpanRefresherRefreshesTransactions(t *testing.T) {
require.Equal(t, tc.expRefreshTS, br.Txn.WriteTimestamp)
require.Equal(t, tc.expRefreshTS, br.Txn.ReadTimestamp)
require.Equal(t, tc.expRefreshTS, tsr.refreshedTimestamp)
require.Equal(t, int64(1), tsr.clientRefreshSuccess.Count())
require.Equal(t, int64(0), tsr.clientRefreshFail.Count())
require.Equal(t, int64(1), tsr.clientRefreshAutoRetries.Count())
require.Equal(t, int64(0), tsr.serverRefreshSuccess.Count())
require.Equal(t, int64(1), tsr.metrics.ClientRefreshSuccess.Count())
require.Equal(t, int64(0), tsr.metrics.ClientRefreshFail.Count())
require.Equal(t, int64(1), tsr.metrics.ClientRefreshAutoRetries.Count())
require.Equal(t, int64(0), tsr.metrics.ServerRefreshSuccess.Count())
} else {
require.Nil(t, br)
require.NotNil(t, pErr)
require.Zero(t, tsr.refreshedTimestamp)
require.Equal(t, int64(0), tsr.clientRefreshSuccess.Count())
require.Equal(t, int64(0), tsr.clientRefreshFail.Count())
require.Equal(t, int64(0), tsr.clientRefreshAutoRetries.Count())
require.Equal(t, int64(0), tsr.serverRefreshSuccess.Count())
require.Equal(t, int64(0), tsr.metrics.ClientRefreshSuccess.Count())
require.Equal(t, int64(0), tsr.metrics.ClientRefreshFail.Count())
require.Equal(t, int64(0), tsr.metrics.ClientRefreshAutoRetries.Count())
require.Equal(t, int64(0), tsr.metrics.ServerRefreshSuccess.Count())
}
})
}
Expand Down Expand Up @@ -420,10 +416,10 @@ func TestTxnSpanRefresherPreemptiveRefresh(t *testing.T) {
br, pErr := tsr.SendLocked(ctx, ba)
require.Nil(t, pErr)
require.NotNil(t, br)
require.Equal(t, int64(1), tsr.clientRefreshSuccess.Count())
require.Equal(t, int64(0), tsr.clientRefreshFail.Count())
require.Equal(t, int64(0), tsr.clientRefreshAutoRetries.Count())
require.Equal(t, int64(0), tsr.serverRefreshSuccess.Count())
require.Equal(t, int64(1), tsr.metrics.ClientRefreshSuccess.Count())
require.Equal(t, int64(0), tsr.metrics.ClientRefreshFail.Count())
require.Equal(t, int64(0), tsr.metrics.ClientRefreshAutoRetries.Count())
require.Equal(t, int64(0), tsr.metrics.ServerRefreshSuccess.Count())
require.True(t, tsr.refreshFootprint.empty())
require.False(t, tsr.refreshInvalid)

Expand Down Expand Up @@ -455,9 +451,9 @@ func TestTxnSpanRefresherPreemptiveRefresh(t *testing.T) {
br, pErr = tsr.SendLocked(ctx, ba)
require.Nil(t, pErr)
require.NotNil(t, br)
require.Equal(t, int64(2), tsr.clientRefreshSuccess.Count())
require.Equal(t, int64(0), tsr.clientRefreshFail.Count())
require.Equal(t, int64(0), tsr.clientRefreshAutoRetries.Count())
require.Equal(t, int64(2), tsr.metrics.ClientRefreshSuccess.Count())
require.Equal(t, int64(0), tsr.metrics.ClientRefreshFail.Count())
require.Equal(t, int64(0), tsr.metrics.ClientRefreshAutoRetries.Count())
require.Equal(t, []roachpb.Span{scanArgs.Span()}, tsr.refreshFootprint.asSlice())
require.False(t, tsr.refreshInvalid)

Expand Down Expand Up @@ -497,10 +493,10 @@ func TestTxnSpanRefresherPreemptiveRefresh(t *testing.T) {
require.Regexp(t,
"TransactionRetryError: retry txn \\(RETRY_SERIALIZABLE - failed preemptive refresh "+
"due to a conflict: committed value on key \"a\"\\)", pErr)
require.Equal(t, int64(2), tsr.clientRefreshSuccess.Count())
require.Equal(t, int64(1), tsr.clientRefreshFail.Count())
require.Equal(t, int64(0), tsr.clientRefreshAutoRetries.Count())
require.Equal(t, int64(0), tsr.serverRefreshSuccess.Count())
require.Equal(t, int64(2), tsr.metrics.ClientRefreshSuccess.Count())
require.Equal(t, int64(1), tsr.metrics.ClientRefreshFail.Count())
require.Equal(t, int64(0), tsr.metrics.ClientRefreshAutoRetries.Count())
require.Equal(t, int64(0), tsr.metrics.ServerRefreshSuccess.Count())
require.Equal(t, []roachpb.Span{scanArgs.Span()}, tsr.refreshFootprint.asSlice())
require.False(t, tsr.refreshInvalid)

Expand Down Expand Up @@ -537,10 +533,10 @@ func TestTxnSpanRefresherPreemptiveRefresh(t *testing.T) {
br, pErr = tsr.SendLocked(ctx, ba)
require.Nil(t, pErr)
require.NotNil(t, br)
require.Equal(t, int64(3), tsr.clientRefreshSuccess.Count())
require.Equal(t, int64(1), tsr.clientRefreshFail.Count())
require.Equal(t, int64(0), tsr.clientRefreshAutoRetries.Count())
require.Equal(t, int64(0), tsr.serverRefreshSuccess.Count())
require.Equal(t, int64(3), tsr.metrics.ClientRefreshSuccess.Count())
require.Equal(t, int64(1), tsr.metrics.ClientRefreshFail.Count())
require.Equal(t, int64(0), tsr.metrics.ClientRefreshAutoRetries.Count())
require.Equal(t, int64(0), tsr.metrics.ServerRefreshSuccess.Count())
require.Equal(t, []roachpb.Span{scanArgs.Span()}, tsr.refreshFootprint.asSlice())
require.False(t, tsr.refreshInvalid)
}
Expand Down Expand Up @@ -608,10 +604,10 @@ func TestTxnSpanRefresherPreemptiveRefreshIsoLevel(t *testing.T) {
if tt.expRefresh {
expRefreshSuccess = 1
}
require.Equal(t, expRefreshSuccess, tsr.clientRefreshSuccess.Count())
require.Equal(t, int64(0), tsr.clientRefreshFail.Count())
require.Equal(t, int64(0), tsr.clientRefreshAutoRetries.Count())
require.Equal(t, int64(0), tsr.serverRefreshSuccess.Count())
require.Equal(t, expRefreshSuccess, tsr.metrics.ClientRefreshSuccess.Count())
require.Equal(t, int64(0), tsr.metrics.ClientRefreshFail.Count())
require.Equal(t, int64(0), tsr.metrics.ClientRefreshAutoRetries.Count())
require.Equal(t, int64(0), tsr.metrics.ServerRefreshSuccess.Count())
require.True(t, tsr.refreshFootprint.empty())
require.False(t, tsr.refreshInvalid)
})
Expand Down Expand Up @@ -852,9 +848,9 @@ func TestTxnSpanRefresherSplitEndTxnOnAutoRetry(t *testing.T) {
default:
require.Fail(t, "unexpected")
}
require.Equal(t, expSuccess, tsr.clientRefreshSuccess.Count())
require.Equal(t, expFail, tsr.clientRefreshFail.Count())
require.Equal(t, expAutoRetries, tsr.clientRefreshAutoRetries.Count())
require.Equal(t, expSuccess, tsr.metrics.ClientRefreshSuccess.Count())
require.Equal(t, expFail, tsr.metrics.ClientRefreshFail.Count())
require.Equal(t, expAutoRetries, tsr.metrics.ClientRefreshAutoRetries.Count())

require.Equal(t, []roachpb.Span{scanArgs.Span()}, tsr.refreshFootprint.asSlice())
require.False(t, tsr.refreshInvalid)
Expand Down Expand Up @@ -887,9 +883,9 @@ func TestTxnSpanRefresherSplitEndTxnOnAutoRetry(t *testing.T) {
default:
require.Fail(t, "unexpected")
}
require.Equal(t, expSuccess, tsr.clientRefreshSuccess.Count())
require.Equal(t, expFail, tsr.clientRefreshFail.Count())
require.Equal(t, expAutoRetries, tsr.clientRefreshAutoRetries.Count())
require.Equal(t, expSuccess, tsr.metrics.ClientRefreshSuccess.Count())
require.Equal(t, expFail, tsr.metrics.ClientRefreshFail.Count())
require.Equal(t, expAutoRetries, tsr.metrics.ClientRefreshAutoRetries.Count())

if errIdx < 2 {
require.Equal(t, []roachpb.Span(nil), tsr.refreshFootprint.asSlice())
Expand Down Expand Up @@ -974,7 +970,7 @@ func TestTxnSpanRefresherMaxTxnRefreshSpansBytes(t *testing.T) {
require.False(t, tsr.refreshInvalid)
require.Equal(t, 2+roachpb.SpanOverhead, tsr.refreshFootprint.bytes)
require.False(t, tsr.refreshFootprint.condensed)
require.Equal(t, int64(0), tsr.clientRefreshMemoryLimitExceeded.Count())
require.Equal(t, int64(0), tsr.metrics.ClientRefreshMemoryLimitExceeded.Count())
require.Zero(t, tsr.refreshedTimestamp)

// Exceed the limit again, this time with a non-adjacent span such that
Expand All @@ -991,8 +987,8 @@ func TestTxnSpanRefresherMaxTxnRefreshSpansBytes(t *testing.T) {
require.True(t, tsr.refreshFootprint.condensed)
require.False(t, tsr.refreshInvalid)
require.Zero(t, tsr.refreshedTimestamp)
require.Equal(t, int64(1), tsr.clientRefreshMemoryLimitExceeded.Count())
require.Equal(t, int64(0), tsr.clientRefreshFailWithCondensedSpans.Count())
require.Equal(t, int64(1), tsr.metrics.ClientRefreshMemoryLimitExceeded.Count())
require.Equal(t, int64(0), tsr.metrics.ClientRefreshFailWithCondensedSpans.Count())

// Return a transaction retry error and make sure the metric indicating that
// we did not retry due to the refresh span bytes is incremented.
Expand All @@ -1005,7 +1001,7 @@ func TestTxnSpanRefresherMaxTxnRefreshSpansBytes(t *testing.T) {
exp := kvpb.NewTransactionRetryError(kvpb.RETRY_SERIALIZABLE, "")
require.Equal(t, exp, pErr.GetDetail())
require.Nil(t, br)
require.Equal(t, int64(1), tsr.clientRefreshFailWithCondensedSpans.Count())
require.Equal(t, int64(1), tsr.metrics.ClientRefreshFailWithCondensedSpans.Count())
}

// TestTxnSpanRefresherAssignsCanForwardReadTimestamp tests that the
Expand Down Expand Up @@ -1052,10 +1048,10 @@ func TestTxnSpanRefresherAssignsCanForwardReadTimestamp(t *testing.T) {
require.Equal(t, refreshTS1, tsr.refreshedTimestamp)
require.Nil(t, tsr.refreshFootprint.asSlice())
require.False(t, tsr.refreshInvalid)
require.Equal(t, int64(0), tsr.clientRefreshSuccess.Count())
require.Equal(t, int64(0), tsr.clientRefreshFail.Count())
require.Equal(t, int64(0), tsr.clientRefreshAutoRetries.Count())
require.Equal(t, int64(1), tsr.serverRefreshSuccess.Count())
require.Equal(t, int64(0), tsr.metrics.ClientRefreshSuccess.Count())
require.Equal(t, int64(0), tsr.metrics.ClientRefreshFail.Count())
require.Equal(t, int64(0), tsr.metrics.ClientRefreshAutoRetries.Count())
require.Equal(t, int64(1), tsr.metrics.ServerRefreshSuccess.Count())

// Send a Put request for a transaction with a fixed commit timestamp.
// Should NOT set CanForwardReadTimestamp flag.
Expand Down Expand Up @@ -1110,10 +1106,10 @@ func TestTxnSpanRefresherAssignsCanForwardReadTimestamp(t *testing.T) {
require.Equal(t, refreshTS2, tsr.refreshedTimestamp)
require.Equal(t, []roachpb.Span{scanArgs.Span()}, tsr.refreshFootprint.asSlice())
require.False(t, tsr.refreshInvalid)
require.Equal(t, int64(0), tsr.clientRefreshSuccess.Count())
require.Equal(t, int64(0), tsr.clientRefreshFail.Count())
require.Equal(t, int64(0), tsr.clientRefreshAutoRetries.Count())
require.Equal(t, int64(2), tsr.serverRefreshSuccess.Count())
require.Equal(t, int64(0), tsr.metrics.ClientRefreshSuccess.Count())
require.Equal(t, int64(0), tsr.metrics.ClientRefreshFail.Count())
require.Equal(t, int64(0), tsr.metrics.ClientRefreshAutoRetries.Count())
require.Equal(t, int64(2), tsr.metrics.ServerRefreshSuccess.Count())

// Send another Scan request. Should NOT set CanForwardReadTimestamp flag.
ba.Requests = nil
Expand Down Expand Up @@ -1187,10 +1183,10 @@ func TestTxnSpanRefresherAssignsCanForwardReadTimestamp(t *testing.T) {
require.Equal(t, refreshTS3, tsr.refreshedTimestamp)
require.Equal(t, []roachpb.Span(nil), tsr.refreshFootprint.asSlice())
require.False(t, tsr.refreshInvalid)
require.Equal(t, int64(0), tsr.clientRefreshSuccess.Count())
require.Equal(t, int64(0), tsr.clientRefreshFail.Count())
require.Equal(t, int64(0), tsr.clientRefreshAutoRetries.Count())
require.Equal(t, int64(3), tsr.serverRefreshSuccess.Count())
require.Equal(t, int64(0), tsr.metrics.ClientRefreshSuccess.Count())
require.Equal(t, int64(0), tsr.metrics.ClientRefreshFail.Count())
require.Equal(t, int64(0), tsr.metrics.ClientRefreshAutoRetries.Count())
require.Equal(t, int64(3), tsr.metrics.ServerRefreshSuccess.Count())
}

// TestTxnSpanRefresherEpochIncrement tests that a txnSpanRefresher's refresh
Expand Down

0 comments on commit 6faa7fc

Please sign in to comment.