Skip to content

Commit

Permalink
kv: add metric for server-side transaction refreshes
Browse files Browse the repository at this point in the history
Generalized server-side transaction refreshes were introduced in 972915d.
This commit adds a new metric to track the number of server-side refreshes,
called `txn.refresh.success_server_side`. The metric is maintained by the
`txnSpanRefresher`, making it easy to test and associate with the specific
transaction that experienced the server-side refresh.

Epic: None
Release note: None
  • Loading branch information
nvanbenschoten committed Apr 30, 2023
1 parent 8154f76 commit 0d206a9
Show file tree
Hide file tree
Showing 5 changed files with 254 additions and 118 deletions.
129 changes: 102 additions & 27 deletions pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2060,9 +2060,10 @@ func TestTxnCoordSenderRetries(t *testing.T) {
priorReads bool
tsLeaked bool
// Testing expectations.
expClientRefresh bool // pre-emptive or reactive client refresh
expClientAutoRetryAfterRefresh bool // auto-retries of batches after client refresh
expClientRestart bool // client side restarts
expClientRefresh bool // pre-emptive or reactive client-side refresh
expClientAutoRetryAfterRefresh bool // auto-retries of batches after client-side refresh
expClientRestart bool // client-side txn restart
expServerRefresh bool // server-side refresh
expOnePhaseCommit bool // 1PC commits
expFailure string // regexp pattern to match on error, if not empty
}{
Expand Down Expand Up @@ -2173,6 +2174,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
return txn.CommitInBatch(ctx, b)
},
// No retries, server-side refresh, 1pc commit.
expServerRefresh: true,
expOnePhaseCommit: true,
},
{
Expand All @@ -2190,6 +2192,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
return txn.CommitInBatch(ctx, b)
},
// No retries, server-side refresh, 1pc commit.
expServerRefresh: true,
expOnePhaseCommit: true,
},
{
Expand Down Expand Up @@ -2326,6 +2329,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
filter: newUncertaintyFilter(roachpb.Key("a")),
// We expect the request to succeed after a server-side retry.
expClientAutoRetryAfterRefresh: false,
expServerRefresh: true,
},
{
// Even if accounting for the refresh spans would have exhausted the
Expand Down Expand Up @@ -2410,6 +2414,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
retryable: func(ctx context.Context, txn *kv.Txn) error {
return txn.Put(ctx, "a", "put")
},
expServerRefresh: true,
},
{
name: "write too old with put after prior read",
Expand Down Expand Up @@ -2500,8 +2505,8 @@ func TestTxnCoordSenderRetries(t *testing.T) {
retryable: func(ctx context.Context, txn *kv.Txn) error {
return txn.CPut(ctx, "a", "cput", kvclientutils.StrToCPutExistingValue("value"))
},
expClientAutoRetryAfterRefresh: false, // non-matching value means we fail txn coord retry
expFailure: "unexpected value", // the failure we get is a condition failed error
expServerRefresh: true, // non-matching value means we perform server-side refresh but then fail
expFailure: "unexpected value", // the failure we get is a condition failed error
},
{
name: "write too old with cput matching older and newer values",
Expand All @@ -2514,6 +2519,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
retryable: func(ctx context.Context, txn *kv.Txn) error {
return txn.CPut(ctx, "a", "cput", kvclientutils.StrToCPutExistingValue("value"))
},
expServerRefresh: true,
},
{
name: "write too old with cput matching older and newer values after prior read",
Expand Down Expand Up @@ -2550,6 +2556,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
}
return nil
},
expServerRefresh: true,
},
{
name: "write too old with increment after prior read",
Expand Down Expand Up @@ -2583,6 +2590,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
retryable: func(ctx context.Context, txn *kv.Txn) error {
return txn.InitPut(ctx, "iput", "put", false)
},
expServerRefresh: true,
},
{
name: "write too old with initput after prior read",
Expand All @@ -2608,6 +2616,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
retryable: func(ctx context.Context, txn *kv.Txn) error {
return txn.InitPut(ctx, "iput", "put", false)
},
expServerRefresh: true,
},
{
name: "write too old with initput matching older and newer values after prior read",
Expand Down Expand Up @@ -2636,8 +2645,8 @@ func TestTxnCoordSenderRetries(t *testing.T) {
retryable: func(ctx context.Context, txn *kv.Txn) error {
return txn.InitPut(ctx, "iput", "put1", false)
},
expClientAutoRetryAfterRefresh: false, // non-matching value means we fail txn coord retry
expFailure: "unexpected value", // the failure we get is a condition failed error
expServerRefresh: true, // non-matching value means we perform server-side refresh but then fail
expFailure: "unexpected value", // the failure we get is a condition failed error
},
{
name: "write too old with initput matching newer value",
Expand Down Expand Up @@ -2679,8 +2688,8 @@ func TestTxnCoordSenderRetries(t *testing.T) {
retryable: func(ctx context.Context, txn *kv.Txn) error {
return txn.InitPut(ctx, "iput", "put", true)
},
expClientAutoRetryAfterRefresh: false, // non-matching value means we fail txn coord retry
expFailure: "unexpected value", // condition failed error when failing on tombstones
expServerRefresh: true, // non-matching value means we perform server-side refresh but then fail
expFailure: "unexpected value", // condition failed error when failing on tombstones
},
{
name: "write too old with locking read",
Expand All @@ -2691,6 +2700,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
_, err := txn.ScanForUpdate(ctx, "a", "a\x00", 0)
return err
},
expServerRefresh: true,
expOnePhaseCommit: true,
},
{
Expand Down Expand Up @@ -2799,6 +2809,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
return txn.CommitInBatch(ctx, b) // will be a 1PC, won't get auto retry
},
// No retries, server-side refresh, 1pc commit.
expServerRefresh: true,
expOnePhaseCommit: true,
},
{
Expand All @@ -2825,6 +2836,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
},
// The request will succeed after a server-side refresh.
expClientAutoRetryAfterRefresh: false,
expServerRefresh: true,
},
{
name: "write too old with cput in batch commit",
Expand All @@ -2844,6 +2856,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
// WriteTooOldError, and then once at the pushed timestamp. The
// server-side retry is enabled by the fact that there have not been any
// previous reads and so the transaction can commit at a pushed timestamp.
expServerRefresh: true,
expOnePhaseCommit: true,
},
{
Expand All @@ -2861,10 +2874,31 @@ func TestTxnCoordSenderRetries(t *testing.T) {
b.CPut("a", "cput", kvclientutils.StrToCPutExistingValue("orig"))
return txn.CommitInBatch(ctx, b) // will be a 1PC, won't get auto retry
},
expFailure: "unexpected value", // The CPut cannot succeed.
expServerRefresh: true, // non-matching value means we perform server-side refresh but then fail
expFailure: "unexpected value", // The CPut cannot succeed.
},
{
name: "multi-range batch commit with forwarded timestamp (err on first range)",
afterTxnStart: func(ctx context.Context, db *kv.DB) error {
_, err := db.Get(ctx, "a") // set ts cache
return err
},
retryable: func(ctx context.Context, txn *kv.Txn) error {
b := txn.NewBatch()
b.Put("a", "put")
b.Put("c", "put")
return txn.CommitInBatch(ctx, b)
},
// The Put to "a" and the EndTxn will succeed after a server-side refresh.
// This will instruct the txn to stage at the post-refresh timestamp,
// qualifying for the implicit commit condition and avoiding a client-side
// refresh.
expServerRefresh: true,
expClientRefresh: false,
expClientAutoRetryAfterRefresh: false,
},
{
name: "multi-range batch with forwarded timestamp",
name: "multi-range batch commit with forwarded timestamp (err on second range)",
afterTxnStart: func(ctx context.Context, db *kv.DB) error {
_, err := db.Get(ctx, "c") // set ts cache
return err
Expand All @@ -2875,11 +2909,15 @@ func TestTxnCoordSenderRetries(t *testing.T) {
b.Put("c", "put")
return txn.CommitInBatch(ctx, b)
},
// The Put to "c" will succeed with a forwarded timestamp. However, the
// txn has already staged on the other range at an earlier timestamp. As a
// result, it does not qualify for the implicit commit condition and
// requires a client-side refresh.
expClientRefresh: true,
expClientAutoRetryAfterRefresh: true,
},
{
name: "multi-range batch with forwarded timestamp and cput",
name: "multi-range batch commit with forwarded timestamp and cput",
beforeTxnStart: func(ctx context.Context, db *kv.DB) error {
return db.Put(ctx, "a", "value")
},
Expand All @@ -2893,9 +2931,10 @@ func TestTxnCoordSenderRetries(t *testing.T) {
b.Put("c", "put")
return txn.CommitInBatch(ctx, b) // both puts will succeed, no retry
},
expServerRefresh: true,
},
{
name: "multi-range batch with forwarded timestamp and cput and get",
name: "multi-range batch commit with forwarded timestamp and cput and get",
beforeTxnStart: func(ctx context.Context, db *kv.DB) error {
return db.Put(ctx, "a", "value")
},
Expand All @@ -2916,7 +2955,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
expClientAutoRetryAfterRefresh: true,
},
{
name: "multi-range batch with forwarded timestamp and cput and delete range",
name: "multi-range batch commit with forwarded timestamp and cput and delete range",
beforeTxnStart: func(ctx context.Context, db *kv.DB) error {
return db.Put(ctx, "c", "value")
},
Expand All @@ -2934,21 +2973,45 @@ func TestTxnCoordSenderRetries(t *testing.T) {
expClientAutoRetryAfterRefresh: true,
},
{
name: "multi-range batch with write too old",
name: "multi-range batch commit with write too old (err on first range)",
afterTxnStart: func(ctx context.Context, db *kv.DB) error {
return db.Put(ctx, "a", "value")
},
retryable: func(ctx context.Context, txn *kv.Txn) error {
b := txn.NewBatch()
b.Put("a", "put")
b.Put("c", "put")
return txn.CommitInBatch(ctx, b)
},
// The Put to "a" and EndTxn will succeed after a server-side refresh.
// This will instruct the txn to stage at the post-refresh timestamp,
// qualifying for the implicit commit condition and avoiding a client-side
// refresh.
expServerRefresh: true,
expClientRefresh: false,
expClientAutoRetryAfterRefresh: false,
},
{
name: "multi-range batch commit with write too old (err on second range)",
afterTxnStart: func(ctx context.Context, db *kv.DB) error {
return db.Put(ctx, "c", "value")
},
retryable: func(ctx context.Context, txn *kv.Txn) error {
b := txn.NewBatch()
b.Put("a", "put")
b.Put("c", "put")
return txn.CommitInBatch(ctx, b) // put to c will return WriteTooOldError
return txn.CommitInBatch(ctx, b)
},
// The Put to "c" will succeed after a server-side refresh. However, the
// txn has already staged on the other range at the pre-refresh timestamp.
// As a result, it does not qualify for the implicit commit condition and
// requires a client-side refresh.
expServerRefresh: true,
expClientRefresh: true,
expClientAutoRetryAfterRefresh: true,
},
{
name: "multi-range batch with write too old and failed cput",
name: "multi-range batch commit with write too old and failed cput",
beforeTxnStart: func(ctx context.Context, db *kv.DB) error {
return db.Put(ctx, "a", "orig")
},
Expand All @@ -2961,11 +3024,11 @@ func TestTxnCoordSenderRetries(t *testing.T) {
b.Put("c", "put")
return txn.CommitInBatch(ctx, b)
},
expClientAutoRetryAfterRefresh: false, // non-matching value means we fail txn coord retry
expFailure: "unexpected value", // the failure we get is a condition failed error
expServerRefresh: true, // non-matching value means we perform server-side refresh but then fail
expFailure: "unexpected value", // the failure we get is a condition failed error
},
{
name: "multi-range batch with write too old and successful cput",
name: "multi-range batch commit with write too old and successful cput",
beforeTxnStart: func(ctx context.Context, db *kv.DB) error {
return db.Put(ctx, "a", "orig")
},
Expand All @@ -2980,6 +3043,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
},
// We expect the request to succeed after a server-side retry.
expClientAutoRetryAfterRefresh: false,
expServerRefresh: true,
},
{
// This test checks the behavior of batches that were split by the
Expand Down Expand Up @@ -3031,6 +3095,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
filter: newUncertaintyFilter(roachpb.Key("a")),
// We expect the request to succeed after a server-side retry.
expClientAutoRetryAfterRefresh: false,
expServerRefresh: true,
},
{
name: "cput within uncertainty interval after timestamp leaked",
Expand Down Expand Up @@ -3089,7 +3154,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
expClientRestart: true, // note this txn is read-only but still restarts
},
{
name: "multi-range batch with uncertainty interval error",
name: "multi-range batch commit with uncertainty interval error",
beforeTxnStart: func(ctx context.Context, db *kv.DB) error {
return db.Put(ctx, "c", "value")
},
Expand All @@ -3101,12 +3166,17 @@ func TestTxnCoordSenderRetries(t *testing.T) {
b.CPut("c", "cput", kvclientutils.StrToCPutExistingValue("value"))
return txn.CommitInBatch(ctx, b)
},
filter: newUncertaintyFilter(roachpb.Key("c")),
filter: newUncertaintyFilter(roachpb.Key("c")),
// The cput to "c" will succeed after a server-side refresh. However, the
// txn has already staged on the other range at the pre-refresh timestamp.
// As a result, it does not qualify for the implicit commit condition and
// requires a client-side refresh.
expServerRefresh: true,
expClientRefresh: true,
expClientAutoRetryAfterRefresh: true,
},
{
name: "multi-range batch with uncertainty interval error and get conflict",
name: "multi-range batch commit with uncertainty interval error and get conflict",
beforeTxnStart: func(ctx context.Context, db *kv.DB) error {
return db.Put(ctx, "a", "init")
},
Expand All @@ -3128,7 +3198,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
expClientRestart: true, // will fail because of conflict on refresh span for the Get
},
{
name: "multi-range batch with uncertainty interval error and mixed success",
name: "multi-range batch commit with uncertainty interval error and mixed success",
beforeTxnStart: func(ctx context.Context, db *kv.DB) error {
return db.Put(ctx, "c", "value")
},
Expand All @@ -3139,7 +3209,11 @@ func TestTxnCoordSenderRetries(t *testing.T) {
return txn.CommitInBatch(ctx, b)
},
filter: newUncertaintyFilter(roachpb.Key("c")),
// Expect a transaction coord retry, which should succeed.
// The cput to "c" will succeed after a server-side refresh. However, the
// txn has already staged on the other range at the pre-refresh timestamp.
// As a result, it does not qualify for the implicit commit condition and
// requires a client-side refresh.
expServerRefresh: true,
expClientRefresh: true,
expClientAutoRetryAfterRefresh: true,
},
Expand Down Expand Up @@ -3289,8 +3363,9 @@ func TestTxnCoordSenderRetries(t *testing.T) {
}

// Verify metrics.
require.Equal(t, tc.expClientRefresh, metrics.RefreshSuccess.Count() != 0, "TxnMetrics.RefreshSuccess")
require.Equal(t, tc.expClientAutoRetryAfterRefresh, metrics.RefreshAutoRetries.Count() != 0, "TxnMetrics.RefreshAutoRetries")
require.Equal(t, tc.expClientRefresh, metrics.ClientRefreshSuccess.Count() != 0, "TxnMetrics.ClientRefreshSuccess")
require.Equal(t, tc.expClientAutoRetryAfterRefresh, metrics.ClientRefreshAutoRetries.Count() != 0, "TxnMetrics.ClientRefreshAutoRetries")
require.Equal(t, tc.expServerRefresh, metrics.ServerRefreshSuccess.Count() != 0, "TxnMetrics.ServerRefreshSuccess")
require.Equal(t, tc.expClientRestart, metrics.Restarts.TotalSum() != 0, "TxnMetrics.Restarts")
require.Equal(t, tc.expOnePhaseCommit, metrics.Commits1PC.Count() != 0, "TxnMetrics.Commits1PC")
})
Expand Down
13 changes: 7 additions & 6 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,12 +319,13 @@ func (tc *TxnCoordSender) initCommonInterceptors(
// because those are the only places where we have all of the
// refresh spans. If this is a leaf, as in a distributed sql flow,
// we need to propagate the error to the root for an epoch restart.
canAutoRetry: typ == kv.RootTxn,
refreshSuccess: tc.metrics.RefreshSuccess,
refreshFail: tc.metrics.RefreshFail,
refreshFailWithCondensedSpans: tc.metrics.RefreshFailWithCondensedSpans,
refreshMemoryLimitExceeded: tc.metrics.RefreshMemoryLimitExceeded,
refreshAutoRetries: tc.metrics.RefreshAutoRetries,
canAutoRetry: typ == kv.RootTxn,
clientRefreshSuccess: tc.metrics.ClientRefreshSuccess,
clientRefreshFail: tc.metrics.ClientRefreshFail,
clientRefreshFailWithCondensedSpans: tc.metrics.ClientRefreshFailWithCondensedSpans,
clientRefreshMemoryLimitExceeded: tc.metrics.ClientRefreshMemoryLimitExceeded,
clientRefreshAutoRetries: tc.metrics.ClientRefreshAutoRetries,
serverRefreshSuccess: tc.metrics.ServerRefreshSuccess,
}
tc.interceptorAlloc.txnLockGatekeeper = txnLockGatekeeper{
wrapped: tc.wrapped,
Expand Down
Loading

0 comments on commit 0d206a9

Please sign in to comment.