Skip to content

Commit

Permalink
Merge #48297
Browse files Browse the repository at this point in the history
48297: kvclient: fiddle with assertions and rollbacks r=andreimatei a=andreimatei

See individual commits.

Co-authored-by: Andrei Matei <[email protected]>
  • Loading branch information
craig[bot] and andreimatei committed May 4, 2020
2 parents 3f8f452 + dd208c0 commit b8ecf62
Show file tree
Hide file tree
Showing 17 changed files with 404 additions and 127 deletions.
6 changes: 4 additions & 2 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -839,7 +839,7 @@ func checkInProgressBackupRestore(
var allowResponse chan struct{}
params := base.TestClusterArgs{}
params.ServerArgs.Knobs.Store = &kvserver.StoreTestingKnobs{
TestingResponseFilter: func(ba roachpb.BatchRequest, br *roachpb.BatchResponse) *roachpb.Error {
TestingResponseFilter: func(ctx context.Context, ba roachpb.BatchRequest, br *roachpb.BatchResponse) *roachpb.Error {
for _, ru := range br.Responses {
switch ru.GetInner().(type) {
case *roachpb.ExportResponse, *roachpb.ImportResponse:
Expand Down Expand Up @@ -3595,7 +3595,9 @@ func TestProtectedTimestampsDuringBackup(t *testing.T) {
params := base.TestClusterArgs{}
params.ServerArgs.ExternalIODir = dir
params.ServerArgs.Knobs.Store = &kvserver.StoreTestingKnobs{
TestingResponseFilter: func(ba roachpb.BatchRequest, br *roachpb.BatchResponse) *roachpb.Error {
TestingResponseFilter: func(
ctx context.Context, ba roachpb.BatchRequest, br *roachpb.BatchResponse,
) *roachpb.Error {
for _, ru := range br.Responses {
switch ru.GetInner().(type) {
case *roachpb.ExportResponse, *roachpb.ImportResponse:
Expand Down
104 changes: 7 additions & 97 deletions pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/kvclientutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -2883,98 +2883,6 @@ func TestTxnCoordSenderRetries(t *testing.T) {
}
}

type pushExpectation int

const (
// expectPusheeTxnRecovery means we're expecting transaction recovery to be
// performed (after finding a STAGING txn record).
expectPusheeTxnRecovery pushExpectation = iota
// expectPusheeTxnRecordNotFound means we're expecting the push to not find the
// pushee txn record.
expectPusheeTxnRecordNotFound
// dontExpectAnything means we're not going to check the state in which the
// pusher found the pushee's txn record.
dontExpectAnything
)

type expectedTxnResolution int

const (
expectAborted expectedTxnResolution = iota
expectCommitted
)

// checkPushResult pushes the specified txn and checks that the pushee's
// resolution is the expected one.
func checkPushResult(
ctx context.Context,
db *kv.DB,
txn roachpb.Transaction,
expResolution expectedTxnResolution,
pushExpectation pushExpectation,
) error {
pushReq := roachpb.PushTxnRequest{
RequestHeader: roachpb.RequestHeader{
Key: txn.Key,
},
PusheeTxn: txn.TxnMeta,
PushTo: hlc.Timestamp{},
PushType: roachpb.PUSH_ABORT,
// We're going to Force the push in order to not wait for the pushee to
// expire.
Force: true,
}
ba := roachpb.BatchRequest{}
ba.Add(&pushReq)

recCtx, collectRec, cancel := tracing.ContextWithRecordingSpan(ctx, "test trace")
defer cancel()

resp, pErr := db.NonTransactionalSender().Send(recCtx, ba)
if pErr != nil {
return pErr.GoError()
}

var statusErr error
pusheeStatus := resp.Responses[0].GetPushTxn().PusheeTxn.Status
switch pusheeStatus {
case roachpb.ABORTED:
if expResolution != expectAborted {
statusErr = errors.Errorf("transaction unexpectedly aborted")
}
case roachpb.COMMITTED:
if expResolution != expectCommitted {
statusErr = errors.Errorf("transaction unexpectedly committed")
}
default:
return errors.Errorf("unexpected txn status: %s", pusheeStatus)
}

// Verify that we're not fooling ourselves and that checking for the implicit
// commit actually caused the txn recovery procedure to run.
recording := collectRec()
var resolutionErr error
switch pushExpectation {
case expectPusheeTxnRecovery:
expMsg := fmt.Sprintf("recovered txn %s", txn.ID.Short())
if _, ok := recording.FindLogMessage(expMsg); !ok {
resolutionErr = errors.Errorf(
"recovery didn't run as expected (missing \"%s\"). recording: %s",
expMsg, recording)
}
case expectPusheeTxnRecordNotFound:
expMsg := "pushee txn record not found"
if _, ok := recording.FindLogMessage(expMsg); !ok {
resolutionErr = errors.Errorf(
"push didn't run as expected (missing \"%s\"). recording: %s",
expMsg, recording)
}
case dontExpectAnything:
}

return errors.CombineErrors(statusErr, resolutionErr)
}

// Test that, even though at the kvserver level requests are not idempotent
// across an EndTxn, a TxnCoordSender retry of the final batch after a refresh
// still works fine. We check that a transaction is not considered implicitly
Expand Down Expand Up @@ -3024,7 +2932,7 @@ func TestTxnCoordSenderRetriesAcrossEndTxn(t *testing.T) {
// WriteTooOldError on the first attempt.
sidePushedOnFirstAttempt side
sideRejectedOnSecondAttempt side
txnRecExpectation pushExpectation
txnRecExpectation kvclientutils.PushExpectation
}{
{
// On the first attempt, the left side succeeds in laying down an intent,
Expand All @@ -3051,7 +2959,7 @@ func TestTxnCoordSenderRetriesAcrossEndTxn(t *testing.T) {
// EndTxn), but fails. The 2nd attempt of the right side will no longer
// contain an EndTxn, as explained above. So we expect the txn record to
// not exist.
txnRecExpectation: expectPusheeTxnRecordNotFound,
txnRecExpectation: kvclientutils.ExpectPusheeTxnRecordNotFound,
},
{
// On the first attempt, the right side succeed in writing a STAGING txn
Expand All @@ -3070,7 +2978,7 @@ func TestTxnCoordSenderRetriesAcrossEndTxn(t *testing.T) {
sideRejectedOnSecondAttempt: right,
// The first attempt of the right side writes a STAGING txn record, so we
// expect to perform txn recovery.
txnRecExpectation: expectPusheeTxnRecovery,
txnRecExpectation: kvclientutils.ExpectPusheeTxnRecovery,
},
}

Expand Down Expand Up @@ -3151,7 +3059,9 @@ func TestTxnCoordSenderRetriesAcrossEndTxn(t *testing.T) {
})

require.Error(t, txn.CommitInBatch(ctx, b), "injected")
require.NoError(t, checkPushResult(ctx, db, *origTxn, expectAborted, tc.txnRecExpectation))
err = kvclientutils.CheckPushResult(
ctx, db, *origTxn, kvclientutils.ExpectAborted, tc.txnRecExpectation)
require.NoError(t, err)
})
}
}
Expand Down
39 changes: 25 additions & 14 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -795,26 +795,37 @@ func (tc *TxnCoordSender) updateStateLocked(

// Update our transaction with any information the error has.
if errTxn := pErr.GetTxn(); errTxn != nil {
// Sanity checks. Finalized transactions are not supposed to get here.
if errTxn.Status != roachpb.PENDING {
if errTxn.Status == roachpb.COMMITTED {
// Finding out about our transaction being committed indicates a serious
// bug. Requests are not supposed to be sent on transactions after they
// are committed.
log.Errorf(ctx, "transaction unexpectedly committed: %s. ba: %s. txn: %s.", pErr, ba, errTxn)
} else if errTxn.Status == roachpb.ABORTED {
// We only expect TransactionAbortedError to carry an aborted txn. In
// particular, the heartbeater doesn't like running when the transaction
// is know to be aborted.
log.Fatalf(ctx, "unexpected error with ABORTED txn: (%T) %s. ba: %s. txn: %s.", pErr.GoError(), pErr, ba, errTxn)
}
if errTxn.Status == roachpb.COMMITTED {
sanityCheckCommittedErr(ctx, pErr, ba)
}

tc.mu.txn.Update(errTxn)
}
return pErr
}

// sanityCheckCommittedErr verifies the circumstances in which we're receiving
// an error indicating a COMMITTED transaction. Only rollbacks should be
// encountering such errors. Marking a transaction as explicitly-committed can
// also encounter these errors, but those errors don't make it to the
// TxnCoordSender.
func sanityCheckCommittedErr(ctx context.Context, pErr *roachpb.Error, ba roachpb.BatchRequest) {
errTxn := pErr.GetTxn()
if errTxn == nil || errTxn.Status != roachpb.COMMITTED {
// We shouldn't have been called.
return
}
// The only case in which an error can have a COMMITTED transaction in it is
// when the request was a rollback. Rollbacks can race with commits if a
// context timeout expires while a commit request is in flight.
if ba.IsSingleAbortTxnRequest() {
return
}
// Finding out about our transaction being committed indicates a serious bug.
// Requests are not supposed to be sent on transactions after they are
// committed.
log.Fatalf(ctx, "transaction unexpectedly committed: %s. ba: %s. txn: %s.", pErr, ba, errTxn)
}

// setTxnAnchorKey sets the key at which to anchor the transaction record. The
// transaction anchor key defaults to the first key written in a transaction.
func (tc *TxnCoordSender) setTxnAnchorKeyLocked(key roachpb.Key) error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,7 @@ func TestTxnCoordSenderGCWithAmbiguousResultErr(t *testing.T) {
key := roachpb.Key("a")
are := roachpb.NewAmbiguousResultError("very ambiguous")
knobs := &kvserver.StoreTestingKnobs{
TestingResponseFilter: func(ba roachpb.BatchRequest, br *roachpb.BatchResponse) *roachpb.Error {
TestingResponseFilter: func(ctx context.Context, ba roachpb.BatchRequest, br *roachpb.BatchResponse) *roachpb.Error {
for _, req := range ba.Requests {
if putReq, ok := req.GetInner().(*roachpb.PutRequest); ok && putReq.Key.Equal(key) {
return roachpb.NewError(are)
Expand Down
9 changes: 6 additions & 3 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_heartbeater.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,9 +287,12 @@ func (h *txnHeartbeater) heartbeat(ctx context.Context) bool {
}

if h.mu.txn.Status != roachpb.PENDING {
log.Fatalf(ctx,
"txn committed or aborted but heartbeat loop hasn't been signaled to stop. txn: %s",
h.mu.txn)
if h.mu.txn.Status == roachpb.COMMITTED {
log.Fatalf(ctx, "txn committed but heartbeat loop hasn't been signaled to stop: %s", h.mu.txn)
}
// If the transaction is aborted, there's no point in heartbeating. The
// client needs to send a rollback.
return false
}

// Clone the txn in order to put it in the heartbeat request.
Expand Down
6 changes: 4 additions & 2 deletions pkg/kv/kvserver/client_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1527,7 +1527,7 @@ func TestStoreRangeMergeConcurrentRequests(t *testing.T) {

var mtc *multiTestContext
storeCfg.TestingKnobs.TestingResponseFilter = func(
ba roachpb.BatchRequest, _ *roachpb.BatchResponse,
ctx context.Context, ba roachpb.BatchRequest, _ *roachpb.BatchResponse,
) *roachpb.Error {
del := ba.Requests[0].GetDelete()
if del != nil && bytes.HasSuffix(del.Key, keys.LocalRangeDescriptorSuffix) && rand.Int()%4 == 0 {
Expand Down Expand Up @@ -2777,7 +2777,9 @@ func TestStoreRangeMergeSlowWatcher(t *testing.T) {
// This verifies that we're actually testing what we claim to.
var sawMeta2Req int64
meta2CKey := keys.RangeMetaKey(cKey).AsRawKey()
storeCfg.TestingKnobs.TestingResponseFilter = func(ba roachpb.BatchRequest, br *roachpb.BatchResponse) *roachpb.Error {
storeCfg.TestingKnobs.TestingResponseFilter = func(
ctx context.Context, ba roachpb.BatchRequest, br *roachpb.BatchResponse,
) *roachpb.Error {
for i, req := range ba.Requests {
if g := req.GetGet(); g != nil && g.Key.Equal(meta2CKey) && br.Responses[i].GetGet().Value == nil {
atomic.StoreInt64(&sawMeta2Req, 1)
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2597,7 +2597,7 @@ func TestAdminRelocateRangeSafety(t *testing.T) {
var useSeenAdd atomic.Value
useSeenAdd.Store(false)
seenAdd := make(chan struct{}, 1)
responseFilter := func(ba roachpb.BatchRequest, _ *roachpb.BatchResponse) *roachpb.Error {
responseFilter := func(ctx context.Context, ba roachpb.BatchRequest, _ *roachpb.BatchResponse) *roachpb.Error {
if ba.IsSingleRequest() {
changeReplicas, ok := ba.Requests[0].GetInner().(*roachpb.AdminChangeReplicasRequest)
if ok && changeReplicas.Changes()[0].ChangeType == roachpb.ADD_REPLICA && useSeenAdd.Load().(bool) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2813,7 +2813,7 @@ func TestStoreSplitRangeLookupRace(t *testing.T) {
blockedRangeLookups := int32(0)
rangeLookupIsBlocked := make(chan struct{}, 1)
unblockRangeLookups := make(chan struct{})
respFilter := func(ba roachpb.BatchRequest, _ *roachpb.BatchResponse) *roachpb.Error {
respFilter := func(ctx context.Context, ba roachpb.BatchRequest, _ *roachpb.BatchResponse) *roachpb.Error {
select {
case <-blockRangeLookups:
if kv.TestingIsRangeLookup(ba) &&
Expand Down
4 changes: 3 additions & 1 deletion pkg/kv/kvserver/consistency_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,9 @@ func TestCheckConsistencyReplay(t *testing.T) {
}

// Arrange to trigger a retry when a ComputeChecksum request arrives.
storeCfg.TestingKnobs.TestingResponseFilter = func(ba roachpb.BatchRequest, br *roachpb.BatchResponse) *roachpb.Error {
storeCfg.TestingKnobs.TestingResponseFilter = func(
ctx context.Context, ba roachpb.BatchRequest, br *roachpb.BatchResponse,
) *roachpb.Error {
state.Lock()
defer state.Unlock()
if ba.IsSingleComputeChecksumRequest() && !state.forcedRetry {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_send.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (r *Replica) sendWithRangeID(
log.Eventf(ctx, "replica.Send got error: %s", pErr)
} else {
if filter := r.store.cfg.TestingKnobs.TestingResponseFilter; filter != nil {
pErr = filter(*ba, br)
pErr = filter(ctx, *ba, br)
}
}
return br, pErr
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/storagebase/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ type ReplicaApplyFilter func(args ApplyFilterArgs) (int, *roachpb.Error)
// ReplicaResponseFilter is used in unittests to modify the outbound
// response returned to a waiting client after a replica command has
// been processed. This filter is invoked only by the command proposer.
type ReplicaResponseFilter func(roachpb.BatchRequest, *roachpb.BatchResponse) *roachpb.Error
type ReplicaResponseFilter func(context.Context, roachpb.BatchRequest, *roachpb.BatchResponse) *roachpb.Error

// ContainsKey returns whether this range contains the specified key.
func ContainsKey(desc *roachpb.RangeDescriptor, key roachpb.Key) bool {
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/security/securitytest"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
)

func init() {
Expand All @@ -26,6 +27,7 @@ func init() {

func TestMain(m *testing.M) {
serverutils.InitTestServerFactory(server.TestServerFactory)
serverutils.InitTestClusterFactory(testcluster.TestClusterFactory)
os.Exit(m.Run())
}

Expand Down
Loading

0 comments on commit b8ecf62

Please sign in to comment.