Skip to content

Commit

Permalink
kv: add external testing of parent-child interactions
Browse files Browse the repository at this point in the history
Release note: None
  • Loading branch information
ajwerner committed Mar 27, 2023
1 parent fba2a5c commit fd39881
Show file tree
Hide file tree
Showing 8 changed files with 248 additions and 23 deletions.
4 changes: 4 additions & 0 deletions pkg/kv/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ go_test(
"//pkg/security/securitytest",
"//pkg/security/username",
"//pkg/server",
"//pkg/sql",
"//pkg/sql/catalog/descpb",
"//pkg/sql/rowenc/keyside",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondatapb",
"//pkg/storage/enginepb",
"//pkg/testutils",
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ go_library(
"//pkg/util/future",
"//pkg/util/grpcutil",
"//pkg/util/hlc",
"//pkg/util/interval",
"//pkg/util/iterutil",
"//pkg/util/limit",
"//pkg/util/interval",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/pprofutil",
Expand Down
7 changes: 1 addition & 6 deletions pkg/kv/kvserver/replica_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/kr/pretty"
)
Expand Down Expand Up @@ -304,12 +303,8 @@ func (r *Replica) canDropLatchesBeforeEval(
for _, req := range ba.Requests {
reqHeader := req.GetInner().Header()
start, end := reqHeader.Key, reqHeader.EndKey
var txnID uuid.UUID
if ba.Txn != nil {
txnID = ba.Txn.ID
}
needsIntentInterleavingForThisRequest, err := storage.ScanConflictingIntentsForDroppingLatchesEarly(
ctx, rw, txnID, ba.Header.Timestamp, start, end, &intents, maxIntents,
ctx, rw, ba.Txn, ba.Header.Timestamp, start, end, &intents, maxIntents,
)
if err != nil {
return false /* ok */, true /* stillNeedsIntentInterleaving */, kvpb.NewError(
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_send.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ var optimisticEvalLimitedScans = settings.RegisterBoolSetting(
// │ │
// ▼ │
//
// executeWriteBatch
// executeWriteBatch │
//
// │ │
// ▼ ▼
Expand Down
233 changes: 231 additions & 2 deletions pkg/kv/txn_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,16 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc/keyside"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/kvclientutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
Expand Down Expand Up @@ -916,8 +921,7 @@ func TestChildTransactionPushesParentToCommitTimestamp(t *testing.T) {
before := txn.ProvisionalCommitTimestamp()
toPushTo := before.Next().Next()
require.NoError(t, txn.ChildTxn(ctx, func(ctx context.Context, childTxn *kv.Txn) error {
childTxn.SetFixedTimestamp(ctx, toPushTo)
return nil
return childTxn.SetFixedTimestamp(ctx, toPushTo)
}))
require.Equal(t, toPushTo, txn.ProvisionalCommitTimestamp())
}
Expand Down Expand Up @@ -1026,3 +1030,228 @@ func TestChildTransactionDeadlockDetection(t *testing.T) {
t.Fatalf("unexpected outcome: a=%s, b=%s", strA, strB)
}
}

// TestChildTxnSelfInteractions is an integration-style test of child
// transaction interactions with parent state. Every case runs as its own
// subtest against a freshly started single-node test cluster, so keys written
// by one case are never visible to another.
func TestChildTxnSelfInteractions(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)
	ctx := context.Background()

	// testCase pairs a subtest name with the body to run against a fresh DB.
	type testCase struct {
		name string
		f    func(t *testing.T, db *kv.DB)
	}

	// run executes a single case as a subtest on its own one-node cluster.
	run := func(test testCase) {
		t.Run(test.name, func(t *testing.T) {
			tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{})
			defer tc.Stopper().Stop(ctx)
			db := tc.Server(0).DB()
			test.f(t, db)
		})
	}

	testCases := []testCase{
		{
			name: "child reads parent's write",
			f: func(t *testing.T, db *kv.DB) {
				require.NoError(t, db.Txn(ctx, func(
					ctx context.Context, txn *kv.Txn,
				) error {
					require.NoError(t, txn.Put(ctx, "foo", "bar"))
					return txn.ChildTxn(ctx, func(
						ctx context.Context, childTxn *kv.Txn,
					) error {
						got, err := childTxn.Get(ctx, "foo")
						require.NoError(t, err)
						// Check that a value came back before decoding it, so that
						// a failed read surfaces as an assertion failure.
						require.NotNil(t, got.Value)
						gotBytes, err := got.Value.GetBytes()
						require.NoError(t, err)
						// testify expects (expected, actual); keeping that order
						// makes the failure message label the values correctly.
						require.Equal(t, "bar", string(gotBytes))
						return nil
					})
				}))
			},
		},
		{
			name: "child write-write conflict gets an error",
			f: func(t *testing.T, db *kv.DB) {
				require.Regexp(t,
					`descendant transaction \w+ attempted to write over ancestor \w+ at key "foo"`,
					db.Txn(ctx, func(
						ctx context.Context, txn *kv.Txn,
					) error {
						require.NoError(t, txn.Put(ctx, "foo", "bar"))

						// This attempt to write over the parent's intent will fail.
						// The error is returned unchanged so that it reaches the
						// Regexp assertion above.
						return txn.ChildTxn(ctx, func(
							ctx context.Context, childTxn *kv.Txn,
						) error {
							return childTxn.Put(ctx, "foo", "baz")
						})
					}))
			},
		},
		{
			// In this case the child should pass through the read lock of the
			// parent, write successfully, and then it should push the parent
			// which will then be forced to refresh.
			name: "child read-write conflict forces parent to get an error (locking)",
			f: func(t *testing.T, db *kv.DB) {
				k := roachpb.Key("foo")
				require.NoError(t, db.Put(ctx, k, "bar"))
				require.Regexp(t,
					`cannot forward provisional commit timestamp due to overlapping write`,
					db.Txn(ctx, func(
						ctx context.Context, txn *kv.Txn,
					) error {
						// Issue an exclusive-locking scan over [k, k.PrefixEnd()) on
						// behalf of the parent.
						scan := &kvpb.ScanRequest{
							KeyLocking: lock.Exclusive,
						}
						scan.Key = k
						scan.EndKey = k.PrefixEnd()
						b := txn.NewBatch()
						b.AddRawRequest(scan)
						require.NoError(t, txn.Run(ctx, b))

						// The below write will succeed but the forwarding of the
						// parent above the write's timestamp will fail as it detects
						// that the parent's read has been invalidated.
						return txn.ChildTxn(ctx, func(
							ctx context.Context, childTxn *kv.Txn,
						) error {
							require.NoError(t, childTxn.Put(ctx, k, "baz"))
							return nil
						})
					}))
			},
		},
	}
	for _, tc := range testCases {
		run(tc)
	}
}

// TestRestartDueToAbortedParentTransactionDetectedByAncestor verifies that
// when a child transaction detects that its parent has been aborted, the
// resulting retry error propagates properly through the auto-retry
// facilities of the SQL layer and leads to an automatic retry of the SQL
// transaction.
func TestRestartDueToAbortedParentTransactionDetectedByAncestor(t *testing.T) {
defer leaktest.AfterTest(t)()

// The basic idea is that after the transaction runs its first write, the
// knob will inject control flow into the test before the prepare of the
// second write. At that point, a separate transaction with a higher priority
// will write to a scratch key and then get blocked on the first write.
//
// Then, a child transaction on behalf of this sql transaction will run and
// attempt to write over the scratch key. Deadlock detection will fire and
// the other transaction will abort the SQL transaction. The child will
// detect this abort (test asserts that this error is received) and will
// turn it into a restart. At that point, the other transaction will be
// allowed to proceed and commit and the sql transaction will be allowed to
// restart and succeed.
const (
createTable = "CREATE TABLE foo (i INT PRIMARY KEY, j INT)"
firstWrite = "UPSERT INTO foo VALUES (1, 2)"
secondWrite = "INSERT INTO foo VALUES (2, 2)"
txn = "BEGIN;" +
" " + firstWrite + ";" +
// Use prepare because it has a handy testing knob which provides access
// to the underlying transaction.
" PREPARE second AS " + secondWrite + ";" +
" EXECUTE second; " +
" COMMIT;"
)
// blockCh carries, from the BeforePrepare knob to the test, the channel on
// which the blocked knob invocation waits for the error it should return.
blockCh := make(chan chan error, 1)
// interceptedTxn is the transaction handed to BeforePrepare for secondWrite.
var interceptedTxn *kv.Txn
tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
SQLExecutor: &sql.ExecutorTestingKnobs{
BeforePrepare: func(ctx context.Context, stmt string, txn *kv.Txn) error {
if stmt == secondWrite {
interceptedTxn = txn
ch := make(chan error)
select {
case blockCh <- ch: // the test sets blockCh to nil after the first pass, so later passes fall through to default
// Block until the test sends the error this prepare should return.
return <-ch
default:
}
}
return nil
},
},
},
},
})
ctx := context.Background()
defer tc.Stopper().Stop(ctx)
scratch := tc.ScratchRange(t)

