Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…db#91689

91072: kv,kvcoord: poison the txn coordinator when forcing an error r=lidorcarmel a=lidorcarmel

This PR changes the behavior of GenerateForcedRetryableError():
A retryable func given to DB.Txn() can GenerateForcedRetryableError() and
then return the generated error. Before this PR the retryable func had to
return the error, which signaled the DB.Txn() retry loop to reset
(PrepareForRetry) the txn handle, and then retry the retryable func.
With this PR, the error doesn't need to be returned because when
GenerateForcedRetryableError() is called it poisons the txn handle.
Then, the DB.Txn retry loop can get the error and retry.

This means that with this PR, calling GenerateForcedRetryableError() is more
like having an error during an operation such as Get or Put (those retryable
failures will also poison the txn handle).

Adding a few tests to verify and demonstrate these behaviors. Note that
only TestGenerateForcedRetryableErrorByPoisoning() fails without this PR.

Informs: cockroachdb#86594

Release note: None
Epic: None


91376: kv: pool per read-write batch `MVCCStats` r=nvanbenschoten a=nvanbenschoten

Each read-write BatchRequest uses an `MVCCStats` object to track the impact that its writes will have (when applied) on the Range's aggregated stats. This object is plumbed throughout the stack and is manipulated primarily in the MVCC layer. It is plumbed too far for us to be able to convince escape analysis to keep it on the stack, so we allocate it on the heap.

This commit adds a memory pool for these objects to avoid heap allocations.

```
name                         old time/op    new time/op    delta
KV/Insert/Native/rows=1-10     44.8µs ± 2%    45.1µs ± 2%    ~     (p=0.079 n=19+20)
KV/Insert/Native/rows=10-10    71.3µs ± 4%    70.7µs ± 2%    ~     (p=0.120 n=20+19)
KV/Insert/SQL/rows=1-10         128µs ± 2%     128µs ± 1%    ~     (p=0.251 n=20+18)
KV/Insert/SQL/rows=10-10        177µs ± 1%     177µs ± 3%    ~     (p=0.731 n=17+19)
KV/Update/Native/rows=1-10     67.8µs ± 3%    67.4µs ± 3%    ~     (p=0.166 n=20+19)
KV/Update/Native/rows=10-10     165µs ± 2%     164µs ± 2%    ~     (p=0.235 n=20+19)
KV/Update/SQL/rows=1-10         169µs ± 2%     169µs ± 2%    ~     (p=0.093 n=18+20)
KV/Update/SQL/rows=10-10        331µs ± 2%     330µs ± 2%    ~     (p=0.167 n=20+18)
KV/Delete/Native/rows=1-10     46.2µs ± 1%    46.4µs ± 2%    ~     (p=0.159 n=18+19)
KV/Delete/Native/rows=10-10    84.2µs ± 3%    83.8µs ± 2%    ~     (p=0.529 n=20+20)
KV/Delete/SQL/rows=1-10         145µs ± 1%     144µs ± 1%    ~     (p=0.150 n=19+18)
KV/Delete/SQL/rows=10-10        203µs ± 4%     206µs ± 8%    ~     (p=0.245 n=19+18)

name                         old alloc/op   new alloc/op   delta
KV/Insert/Native/rows=1-10     16.1kB ± 1%    16.0kB ± 1%  -0.98%  (p=0.000 n=20+20)
KV/Delete/Native/rows=1-10     16.7kB ± 0%    16.6kB ± 1%  -0.86%  (p=0.000 n=19+20)
KV/Update/Native/rows=1-10     22.7kB ± 1%    22.5kB ± 1%  -0.74%  (p=0.000 n=19+20)
KV/Insert/Native/rows=10-10    41.5kB ± 0%    41.4kB ± 0%  -0.45%  (p=0.000 n=17+20)
KV/Update/SQL/rows=1-10        50.3kB ± 0%    50.1kB ± 0%  -0.32%  (p=0.000 n=20+20)
KV/Delete/SQL/rows=1-10        51.4kB ± 1%    51.2kB ± 1%  -0.30%  (p=0.010 n=20+19)
KV/Insert/SQL/rows=1-10        43.8kB ± 0%    43.7kB ± 0%  -0.26%  (p=0.000 n=19+20)
KV/Update/SQL/rows=10-10        115kB ± 1%     115kB ± 1%  -0.24%  (p=0.018 n=20+20)
KV/Insert/SQL/rows=10-10       90.4kB ± 0%    90.3kB ± 0%  -0.13%  (p=0.035 n=20+19)
KV/Update/Native/rows=10-10    69.5kB ± 1%    69.3kB ± 1%    ~     (p=0.064 n=20+20)
KV/Delete/Native/rows=10-10    41.9kB ± 2%    41.8kB ± 1%    ~     (p=0.398 n=20+20)
KV/Delete/SQL/rows=10-10       84.0kB ± 0%    83.9kB ± 1%    ~     (p=0.128 n=17+18)

name                         old allocs/op  new allocs/op  delta
KV/Insert/Native/rows=1-10        116 ± 0%       115 ± 0%  -0.86%  (p=0.000 n=20+19)
KV/Delete/Native/rows=1-10        117 ± 0%       116 ± 0%  -0.85%  (p=0.000 n=20+19)
KV/Update/Native/rows=1-10        163 ± 0%       162 ± 0%  -0.84%  (p=0.000 n=19+17)
KV/Update/Native/rows=10-10       438 ± 1%       436 ± 1%  -0.43%  (p=0.031 n=19+20)
KV/Delete/Native/rows=10-10       249 ± 0%       248 ± 0%  -0.40%  (p=0.000 n=20+20)
KV/Insert/Native/rows=10-10       268 ± 0%       267 ± 0%  -0.37%  (p=0.000 n=20+20)
KV/Delete/SQL/rows=1-10           379 ± 0%       378 ± 0%  -0.32%  (p=0.000 n=20+19)
KV/Insert/SQL/rows=1-10           342 ± 0%       341 ± 0%  -0.29%  (p=0.000 n=20+20)
KV/Update/SQL/rows=1-10           477 ± 0%       476 ± 0%  -0.23%  (p=0.000 n=18+20)
KV/Update/SQL/rows=10-10          825 ± 0%       824 ± 1%  -0.21%  (p=0.014 n=19+20)
KV/Insert/SQL/rows=10-10          564 ± 0%       563 ± 0%  -0.13%  (p=0.001 n=12+18)
KV/Delete/SQL/rows=10-10          593 ± 0%       594 ± 1%    ~     (p=0.841 n=16+20)
```

