Skip to content

Commit

Permalink
storage: add a test for interactions between rejected commands and re…
Browse files Browse the repository at this point in the history
…aders

Another test for cockroachdb#30792, this time testing the specific interaction
between readers and rejected Raft commands.

Release note: None
  • Loading branch information
andreimatei committed Nov 13, 2018
1 parent 728dd55 commit ef2549b
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 0 deletions.
128 changes: 128 additions & 0 deletions pkg/kv/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@ package kv_test

import (
"context"
"fmt"
"regexp"
"sync/atomic"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/storage/txnwait"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/internal/client"
Expand All @@ -32,6 +35,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
)

// This file contains contains integration tests that don't fit anywhere else.
Expand Down Expand Up @@ -185,3 +189,127 @@ func TestDelayedBeginRetryable(t *testing.T) {
t.Fatalf("expected %s, got: %s", exp, pErr)
}
}

// Test that waiters on transactions whose commit command is rejected see the
// transaction as Aborted. This test is a regression test for #30792 which was
// causing pushers in the txn wait queue to consider such a transaction
// committed.
func TestWaiterOnRejectedCommit(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()

// The txn id whose commit we're going to reject. A uuid.UUID.
var txnID atomic.Value
// The EndTransaction proposal that we want to reject. A string.
var commitCmdID atomic.Value
readerBlocked := make(chan struct{})
// txnUpdate is signaled once the txn wait queue is updated for our
// transaction. Normally it only needs a buffer length of 1, but bugs that
// cause it to be pinged several times (e.g. #30792) might need a bigger
// buffer to avoid the test timing out.
txnUpdate := make(chan roachpb.TransactionStatus, 10)

s, _, db := serverutils.StartServer(t, base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &storage.StoreTestingKnobs{
TestingProposalFilter: func(args storagebase.ProposalFilterArgs) *roachpb.Error {
// We'll recognize the attempt to commit our transaction and store the
// respective command id.
ba := args.Req
etReq, ok := ba.GetArg(roachpb.EndTransaction)
if !ok {
return nil
}
if !etReq.(*roachpb.EndTransactionRequest).Commit {
return nil
}
v := txnID.Load()
if v == nil {
return nil
}
if !ba.Txn.ID.Equal(v.(uuid.UUID)) {
return nil
}
commitCmdID.Store(args.CmdID)
return nil
},
TestingApplyFilter: func(args storagebase.ApplyFilterArgs) *roachpb.Error {
// We'll trap the processing of the commit command and return an error
// for it.
v := commitCmdID.Load()
if v == nil {
return nil
}
cmdID := v.(storagebase.CmdIDKey)
if args.CmdID == cmdID {
return roachpb.NewErrorf("test injected err")
}
return nil
},
TxnWait: txnwait.TestingKnobs{
OnPusherBlocked: func(ctx context.Context, push *roachpb.PushTxnRequest) {
// We'll trap a reader entering the wait queue for our txn.
v := txnID.Load()
if v == nil {
return
}
if push.PusheeTxn.ID.Equal(v.(uuid.UUID)) {
close(readerBlocked)
}
},
OnTxnUpdate: func(ctx context.Context, txn *roachpb.Transaction) {
// We'll trap updates to our txn.
v := txnID.Load()
if v == nil {
return
}
if txn.ID.Equal(v.(uuid.UUID)) {
txnUpdate <- txn.Status
}
},
},
},
},
})
defer s.Stopper().Stop(ctx)

// We'll start a transaction, write an intent, then separately do a read on a
// different goroutine and wait for that read to block on the intent, then
// we'll attempt to commit the transaction but we'll intercept the processing
// of the commit command and reject it.
// Then we'll assert that the txn wait queue is told that the transaction
// aborted, and we also check that the reader got a nil value.

txn := client.NewTxn(ctx, db, s.NodeID(), client.RootTxn)
key := "key"
if err := txn.Put(ctx, key, "val"); err != nil {
t.Fatal(err)
}
txnID.Store(txn.ID())

