kv: preemptively refresh transaction timestamps

Relates to cockroachdb#51294.

This commit adds logic to the txnSpanRefresher to preemptively perform
refreshes before issuing requests when doing so is guaranteed to be
beneficial. We perform a preemptive refresh if either a) doing so would
be free because we have not yet accumulated any refresh spans, or b) the
batch contains a committing EndTxn request that we know will be rejected
if issued.

The first case is straightforward. If the transaction has yet to perform
any reads but has had its write timestamp bumped, refreshing is a
trivial no-op. In this case, refreshing eagerly prevents the transaction
from performing any future reads at its current read timestamp. Not doing
so preemptively guarantees that we will need to perform a real refresh
in the future if the transaction ever performs a read. At best, this
would be wasted work. At worst, this could result in the future refresh
failing. So we might as well refresh preemptively while doing so is
free.
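
To make the "free" case concrete, here is a minimal, self-contained Go sketch
of the decision. The names (txnState, refreshSpans, refreshIsFree) are
illustrative stand-ins rather than the real txnSpanRefresher fields, and
timestamps are simplified to integers:

// txnState is a toy model of the client-side state that the txnSpanRefresher
// tracks for a transaction.
type txnState struct {
	readTS, writeTS int64    // simplified timestamps
	refreshSpans    []string // key spans recorded from prior reads
}

// refreshIsFree reports whether a preemptive refresh costs nothing: the write
// timestamp has been bumped ahead of the read timestamp, but no reads have
// been recorded, so there is nothing to re-validate.
func refreshIsFree(t *txnState) bool {
	return t.readTS < t.writeTS && len(t.refreshSpans) == 0
}

// preemptivelyRefreshIfFree forwards the read timestamp without issuing any
// refresh RPCs.
func preemptivelyRefreshIfFree(t *txnState) {
	if refreshIsFree(t) {
		t.readTS = t.writeTS
	}
}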

Note that this first case does NOT obviate the need for server-side
refreshes. Notably, a transaction's write timestamp might be bumped in
the same batch in which it performs its first read. In such cases, a
preemptive refresh would not be needed, but a reactive refresh would not
be a trivial no-op. These situations are common for one-phase commit
transactions.
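
As a client-level illustration of that scenario (mirroring the "forwarded
timestamp with cput in batch commit" test case below, and assuming that test
file's imports plus an open *kv.DB handle named db), the CPut performs the
transaction's first read and its write in the same committing batch, so any
timestamp bump is handled by a server-side refresh rather than a preemptive
client-side one:

if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
	b := txn.NewBatch()
	// Read and write of "a" travel in the one batch that also commits, so a
	// bumped write timestamp is refreshed server-side as part of the 1PC.
	b.CPut("a", "cput", kvclientutils.StrToCPutExistingValue("orig"))
	return txn.CommitInBatch(ctx, b)
}); err != nil {
	return err
}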

The second case is more complex. If the batch contains a committing
EndTxn request that we know will need a refresh, we don't want to bother
issuing it just for it to be rejected. Instead, preemptively refresh
before issuing the EndTxn batch. If we view reads as acquiring a form of
optimistic read locks under an optimistic concurrency control scheme (as
is discussed in the comment on txnSpanRefresher) then this preemptive
refresh immediately before the EndTxn is synonymous with the
"validation" phase of a standard OCC transaction model. However, as an
optimization compared to standard OCC, the validation phase is only
performed when necessary in CockroachDB (i.e. if the transaction's
writes have been pushed to higher timestamps).
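
Extending the toy txnState sketch above, the pre-commit "validation" decision
looks roughly like this (again with illustrative names; the real interceptor
inspects the batch for a committing EndTxn request and re-reads its refresh
spans via Refresh/RefreshRange requests):

// batchSketch models the only batch property that matters here: whether it
// contains an EndTxn request that commits the transaction.
type batchSketch struct {
	commitsTxn bool
}

// needsValidation is the OCC-style validation check: the batch commits, the
// transaction's writes were pushed to a higher timestamp, and there are
// optimistic reads that must be re-checked before committing.
func needsValidation(t *txnState, b batchSketch) bool {
	return b.commitsTxn && t.readTS < t.writeTS && len(t.refreshSpans) > 0
}

// refreshBeforeCommit models only the success path: the recorded spans are
// re-validated (RPCs in the real system) and the read timestamp is forwarded
// so that the EndTxn is not rejected.
func refreshBeforeCommit(t *txnState, b batchSketch) {
	if needsValidation(t, b) {
		t.readTS = t.writeTS
	}
}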

This second case will play into the solution for cockroachdb#51294. Now that we
perform these preemptive refreshes when possible, we know that it is
always the right choice to split off the EndTxn from the rest of the
batch during a txnSpanRefresher auto-retry. Without this change, it was
unclear whether the first refresh of an EndTxn batch was caused by
earlier requests in the transaction or by other requests in the current
batch. Now, it is always caused by other requests in the same batch, so
it is always clear that we should split the EndTxn from the rest of the
batch immediately after the first refresh.
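
A rough sketch of the retry heuristic this enables (an assumed shape of the
cockroachdb#51294 follow-up, not something implemented by this commit):

// splitEndTxnOnRetry: because the preemptive refresh above already validated
// everything issued before the commit batch, a refresh triggered while a
// committing batch is in flight must be caused by the batch's other requests,
// so the EndTxn can be split off immediately after the first refresh.
func splitEndTxnOnRetry(batchCommits, hasOtherRequests, refreshedOnce bool) bool {
	return batchCommits && hasOtherRequests && refreshedOnce
}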

Release notes (performance improvement): validation of optimistic reads
is now performed earlier in transactions when doing so can save work.
This eliminates certain types of transaction retry errors and avoids
wasted RPC traffic.

nvanbenschoten committed Aug 16, 2020
1 parent 85c66ec commit 924c4ba
Showing 8 changed files with 395 additions and 54 deletions.
93 changes: 77 additions & 16 deletions pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
@@ -1849,6 +1849,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
retryable: func(ctx context.Context, txn *kv.Txn) error {
return txn.Put(ctx, "a", "put") // put to advance txn ts
},
// No retry, preemptive refresh before commit.
},
{
name: "forwarded timestamp with get and put after timestamp leaked",
@@ -1916,8 +1917,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
retryable: func(ctx context.Context, txn *kv.Txn) error {
return txn.DelRange(ctx, "a", "b")
},
// Expect a transaction coord retry, which should succeed.
txnCoordRetry: true,
// No retry, preemptive refresh before commit.
},
{
name: "forwarded timestamp with put in batch commit",
@@ -1930,7 +1930,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
b.Put("a", "put")
return txn.CommitInBatch(ctx, b)
},
// No retries, 1pc commit.
// No retries, server-side refresh, 1pc commit.
},
{
name: "forwarded timestamp with cput in batch commit",
@@ -1946,7 +1946,39 @@ func TestTxnCoordSenderRetries(t *testing.T) {
b.CPut("a", "cput", kvclientutils.StrToCPutExistingValue("orig"))
return txn.CommitInBatch(ctx, b)
},
// No retries, 1pc commit.
// No retries, server-side refresh, 1pc commit.
},
{
name: "forwarded timestamp with get before commit",
afterTxnStart: func(ctx context.Context, db *kv.DB) error {
_, err := db.Get(ctx, "a") // set ts cache
return err
},
retryable: func(ctx context.Context, txn *kv.Txn) error {
// Advance timestamp.
if err := txn.Put(ctx, "a", "put"); err != nil {
return err
}
_, err := txn.Get(ctx, "a2")
return err
},
// No retry, preemptive refresh before get.
},
{
name: "forwarded timestamp with scan before commit",
afterTxnStart: func(ctx context.Context, db *kv.DB) error {
_, err := db.Get(ctx, "a") // set ts cache
return err
},
retryable: func(ctx context.Context, txn *kv.Txn) error {
// Advance timestamp.
if err := txn.Put(ctx, "a", "put"); err != nil {
return err
}
_, err := txn.Scan(ctx, "a2", "a3", 0)
return err
},
// No retry, preemptive refresh before scan.
},
{
name: "forwarded timestamp with get in batch commit",
@@ -1963,8 +1995,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
b.Get("a2")
return txn.CommitInBatch(ctx, b)
},
// Read-only request (Get) prevents server-side refresh.
txnCoordRetry: true,
// No retry, preemptive refresh before commit.
},
{
name: "forwarded timestamp with scan in batch commit",
@@ -1981,6 +2012,35 @@ func TestTxnCoordSenderRetries(t *testing.T) {
b.Scan("a2", "a3")
return txn.CommitInBatch(ctx, b)
},
// No retry, preemptive refresh before commit.
},
{
name: "forwarded timestamp with put and get in batch commit",
afterTxnStart: func(ctx context.Context, db *kv.DB) error {
_, err := db.Get(ctx, "a") // set ts cache
return err
},
retryable: func(ctx context.Context, txn *kv.Txn) error {
b := txn.NewBatch()
b.Get("a2")
b.Put("a", "put") // advance timestamp
return txn.CommitInBatch(ctx, b)
},
// Read-only request (Get) prevents server-side refresh.
txnCoordRetry: true,
},
{
name: "forwarded timestamp with put and scan in batch commit",
afterTxnStart: func(ctx context.Context, db *kv.DB) error {
_, err := db.Get(ctx, "a") // set ts cache
return err
},
retryable: func(ctx context.Context, txn *kv.Txn) error {
b := txn.NewBatch()
b.Scan("a2", "a3")
b.Put("a", "put") // advance timestamp
return txn.CommitInBatch(ctx, b)
},
// Read-only request (Scan) prevents server-side refresh.
txnCoordRetry: true,
},
@@ -2046,7 +2106,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
}
return txn.CommitInBatch(ctx, b)
},
txnCoordRetry: true,
// No retry, preemptive refresh before commit.
},
{
// Even if accounting for the refresh spans would have exhausted the
@@ -2082,14 +2142,15 @@ func TestTxnCoordSenderRetries(t *testing.T) {
}
return txn.CommitInBatch(ctx, b)
},
txnCoordRetry: true,
// No retry, preemptive refresh before commit.
},
{
name: "write too old with put",
afterTxnStart: func(ctx context.Context, db *kv.DB) error {
return db.Put(ctx, "a", "put")
},
retryable: func(ctx context.Context, txn *kv.Txn) error {
fmt.Println("TXN IS", txn.TestingCloneTxn())
return txn.Put(ctx, "a", "put")
},
},
@@ -2440,7 +2501,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
b.Put("a", "new-put")
return txn.CommitInBatch(ctx, b) // will be a 1PC, won't get auto retry
},
// No retries, 1pc commit.
// No retries, server-side refresh, 1pc commit.
},
{
// This test is like the previous one in that the commit batch succeeds at
@@ -2865,7 +2926,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
}

var metrics kvcoord.TxnMetrics
var lastRefreshes int64
var lastAutoRetries int64
var hadClientRetry bool
epoch := 0
if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
@@ -2897,7 +2958,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
}

metrics = txn.Sender().(*kvcoord.TxnCoordSender).TxnCoordSenderFactory.Metrics()
lastRefreshes = metrics.RefreshSuccess.Count()
lastAutoRetries = metrics.RefreshAutoRetries.Count()

return tc.retryable(ctx, txn)
}); err != nil {
@@ -2913,11 +2974,11 @@ func TestTxnCoordSenderRetries(t *testing.T) {
// from the cluster setup are still ongoing and can experience
// their own retries, this might increase by more than one, so we
// can only check here that it's >= 1.
refreshes := metrics.RefreshSuccess.Count() - lastRefreshes
if tc.txnCoordRetry && refreshes == 0 {
t.Errorf("expected [at least] one txn coord sender auto retry; got %d", refreshes)
} else if !tc.txnCoordRetry && refreshes != 0 {
t.Errorf("expected no txn coord sender auto retries; got %d", refreshes)
autoRetries := metrics.RefreshAutoRetries.Count() - lastAutoRetries
if tc.txnCoordRetry && autoRetries == 0 {
t.Errorf("expected [at least] one txn coord sender auto retry; got %d", autoRetries)
} else if !tc.txnCoordRetry && autoRetries != 0 {
t.Errorf("expected no txn coord sender auto retries; got %d", autoRetries)
}
if tc.clientRetry && !hadClientRetry {
t.Errorf("expected but did not experience client retry")

1 change: 1 addition & 0 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender.go
@@ -299,6 +299,7 @@ func (tc *TxnCoordSender) initCommonInterceptors(
refreshFail: tc.metrics.RefreshFail,
refreshFailWithCondensedSpans: tc.metrics.RefreshFailWithCondensedSpans,
refreshMemoryLimitExceeded: tc.metrics.RefreshMemoryLimitExceeded,
refreshAutoRetries: tc.metrics.RefreshAutoRetries,
}
tc.interceptorAlloc.txnLockGatekeeper = txnLockGatekeeper{
wrapped: tc.wrapped,