Release note: None
Epic: None

91377: kv: combine heap allocs for placeholder EndTxnResponse on 1PC r=nvanbenschoten a=nvanbenschoten

This commit combines two of the heap allocations incurred by 1PC calls when constructing the placeholder EndTxnResponse in `evaluate1PC` into a single allocation.

```
name                         old time/op    new time/op    delta
KV/Insert/Native/rows=1-10     44.2µs ± 1%    44.1µs ± 2%    ~     (p=0.173 n=20+19)
KV/Insert/Native/rows=10-10    70.0µs ± 2%    70.2µs ± 2%    ~     (p=0.311 n=19+19)
KV/Insert/SQL/rows=1-10         125µs ± 1%     125µs ± 1%    ~     (p=0.736 n=18+19)
KV/Insert/SQL/rows=10-10        173µs ± 1%     173µs ± 2%    ~     (p=0.613 n=18+17)
KV/Update/Native/rows=1-10     67.2µs ± 2%    67.0µs ± 1%    ~     (p=0.158 n=20+19)
KV/Update/SQL/rows=1-10         165µs ± 1%     165µs ± 2%    ~     (p=0.967 n=19+20)
KV/Update/SQL/rows=10-10        328µs ± 5%     326µs ± 2%    ~     (p=0.327 n=19+18)
KV/Delete/Native/rows=1-10     45.8µs ± 1%    45.8µs ± 1%    ~     (p=0.339 n=17+18)
KV/Delete/Native/rows=10-10    83.2µs ± 1%    83.2µs ± 2%    ~     (p=0.954 n=19+19)
KV/Delete/SQL/rows=1-10         141µs ± 2%     141µs ± 3%    ~     (p=0.719 n=18+18)
KV/Delete/SQL/rows=10-10        208µs ± 6%     208µs ±11%    ~     (p=0.659 n=20+20)
KV/Update/Native/rows=10-10     162µs ± 3%     163µs ± 2%  +0.69%  (p=0.030 n=20+18)

name                         old alloc/op   new alloc/op   delta
KV/Insert/Native/rows=1-10     15.9kB ± 0%    15.9kB ± 0%    ~     (p=0.179 n=19+19)
KV/Insert/Native/rows=10-10    41.3kB ± 0%    41.3kB ± 0%    ~     (p=0.280 n=18+20)
KV/Insert/SQL/rows=1-10        43.6kB ± 0%    43.6kB ± 0%    ~     (p=0.164 n=19+20)
KV/Insert/SQL/rows=10-10       90.0kB ± 0%    90.0kB ± 1%    ~     (p=0.835 n=20+19)
KV/Update/Native/rows=1-10     22.5kB ± 0%    22.5kB ± 0%    ~     (p=0.324 n=18+19)
KV/Update/Native/rows=10-10    69.2kB ± 1%    69.2kB ± 1%    ~     (p=0.815 n=20+20)
KV/Update/SQL/rows=1-10        50.1kB ± 0%    50.0kB ± 0%    ~     (p=0.397 n=19+19)
KV/Update/SQL/rows=10-10        115kB ± 0%     115kB ± 1%    ~     (p=0.659 n=20+20)
KV/Delete/Native/rows=1-10     16.5kB ± 1%    16.5kB ± 1%    ~     (p=0.104 n=20+18)
KV/Delete/Native/rows=10-10    41.8kB ± 2%    41.8kB ± 2%    ~     (p=0.602 n=20+20)
KV/Delete/SQL/rows=1-10        51.2kB ± 0%    51.3kB ± 0%    ~     (p=0.941 n=20+20)
KV/Delete/SQL/rows=10-10       83.9kB ± 1%    83.9kB ± 1%    ~     (p=0.745 n=19+19)

name                         old allocs/op  new allocs/op  delta
KV/Insert/Native/rows=1-10        115 ± 0%       114 ± 0%  -0.87%  (p=0.000 n=20+20)
KV/Delete/Native/rows=1-10        116 ± 0%       115 ± 0%  -0.86%  (p=0.000 n=19+20)
KV/Update/Native/rows=1-10        162 ± 0%       161 ± 0%  -0.62%  (p=0.000 n=18+18)
KV/Delete/Native/rows=10-10       248 ± 0%       247 ± 0%  -0.40%  (p=0.000 n=19+19)
KV/Insert/Native/rows=10-10       267 ± 0%       266 ± 0%  -0.37%  (p=0.000 n=20+20)
KV/Insert/SQL/rows=1-10           340 ± 0%       339 ± 0%  -0.29%  (p=0.000 n=17+19)
KV/Delete/SQL/rows=1-10           377 ± 0%       376 ± 0%  -0.27%  (p=0.000 n=18+16)
KV/Delete/SQL/rows=10-10          592 ± 1%       590 ± 1%  -0.21%  (p=0.038 n=18+18)
KV/Update/SQL/rows=1-10           475 ± 0%       475 ± 0%  -0.16%  (p=0.001 n=20+19)
KV/Update/SQL/rows=10-10          824 ± 1%       823 ± 0%  -0.15%  (p=0.025 n=20+19)
KV/Insert/SQL/rows=10-10          562 ± 0%       562 ± 0%  -0.14%  (p=0.000 n=19+19)
KV/Update/Native/rows=10-10       435 ± 1%       435 ± 1%    ~     (p=0.708 n=20+20)
```

