Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvclient: fiddle with assertions and rollbacks #48297

Merged
merged 5 commits into from
May 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -839,7 +839,7 @@ func checkInProgressBackupRestore(
var allowResponse chan struct{}
params := base.TestClusterArgs{}
params.ServerArgs.Knobs.Store = &kvserver.StoreTestingKnobs{
TestingResponseFilter: func(ba roachpb.BatchRequest, br *roachpb.BatchResponse) *roachpb.Error {
TestingResponseFilter: func(ctx context.Context, ba roachpb.BatchRequest, br *roachpb.BatchResponse) *roachpb.Error {
for _, ru := range br.Responses {
switch ru.GetInner().(type) {
case *roachpb.ExportResponse, *roachpb.ImportResponse:
Expand Down Expand Up @@ -3595,7 +3595,9 @@ func TestProtectedTimestampsDuringBackup(t *testing.T) {
params := base.TestClusterArgs{}
params.ServerArgs.ExternalIODir = dir
params.ServerArgs.Knobs.Store = &kvserver.StoreTestingKnobs{
TestingResponseFilter: func(ba roachpb.BatchRequest, br *roachpb.BatchResponse) *roachpb.Error {
TestingResponseFilter: func(
ctx context.Context, ba roachpb.BatchRequest, br *roachpb.BatchResponse,
) *roachpb.Error {
for _, ru := range br.Responses {
switch ru.GetInner().(type) {
case *roachpb.ExportResponse, *roachpb.ImportResponse:
Expand Down
104 changes: 7 additions & 97 deletions pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/kvclientutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -2883,98 +2883,6 @@ func TestTxnCoordSenderRetries(t *testing.T) {
}
}

type pushExpectation int

const (
// expectPusheeTxnRecovery means we're expecting transaction recovery to be
// performed (after finding a STAGING txn record).
expectPusheeTxnRecovery pushExpectation = iota
// expectPusheeTxnRecordNotFound means we're expecting the push to not find the
// pushee txn record.
expectPusheeTxnRecordNotFound
// dontExpectAnything means we're not going to check the state in which the
// pusher found the pushee's txn record.
dontExpectAnything
)

type expectedTxnResolution int

const (
expectAborted expectedTxnResolution = iota
expectCommitted
)

// checkPushResult pushes the specified txn and checks that the pushee's
// resolution is the expected one.
func checkPushResult(
ctx context.Context,
db *kv.DB,
txn roachpb.Transaction,
expResolution expectedTxnResolution,
pushExpectation pushExpectation,
) error {
pushReq := roachpb.PushTxnRequest{
RequestHeader: roachpb.RequestHeader{
Key: txn.Key,
},
PusheeTxn: txn.TxnMeta,
PushTo: hlc.Timestamp{},
PushType: roachpb.PUSH_ABORT,
// We're going to Force the push in order to not wait for the pushee to
// expire.
Force: true,
}
ba := roachpb.BatchRequest{}
ba.Add(&pushReq)

recCtx, collectRec, cancel := tracing.ContextWithRecordingSpan(ctx, "test trace")
defer cancel()

resp, pErr := db.NonTransactionalSender().Send(recCtx, ba)
if pErr != nil {
return pErr.GoError()
}

var statusErr error
pusheeStatus := resp.Responses[0].GetPushTxn().PusheeTxn.Status
switch pusheeStatus {
case roachpb.ABORTED:
if expResolution != expectAborted {
statusErr = errors.Errorf("transaction unexpectedly aborted")
}
case roachpb.COMMITTED:
if expResolution != expectCommitted {
statusErr = errors.Errorf("transaction unexpectedly committed")
}
default:
return errors.Errorf("unexpected txn status: %s", pusheeStatus)
}

// Verify that we're not fooling ourselves and that checking for the implicit
// commit actually caused the txn recovery procedure to run.
recording := collectRec()
var resolutionErr error
switch pushExpectation {
case expectPusheeTxnRecovery:
expMsg := fmt.Sprintf("recovered txn %s", txn.ID.Short())
if _, ok := recording.FindLogMessage(expMsg); !ok {
resolutionErr = errors.Errorf(
"recovery didn't run as expected (missing \"%s\"). recording: %s",
expMsg, recording)
}
case expectPusheeTxnRecordNotFound:
expMsg := "pushee txn record not found"
if _, ok := recording.FindLogMessage(expMsg); !ok {
resolutionErr = errors.Errorf(
"push didn't run as expected (missing \"%s\"). recording: %s",
expMsg, recording)
}
case dontExpectAnything:
}

return errors.CombineErrors(statusErr, resolutionErr)
}

// Test that, even though at the kvserver level requests are not idempotent
// across an EndTxn, a TxnCoordSender retry of the final batch after a refresh
// still works fine. We check that a transaction is not considered implicitly
Expand Down Expand Up @@ -3024,7 +2932,7 @@ func TestTxnCoordSenderRetriesAcrossEndTxn(t *testing.T) {
// WriteTooOldError on the first attempt.
sidePushedOnFirstAttempt side
sideRejectedOnSecondAttempt side
txnRecExpectation pushExpectation
txnRecExpectation kvclientutils.PushExpectation
}{
{
// On the first attempt, the left side succeeds in laying down an intent,
Expand All @@ -3051,7 +2959,7 @@ func TestTxnCoordSenderRetriesAcrossEndTxn(t *testing.T) {
// EndTxn), but fails. The 2nd attempt of the right side will no longer
// contain an EndTxn, as explained above. So we expect the txn record to
// not exist.
txnRecExpectation: expectPusheeTxnRecordNotFound,
txnRecExpectation: kvclientutils.ExpectPusheeTxnRecordNotFound,
},
{
// On the first attempt, the right side succeed in writing a STAGING txn
Expand All @@ -3070,7 +2978,7 @@ func TestTxnCoordSenderRetriesAcrossEndTxn(t *testing.T) {
sideRejectedOnSecondAttempt: right,
// The first attempt of the right side writes a STAGING txn record, so we
// expect to perform txn recovery.
txnRecExpectation: expectPusheeTxnRecovery,
txnRecExpectation: kvclientutils.ExpectPusheeTxnRecovery,
},
}

Expand Down Expand Up @@ -3151,7 +3059,9 @@ func TestTxnCoordSenderRetriesAcrossEndTxn(t *testing.T) {
})

require.Error(t, txn.CommitInBatch(ctx, b), "injected")
require.NoError(t, checkPushResult(ctx, db, *origTxn, expectAborted, tc.txnRecExpectation))
err = kvclientutils.CheckPushResult(
ctx, db, *origTxn, kvclientutils.ExpectAborted, tc.txnRecExpectation)
require.NoError(t, err)
})
}
}
Expand Down
39 changes: 25 additions & 14 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -795,26 +795,37 @@ func (tc *TxnCoordSender) updateStateLocked(

// Update our transaction with any information the error has.
if errTxn := pErr.GetTxn(); errTxn != nil {
// Sanity checks. Finalized transactions are not supposed to get here.
if errTxn.Status != roachpb.PENDING {
if errTxn.Status == roachpb.COMMITTED {
// Finding out about our transaction being committed indicates a serious
// bug. Requests are not supposed to be sent on transactions after they
// are committed.
log.Errorf(ctx, "transaction unexpectedly committed: %s. ba: %s. txn: %s.", pErr, ba, errTxn)
} else if errTxn.Status == roachpb.ABORTED {
// We only expect TransactionAbortedError to carry an aborted txn. In
// particular, the heartbeater doesn't like running when the transaction
// is know to be aborted.
log.Fatalf(ctx, "unexpected error with ABORTED txn: (%T) %s. ba: %s. txn: %s.", pErr.GoError(), pErr, ba, errTxn)
}
if errTxn.Status == roachpb.COMMITTED {
sanityCheckCommittedErr(ctx, pErr, ba)
}

tc.mu.txn.Update(errTxn)
}
return pErr
}

// sanityCheckCommittedErr verifies the circumstances in which we're receiving
// an error indicating a COMMITTED transaction. Only rollbacks should be
// encountering such errors. Marking a transaction as explicitly-committed can
// also encounter these errors, but those errors don't make it to the
// TxnCoordSender.
func sanityCheckCommittedErr(ctx context.Context, pErr *roachpb.Error, ba roachpb.BatchRequest) {
errTxn := pErr.GetTxn()
if errTxn == nil || errTxn.Status != roachpb.COMMITTED {
// We shouldn't have been called.
return
}
// The only case in which an error can have a COMMITTED transaction in it is
// when the request was a rollback. Rollbacks can race with commits if a
// context timeout expires while a commit request is in flight.
if ba.IsSingleAbortTxnRequest() {
return
}
// Finding out about our transaction being committed indicates a serious bug.
// Requests are not supposed to be sent on transactions after they are
// committed.
log.Fatalf(ctx, "transaction unexpectedly committed: %s. ba: %s. txn: %s.", pErr, ba, errTxn)
}

// setTxnAnchorKey sets the key at which to anchor the transaction record. The
// transaction anchor key defaults to the first key written in a transaction.
func (tc *TxnCoordSender) setTxnAnchorKeyLocked(key roachpb.Key) error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,7 @@ func TestTxnCoordSenderGCWithAmbiguousResultErr(t *testing.T) {
key := roachpb.Key("a")
are := roachpb.NewAmbiguousResultError("very ambiguous")
knobs := &kvserver.StoreTestingKnobs{
TestingResponseFilter: func(ba roachpb.BatchRequest, br *roachpb.BatchResponse) *roachpb.Error {
TestingResponseFilter: func(ctx context.Context, ba roachpb.BatchRequest, br *roachpb.BatchResponse) *roachpb.Error {
for _, req := range ba.Requests {
if putReq, ok := req.GetInner().(*roachpb.PutRequest); ok && putReq.Key.Equal(key) {
return roachpb.NewError(are)
Expand Down
9 changes: 6 additions & 3 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_heartbeater.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,9 +287,12 @@ func (h *txnHeartbeater) heartbeat(ctx context.Context) bool {
}

if h.mu.txn.Status != roachpb.PENDING {
log.Fatalf(ctx,
"txn committed or aborted but heartbeat loop hasn't been signaled to stop. txn: %s",
h.mu.txn)
if h.mu.txn.Status == roachpb.COMMITTED {
log.Fatalf(ctx, "txn committed but heartbeat loop hasn't been signaled to stop: %s", h.mu.txn)
}
// If the transaction is aborted, there's no point in heartbeating. The
// client needs to send a rollback.
return false
}

// Clone the txn in order to put it in the heartbeat request.
Expand Down
6 changes: 4 additions & 2 deletions pkg/kv/kvserver/client_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1527,7 +1527,7 @@ func TestStoreRangeMergeConcurrentRequests(t *testing.T) {

var mtc *multiTestContext
storeCfg.TestingKnobs.TestingResponseFilter = func(
ba roachpb.BatchRequest, _ *roachpb.BatchResponse,
ctx context.Context, ba roachpb.BatchRequest, _ *roachpb.BatchResponse,
) *roachpb.Error {
del := ba.Requests[0].GetDelete()
if del != nil && bytes.HasSuffix(del.Key, keys.LocalRangeDescriptorSuffix) && rand.Int()%4 == 0 {
Expand Down Expand Up @@ -2777,7 +2777,9 @@ func TestStoreRangeMergeSlowWatcher(t *testing.T) {
// This verifies that we're actually testing what we claim to.
var sawMeta2Req int64
meta2CKey := keys.RangeMetaKey(cKey).AsRawKey()
storeCfg.TestingKnobs.TestingResponseFilter = func(ba roachpb.BatchRequest, br *roachpb.BatchResponse) *roachpb.Error {
storeCfg.TestingKnobs.TestingResponseFilter = func(
ctx context.Context, ba roachpb.BatchRequest, br *roachpb.BatchResponse,
) *roachpb.Error {
for i, req := range ba.Requests {
if g := req.GetGet(); g != nil && g.Key.Equal(meta2CKey) && br.Responses[i].GetGet().Value == nil {
atomic.StoreInt64(&sawMeta2Req, 1)
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2597,7 +2597,7 @@ func TestAdminRelocateRangeSafety(t *testing.T) {
var useSeenAdd atomic.Value
useSeenAdd.Store(false)
seenAdd := make(chan struct{}, 1)
responseFilter := func(ba roachpb.BatchRequest, _ *roachpb.BatchResponse) *roachpb.Error {
responseFilter := func(ctx context.Context, ba roachpb.BatchRequest, _ *roachpb.BatchResponse) *roachpb.Error {
if ba.IsSingleRequest() {
changeReplicas, ok := ba.Requests[0].GetInner().(*roachpb.AdminChangeReplicasRequest)
if ok && changeReplicas.Changes()[0].ChangeType == roachpb.ADD_REPLICA && useSeenAdd.Load().(bool) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2813,7 +2813,7 @@ func TestStoreSplitRangeLookupRace(t *testing.T) {
blockedRangeLookups := int32(0)
rangeLookupIsBlocked := make(chan struct{}, 1)
unblockRangeLookups := make(chan struct{})
respFilter := func(ba roachpb.BatchRequest, _ *roachpb.BatchResponse) *roachpb.Error {
respFilter := func(ctx context.Context, ba roachpb.BatchRequest, _ *roachpb.BatchResponse) *roachpb.Error {
select {
case <-blockRangeLookups:
if kv.TestingIsRangeLookup(ba) &&
Expand Down
4 changes: 3 additions & 1 deletion pkg/kv/kvserver/consistency_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,9 @@ func TestCheckConsistencyReplay(t *testing.T) {
}

// Arrange to trigger a retry when a ComputeChecksum request arrives.
storeCfg.TestingKnobs.TestingResponseFilter = func(ba roachpb.BatchRequest, br *roachpb.BatchResponse) *roachpb.Error {
storeCfg.TestingKnobs.TestingResponseFilter = func(
ctx context.Context, ba roachpb.BatchRequest, br *roachpb.BatchResponse,
) *roachpb.Error {
state.Lock()
defer state.Unlock()
if ba.IsSingleComputeChecksumRequest() && !state.forcedRetry {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_send.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (r *Replica) sendWithRangeID(
log.Eventf(ctx, "replica.Send got error: %s", pErr)
} else {
if filter := r.store.cfg.TestingKnobs.TestingResponseFilter; filter != nil {
pErr = filter(*ba, br)
pErr = filter(ctx, *ba, br)
}
}
return br, pErr
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/storagebase/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ type ReplicaApplyFilter func(args ApplyFilterArgs) (int, *roachpb.Error)
// ReplicaResponseFilter is used in unittests to modify the outbound
// response returned to a waiting client after a replica command has
// been processed. This filter is invoked only by the command proposer.
type ReplicaResponseFilter func(roachpb.BatchRequest, *roachpb.BatchResponse) *roachpb.Error
type ReplicaResponseFilter func(context.Context, roachpb.BatchRequest, *roachpb.BatchResponse) *roachpb.Error

// ContainsKey returns whether this range contains the specified key.
func ContainsKey(desc *roachpb.RangeDescriptor, key roachpb.Key) bool {
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/security/securitytest"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
)

func init() {
Expand All @@ -26,6 +27,7 @@ func init() {

func TestMain(m *testing.M) {
serverutils.InitTestServerFactory(server.TestServerFactory)
serverutils.InitTestClusterFactory(testcluster.TestClusterFactory)
os.Exit(m.Run())
}

Expand Down
Loading