diff --git a/pkg/kv/integration_test.go b/pkg/kv/integration_test.go
index 858445cc25c4..4241e0891c76 100644
--- a/pkg/kv/integration_test.go
+++ b/pkg/kv/integration_test.go
@@ -16,12 +16,15 @@ package kv_test
 
 import (
 	"context"
+	"fmt"
 	"regexp"
 	"sync/atomic"
 	"testing"
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/storage"
+	"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
+	"github.com/cockroachdb/cockroach/pkg/storage/txnwait"
 
 	"github.com/cockroachdb/cockroach/pkg/base"
 	"github.com/cockroachdb/cockroach/pkg/internal/client"
@@ -32,6 +35,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/tracing"
+	"github.com/cockroachdb/cockroach/pkg/util/uuid"
 )
 
 // This file contains contains integration tests that don't fit anywhere else.
@@ -185,3 +189,130 @@ func TestDelayedBeginRetryable(t *testing.T) {
 		t.Fatalf("expected %s, got: %s", exp, pErr)
 	}
 }
+
+// Test that waiters on transactions whose commit command is rejected see the
+// transaction as Aborted. This test is a regression test for #30792 which was
+// causing pushers in the txn wait queue to consider such a transaction
+// committed.
+func TestWaiterOnRejectedCommit(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	ctx := context.Background()
+
+	// The txn id whose commit we're going to reject. A uuid.UUID.
+	var txnID atomic.Value
+	// The EndTransaction proposal that we want to reject. A storagebase.CmdIDKey.
+	var commitCmdID atomic.Value
+	// readerBlocked is closed once the reader enters the wait queue for our txn.
+	readerBlocked := make(chan struct{})
+	// txnUpdate is signaled once the txn wait queue is updated for our
+	// transaction. Normally it only needs a buffer length of 1, but bugs that
+	// cause it to be pinged several times (e.g. #30792) might need a bigger
+	// buffer to avoid the test timing out.
+	txnUpdate := make(chan roachpb.TransactionStatus, 10)
+
+	s, _, db := serverutils.StartServer(t, base.TestServerArgs{
+		Knobs: base.TestingKnobs{
+			Store: &storage.StoreTestingKnobs{
+				TestingProposalFilter: func(args storagebase.ProposalFilterArgs) *roachpb.Error {
+					// We'll recognize the attempt to commit our transaction and store the
+					// respective command id.
+					ba := args.Req
+					etReq, ok := ba.GetArg(roachpb.EndTransaction)
+					if !ok {
+						return nil
+					}
+					if !etReq.(*roachpb.EndTransactionRequest).Commit {
+						return nil
+					}
+					v := txnID.Load()
+					if v == nil {
+						return nil
+					}
+					if !ba.Txn.ID.Equal(v.(uuid.UUID)) {
+						return nil
+					}
+					commitCmdID.Store(args.CmdID)
+					return nil
+				},
+				TestingApplyFilter: func(args storagebase.ApplyFilterArgs) *roachpb.Error {
+					// We'll trap the processing of the commit command and return an error
+					// for it.
+					v := commitCmdID.Load()
+					if v == nil {
+						return nil
+					}
+					cmdID := v.(storagebase.CmdIDKey)
+					if args.CmdID == cmdID {
+						return roachpb.NewErrorf("test injected err")
+					}
+					return nil
+				},
+				TxnWait: txnwait.TestingKnobs{
+					OnPusherBlocked: func(ctx context.Context, push *roachpb.PushTxnRequest) {
+						// We'll trap a reader entering the wait queue for our txn.
+						v := txnID.Load()
+						if v == nil {
+							return
+						}
+						if push.PusheeTxn.ID.Equal(v.(uuid.UUID)) {
+							close(readerBlocked)
+						}
+					},
+					OnTxnUpdate: func(ctx context.Context, txn *roachpb.Transaction) {
+						// We'll trap updates to our txn.
+						v := txnID.Load()
+						if v == nil {
+							return
+						}
+						if txn.ID.Equal(v.(uuid.UUID)) {
+							txnUpdate <- txn.Status
+						}
+					},
+				},
+			},
+		},
+	})
+	defer s.Stopper().Stop(ctx)
+
+	// We'll start a transaction, write an intent, then separately do a read on a
+	// different goroutine and wait for that read to block on the intent, then
+	// we'll attempt to commit the transaction but we'll intercept the processing
+	// of the commit command and reject it.
+	// Then we'll assert that the txn wait queue is told that the transaction
+	// aborted, and we also check that the reader got a nil value.
+
+	txn := client.NewTxn(ctx, db, s.NodeID(), client.RootTxn)
+	key := "key"
+	if err := txn.Put(ctx, key, "val"); err != nil {
+		t.Fatal(err)
+	}
+	txnID.Store(txn.ID())
+
+	readerDone := make(chan error, 1)
+
+	go func() {
+		defer close(readerDone)
+		val, err := db.Get(ctx, key)
+		if err != nil {
+			// val is meaningless when Get fails; report the error and stop.
+			readerDone <- err
+			return
+		}
+		if val.Exists() {
+			readerDone <- fmt.Errorf("expected value to not exist, got: %s", val)
+		}
+	}()
+
+	// Wait for the reader to enter the txn wait queue.
+	<-readerBlocked
+	if err := txn.CommitOrCleanup(ctx); !testutils.IsError(err, "test injected err") {
+		t.Fatalf("expected injected err, got: %v", err)
+	}
+	// Wait for the txn wait queue to be pinged and check the status.
+	if status := <-txnUpdate; status != roachpb.ABORTED {
+		t.Fatalf("expected the wait queue to be updated with an Aborted txn, instead got: %s", status)
+	}
+	if err := <-readerDone; err != nil {
+		t.Fatal(err)
+	}
+}
diff --git a/pkg/storage/store.go b/pkg/storage/store.go
index ca24524d17a4..51a6c6733fc5 100644
--- a/pkg/storage/store.go
+++ b/pkg/storage/store.go
@@ -4469,6 +4469,11 @@ func (s *Store) setScannerActive(active bool) {
 	s.scanner.SetDisabled(!active)
 }
 