Release note: None
Epic: None

91689: sql: drop comment when drop database in legacy schema changer. r=chengxiong-ruan a=chengxiong-ruan

Looks like we accidentally drop the logic to delete database comment when we refactor metadata updater in cockroachdb#83592.

Epic: None.
Release note (sql change): Fixed a bug in legacy schema changer that comment was not dropped together with database.

Co-authored-by: Lidor Carmel <[email protected]>
Co-authored-by: Nathan VanBenschoten <[email protected]>
Co-authored-by: Chengxiong Ruan <[email protected]>
  • Loading branch information
4 people committed Nov 10, 2022
5 parents 9e98e4a + 841ae0f + 7155b6b + 7efb676 + acd2cfa commit 8e9b548
Show file tree
Hide file tree
Showing 12 changed files with 624 additions and 481 deletions.
1 change: 1 addition & 0 deletions pkg/kv/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ go_test(
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/encoding",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
Expand Down
5 changes: 5 additions & 0 deletions pkg/kv/kvclient/kvcoord/testdata/savepoints
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,11 @@ retry
synthetic error: TransactionRetryWithProtoRefreshError: forced retry
epoch: 0 -> 1

reset
----
txn error cleared
txn id not changed

release x
----
0 <noignore>
Expand Down
8 changes: 7 additions & 1 deletion pkg/kv/kvclient/kvcoord/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -1273,8 +1273,14 @@ func (tc *TxnCoordSender) TestingCloneTxn() *roachpb.Transaction {
func (tc *TxnCoordSender) PrepareRetryableError(ctx context.Context, msg string) error {
tc.mu.Lock()
defer tc.mu.Unlock()
return roachpb.NewTransactionRetryWithProtoRefreshError(
if tc.mu.txnState != txnPending {
return errors.AssertionFailedf("cannot set a retryable error. current state: %s", tc.mu.txnState)
}
pErr := roachpb.NewTransactionRetryWithProtoRefreshError(
msg, tc.mu.txn.ID, tc.mu.txn)
tc.mu.storedRetryableErr = pErr
tc.mu.txnState = txnRetryableError
return pErr
}

// Step is part of the TxnSender interface.
Expand Down
35 changes: 30 additions & 5 deletions pkg/kv/kvserver/replica_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package kvserver

import (
"context"
"sync"
"time"

"github.com/cockroachdb/cockroach/pkg/kv"
Expand Down Expand Up @@ -432,7 +433,8 @@ func (r *Replica) evaluateWriteBatch(
"Require1PC should not have gotten to transactional evaluation. ba: %s", ba.String())
}

ms := new(enginepb.MVCCStats)
ms := newMVCCStats()
defer releaseMVCCStats(ms)
rec := NewReplicaEvalContext(ctx, r, g.LatchSpans(), ba.RequiresClosedTSOlderThanStorageSnapshot())
defer rec.Release()
batch, br, res, pErr := r.evaluateWriteBatchWithServersideRefreshes(
Expand Down Expand Up @@ -513,7 +515,8 @@ func (r *Replica) evaluate1PC(
etArg := arg.(*roachpb.EndTxnRequest)

// Evaluate strippedBa. If the transaction allows, permit refreshes.
ms := new(enginepb.MVCCStats)
ms := newMVCCStats()
defer releaseMVCCStats(ms)
if ba.CanForwardReadTimestamp {
batch, br, res, pErr = r.evaluateWriteBatchWithServersideRefreshes(
ctx, idKey, rec, ms, &strippedBa, g, st, ui, etArg.Deadline)
Expand Down Expand Up @@ -551,7 +554,7 @@ func (r *Replica) evaluate1PC(
clonedTxn.Status = roachpb.ABORTED
batch.Close()
batch = r.store.Engine().NewBatch()
ms = new(enginepb.MVCCStats)
ms.Reset()
} else {
// Run commit trigger manually.
innerResult, err := batcheval.RunCommitTrigger(ctx, rec, batch, ms, etArg, clonedTxn)
Expand Down Expand Up @@ -584,9 +587,18 @@ func (r *Replica) evaluate1PC(
}
}

// Add placeholder responses for end transaction requests.
br.Add(&roachpb.EndTxnResponse{OnePhaseCommit: true})
// Assign the response txn.
br.Txn = clonedTxn
// Add placeholder response for the end transaction request.
etAlloc := new(struct {
et roachpb.EndTxnResponse
union roachpb.ResponseUnion_EndTxn
})
etAlloc.et.OnePhaseCommit = true
etAlloc.union.EndTxn = &etAlloc.et
br.Responses = append(br.Responses, roachpb.ResponseUnion{})
br.Responses[len(br.Responses)-1].Value = &etAlloc.union

return onePCResult{
success: onePCSucceeded,
stats: *ms,
Expand Down Expand Up @@ -772,3 +784,16 @@ func isOnePhaseCommit(ba *roachpb.BatchRequest) bool {
// clean up.
return ba.Txn.Epoch == 0 || etArg.Require1PC
}

var mvccStatsPool = sync.Pool{
New: func() interface{} { return new(enginepb.MVCCStats) },
}

func newMVCCStats() *enginepb.MVCCStats {
return mvccStatsPool.Get().(*enginepb.MVCCStats)
}

func releaseMVCCStats(ms *enginepb.MVCCStats) {
ms.Reset()
mvccStatsPool.Put(ms)
}
124 changes: 124 additions & 0 deletions pkg/kv/txn_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -578,3 +579,126 @@ func TestRevScanAndGet(t *testing.T) {
require.NoError(t, txn.Run(ctx, b))
}
}

// TestGenerateForcedRetryableErrorAfterRollback poisons the txn handle and then
// rolls it back. This should fail - a retryable error should not be returned
// after a rollback.
func TestGenerateForcedRetryableErrorAfterRollback(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
ts, _, db := serverutils.StartServer(t, base.TestServerArgs{})
defer ts.Stopper().Stop(ctx)
k, err := ts.ScratchRange()
require.NoError(t, err)
mkKey := func(s string) roachpb.Key {
return encoding.EncodeStringAscending(k[:len(k):len(k)], s)
}
checkKey := func(t *testing.T, s string, exp int64) {
got, err := db.Get(ctx, mkKey(s))
require.NoError(t, err)
require.Equal(t, exp, got.ValueInt())
}
var i int
txnErr := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
e1 := txn.Put(ctx, mkKey("a"), 1)
i++
require.LessOrEqual(t, i, 2)
if i == 1 {
require.NoError(t, e1)
// Prepare an error to return after the rollback.
retryErr := txn.GenerateForcedRetryableError(ctx, "force retry")
// Rolling back completes the transaction, returning an error is invalid.
require.NoError(t, txn.Rollback(ctx))
return retryErr
} else {
require.ErrorContains(t, e1, "TransactionStatusError", i)
return nil
}
})
require.ErrorContains(t, txnErr, "TransactionStatusError")
checkKey(t, "a", 0)
}

// TestGenerateForcedRetryableErrorSimple verifies the retryable func can return
// a forced retryable error, and then the retry will not see writes from the
// first run, and can succeed on the second run.
func TestGenerateForcedRetryableErrorSimple(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
ts, _, db := serverutils.StartServer(t, base.TestServerArgs{})
defer ts.Stopper().Stop(ctx)
k, err := ts.ScratchRange()
require.NoError(t, err)
mkKey := func(s string) roachpb.Key {
return encoding.EncodeStringAscending(k[:len(k):len(k)], s)
}
checkKey := func(t *testing.T, s string, exp int64) {
got, err := db.Get(ctx, mkKey(s))
require.NoError(t, err)
require.Equal(t, exp, got.ValueInt())
}
var i int
require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
{
// Verify 'a' is not written yet.
got, err := txn.Get(ctx, mkKey("a"))
require.NoError(t, err)
require.False(t, got.Exists())
}
require.NoError(t, txn.Put(ctx, mkKey("a"), 1))
// Retry exactly once by propagating a retry error.
if i++; i == 1 {
return txn.GenerateForcedRetryableError(ctx, "force retry")
}
require.NoError(t, txn.Put(ctx, mkKey("b"), 2))
return nil
}))
checkKey(t, "a", 1)
checkKey(t, "b", 2)
}

// TestGenerateForcedRetryableErrorByPoisoning does the same as
// TestGenerateForcedRetryableErrorSimple above, but doesn't return the error
// from the retryable func, instead, this test verifies that the txn object is
// "poisoned" (in a state where the txn object cannot be used until the error is
// cleared) and it will be reset automatically and succeed on the second try.
func TestGenerateForcedRetryableErrorByPoisoning(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
ts, _, db := serverutils.StartServer(t, base.TestServerArgs{})
defer ts.Stopper().Stop(ctx)
k, err := ts.ScratchRange()
require.NoError(t, err)
mkKey := func(s string) roachpb.Key {
return encoding.EncodeStringAscending(k[:len(k):len(k)], s)
}
checkKey := func(t *testing.T, s string, exp int64) {
got, err := db.Get(ctx, mkKey(s))
require.NoError(t, err)
require.Equal(t, exp, got.ValueInt())
}
var i int
require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
{
// Verify 'a' is not written yet.
got, err := txn.Get(ctx, mkKey("a"))
require.NoError(t, err, got.ValueInt())
require.False(t, got.Exists(), got.ValueInt())
}
require.NoError(t, txn.Put(ctx, mkKey("a"), 1))
// Retry exactly once by propagating a retry error.
if i++; i == 1 {
// Generate an error and then return nil (not the ideal implementation but
// should work).
_ = txn.GenerateForcedRetryableError(ctx, "force retry")
return nil
}
require.NoError(t, txn.Put(ctx, mkKey("b"), 2))
return nil
}))
checkKey(t, "a", 1)
checkKey(t, "b", 2)
}
Loading

0 comments on commit 8e9b548

Please sign in to comment.