readerDone := make(chan error, 1)

go func() {
val, err := db.Get(ctx, key)
if err != nil {
readerDone <- err
}
if val.Exists() {
readerDone <- fmt.Errorf("expected value to not exist, got: %s", val)
}
close(readerDone)
}()

// Wait for the reader to enter the txn wait queue.
<-readerBlocked
if err := txn.CommitOrCleanup(ctx); !testutils.IsError(err, "test injected err") {
t.Fatalf("expected injected err, got: %v", err)
}
// Wait for the txn wait queue to be pinged and check the status.
if status := <-txnUpdate; status != roachpb.ABORTED {
t.Fatalf("expected the wait queue to be updated with an Aborted txn, instead got: %s", status)
}
if err := <-readerDone; err != nil {
t.Fatal(err)
}
}
5 changes: 5 additions & 0 deletions pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -4469,6 +4469,11 @@ func (s *Store) setScannerActive(active bool) {
s.scanner.SetDisabled(!active)
}

// GetTxnWaitKnobs is part of txnwait.StoreInterface.
func (s *Store) GetTxnWaitKnobs() txnwait.TestingKnobs {
return s.TestingKnobs().TxnWait
}

func init() {
tracing.RegisterTagRemapping("s", "store")
}
3 changes: 3 additions & 0 deletions pkg/storage/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/storage/txnwait"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

Expand Down Expand Up @@ -173,6 +174,8 @@ type StoreTestingKnobs struct {
SystemLogsGCPeriod time.Duration
// SystemLogsGCGCDone is used to notify when system logs GC is done.
SystemLogsGCGCDone chan<- struct{}
// TxnWait contains knobs for txnwait.Queue instances.
TxnWait txnwait.TestingKnobs
}

// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
Expand Down
15 changes: 15 additions & 0 deletions pkg/storage/txnwait/txnqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,13 +136,22 @@ type StoreInterface interface {
Clock() *hlc.Clock
Stopper() *stop.Stopper
DB() *client.DB
GetTxnWaitKnobs() TestingKnobs
}

// ReplicaInterface provides some parts of a Replica without incurring a dependency.
type ReplicaInterface interface {
ContainsKey(roachpb.Key) bool
}

// TestingKnobs represents testing knobs for a Queue.
type TestingKnobs struct {
// OnTxnWaitEnqueue is called when a would-be pusher joins a wait queue.
OnPusherBlocked func(ctx context.Context, push *roachpb.PushTxnRequest)
// OnTxnUpdate is called by Queue.UpdateTxn.
OnTxnUpdate func(ctx context.Context, txn *roachpb.Transaction)
}

// Queue enqueues PushTxn requests which are waiting on extant txns
// with conflicting intents to abort or commit.
//
Expand Down Expand Up @@ -269,6 +278,9 @@ func (q *Queue) Enqueue(txn *roachpb.Transaction) {
func (q *Queue) UpdateTxn(ctx context.Context, txn *roachpb.Transaction) {
txn.AssertInitialized(ctx)
q.mu.Lock()
if f := q.store.GetTxnWaitKnobs().OnTxnUpdate; f != nil {
f(ctx, txn)
}

q.releaseWaitingQueriesLocked(ctx, txn.ID)

Expand Down Expand Up @@ -399,6 +411,9 @@ func (q *Queue) MaybeWaitForPush(
pending: make(chan *roachpb.Transaction, 1),
}
pending.waitingPushes = append(pending.waitingPushes, push)
if f := q.store.GetTxnWaitKnobs().OnPusherBlocked; f != nil {
f(ctx, req)
}
// Because we're adding another dependent on the pending
// transaction, send on the waiting queries' channel to
// indicate there is a new dependent and they should proceed
Expand Down

0 comments on commit ef2549b

Please sign in to comment.