sqlDB := tc.ServerConn(0)
tdb := sqlutils.MakeSQLRunner(sqlDB)
tdb.Exec(t, createTable)
// Look up the ID of table foo; it is used below to build a key in the table.
var fooTableID descpb.ID
tdb.QueryRow(t, "SELECT $1::regclass::int", "foo").Scan(&fooTableID)

db := tc.Server(0).DB()
codec := tc.Server(0).ExecutorConfig().(sql.ExecutorConfig).Codec

// Run the SQL transaction in the background; its result is checked at the
// very end of the test.
errCh := make(chan error, 1)
go func() {
_, err := tc.ServerConn(0).Exec(txn)
errCh <- err
}()

// Wait for the SQL transaction to reach the prepare of secondWrite. After
// this receive, interceptedTxn has been set by the knob.
unblock := <-blockCh

// We want to create a deadlock situation where our new transaction has a
// higher priority than the parent (and its descendants).
txnProto := roachpb.MakeTransaction(
"test", nil, roachpb.NormalUserPriority, db.Clock().Now(),
db.Clock().MaxOffset().Nanoseconds(), 1)
txnProto.Priority = interceptedTxn.TestingCloneTxn().Priority + 1
otherTxn := kv.NewTxnFromProto(
ctx, db, tc.Server(0).NodeID(),
txnProto.ReadTimestamp.UnsafeToClockTimestamp(), kv.RootTxn, &txnProto)