+// GetTxnWaitKnobs is part of txnwait.StoreInterface.
+func (s *Store) GetTxnWaitKnobs() txnwait.TestingKnobs {
+	return s.TestingKnobs().TxnWait
+}
+
 func init() {
 	tracing.RegisterTagRemapping("s", "store")
 }
diff --git a/pkg/storage/testing_knobs.go b/pkg/storage/testing_knobs.go
index 857e5c137ea3..f4944f119669 100644
--- a/pkg/storage/testing_knobs.go
+++ b/pkg/storage/testing_knobs.go
@@ -20,6 +20,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
+	"github.com/cockroachdb/cockroach/pkg/storage/txnwait"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 )
 
@@ -173,6 +174,8 @@ type StoreTestingKnobs struct {
 	SystemLogsGCPeriod time.Duration
 	// SystemLogsGCGCDone is used to notify when system logs GC is done.
 	SystemLogsGCGCDone chan<- struct{}
+	// TxnWait contains knobs for txnwait.Queue instances.
+	TxnWait txnwait.TestingKnobs
 }
 
 // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
diff --git a/pkg/storage/txnwait/txnqueue.go b/pkg/storage/txnwait/txnqueue.go
index 69366b197550..cfad2e0f17a1 100644
--- a/pkg/storage/txnwait/txnqueue.go
+++ b/pkg/storage/txnwait/txnqueue.go
@@ -136,6 +136,7 @@ type StoreInterface interface {
 	Clock() *hlc.Clock
 	Stopper() *stop.Stopper
 	DB() *client.DB
+	GetTxnWaitKnobs() TestingKnobs
 }
 
 // ReplicaInterface provides some parts of a Replica without incurring a dependency.
@@ -143,6 +144,14 @@ type ReplicaInterface interface {
 	ContainsKey(roachpb.Key) bool
 }
 
+// TestingKnobs represents testing knobs for a Queue.
+type TestingKnobs struct {
+	// OnPusherBlocked is called when a would-be pusher joins a wait queue.
+	OnPusherBlocked func(ctx context.Context, push *roachpb.PushTxnRequest)
+	// OnTxnUpdate is called by Queue.UpdateTxn.
+	OnTxnUpdate func(ctx context.Context, txn *roachpb.Transaction)
+}
+
 // Queue enqueues PushTxn requests which are waiting on extant txns
 // with conflicting intents to abort or commit.
// @@ -269,6 +278,9 @@ func (q *Queue) Enqueue(txn *roachpb.Transaction) { func (q *Queue) UpdateTxn(ctx context.Context, txn *roachpb.Transaction) { txn.AssertInitialized(ctx) q.mu.Lock() + if f := q.store.GetTxnWaitKnobs().OnTxnUpdate; f != nil { + f(ctx, txn) + } q.releaseWaitingQueriesLocked(ctx, txn.ID) @@ -399,6 +411,9 @@ func (q *Queue) MaybeWaitForPush( pending: make(chan *roachpb.Transaction, 1), } pending.waitingPushes = append(pending.waitingPushes, push) + if f := q.store.GetTxnWaitKnobs().OnPusherBlocked; f != nil { + f(ctx, req) + } // Because we're adding another dependent on the pending // transaction, send on the waiting queries' channel to // indicate there is a new dependent and they should proceed