// Write to scratch with otherTxn, then attempt to write over the write by
// the mainTxn.
require.NoError(t, otherTxn.Put(ctx, scratch, "bar"))

// Build the key for i=1 under index 1 of foo, column family 0.
// NOTE(review): presumably this is the primary-index row written by
// firstWrite — confirm index ID 1 is the primary index.
indexKeyPrefix, err := keyside.Encode(
codec.IndexPrefix(uint32(fooTableID), 1),
tree.NewDInt(1),
encoding.Ascending,
)
require.NoError(t, err)
key := keys.MakeFamilyKey(indexKeyPrefix, 0)

// Issue otherTxn's conflicting write in the background. The test fails if
// it returns at all within the short wait below, since it is expected to
// block.
otherWriteErrCh := make(chan error)
go func() {
otherWriteErrCh <- otherTxn.Put(ctx, key, 1)
}()
select {
case err := <-otherWriteErrCh:
t.Fatal(err)
case <-time.After(time.Millisecond):
}

// Run a child of the intercepted SQL transaction that writes over the
// scratch key held by otherTxn. The write itself must fail with an
// AncestorAbortedError, and ChildTxn must surface that as a retry error
// carrying the intercepted transaction's ID.
err = interceptedTxn.ChildTxn(ctx, func(_ context.Context, childTxn *kv.Txn) error {
err := childTxn.Put(ctx, scratch, "baz")
require.Truef(t, errors.HasType(err, (*kvpb.AncestorAbortedError)(nil)), "%T", err)
return err
})
if retry := (*kvpb.TransactionRetryWithProtoRefreshError)(nil); !errors.As(err, &retry) {
t.Fatalf("expected %T, got %T",
(*kvpb.TransactionRetryWithProtoRefreshError)(nil), err)
} else {
require.Equal(t, interceptedTxn.ID(), retry.TxnID)
}
// Stop intercepting further prepares, then hand the retry error to the
// blocked BeforePrepare call so that it returns it.
blockCh = nil
unblock <- err
// otherTxn's write must now complete and commit, and the SQL transaction
// must finish without error.
require.NoError(t, <-otherWriteErrCh)
require.NoError(t, otherTxn.Commit(ctx))
require.NoError(t, <-errCh)
}
4 changes: 2 additions & 2 deletions pkg/storage/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1745,7 +1745,7 @@ func assertMVCCIteratorInvariants(iter MVCCIterator) error {
func ScanConflictingIntentsForDroppingLatchesEarly(
ctx context.Context,
reader Reader,
txnID uuid.UUID,
txn *roachpb.Transaction,
ts hlc.Timestamp,
start, end roachpb.Key,
intents *[]roachpb.Intent,
Expand Down Expand Up @@ -1788,7 +1788,7 @@ func ScanConflictingIntentsForDroppingLatchesEarly(
if meta.Txn == nil {
return false, errors.Errorf("intent without transaction")
}
ownIntent := txnID != uuid.Nil && txnID == meta.Txn.ID
ownIntent, _, _, _ := ownsIntent(txn, &meta)
if ownIntent {
// If we ran into one of our own intents, a corresponding scan over the
// MVCC keyspace will need access to the key's intent history in order to
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2023,7 +2023,7 @@ func TestScanConflictingIntentsForDroppingLatchesEarly(t *testing.T) {
needsIntentHistory, err := ScanConflictingIntentsForDroppingLatchesEarly(
ctx,
eng,
txn.ID,
txn,
txn.ReadTimestamp,
tc.start,
tc.end,
Expand Down Expand Up @@ -2245,7 +2245,7 @@ func TestScanConflictingIntentsForDroppingLatchesEarlyReadYourOwnWrites(t *testi
needsIntentHistory, err := ScanConflictingIntentsForDroppingLatchesEarly(
ctx,
eng,
txn.ID,
txn,
txn.ReadTimestamp,
keyA,
nil,
Expand Down
15 changes: 6 additions & 9 deletions pkg/storage/pebble_mvcc_scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -934,7 +934,7 @@ func (p *pebbleMVCCScanner) getOne(ctx context.Context) (ok, added bool) {
prevTS = metaTS.Prev()
}

ownIntent, epoch, seqNum, ignoredSeqNums := p.ownsIntent()
ownIntent, epoch, seqNum, ignoredSeqNums := ownsIntent(p.txn, &p.meta)
if !ownIntent {
conflictingIntent := metaTS.LessEq(p.ts) || p.failOnMoreRecent
if !conflictingIntent {
Expand Down Expand Up @@ -1075,14 +1075,11 @@ func (p *pebbleMVCCScanner) getOne(ctx context.Context) (ok, added bool) {
// and ignored sequence range for the intent owner are additionally returned.
// Note that these values may correspond to an ancestor transaction rather than
// the transaction performing the read.
func (p *pebbleMVCCScanner) ownsIntent() (
bool,
enginepb.TxnEpoch,
enginepb.TxnSeq,
[]enginepb.IgnoredSeqNumRange,
) {
for cur := p.txn; cur != nil; cur = cur.Parent {
if p.meta.Txn.ID.Equal(cur.ID) {
func ownsIntent(
root *roachpb.Transaction, intent *enginepb.MVCCMetadata,
) (bool, enginepb.TxnEpoch, enginepb.TxnSeq, []enginepb.IgnoredSeqNumRange) {
for cur := root; cur != nil; cur = cur.Parent {
if intent.Txn.ID.Equal(cur.ID) {
return true, cur.Epoch, cur.Sequence, cur.IgnoredSeqNums
}
}
Expand Down

0 comments on commit fd39881

Please sign in to comment.