Skip to content

Commit

Permalink
kvclient: improve already-committed assertion
Browse files Browse the repository at this point in the history
This patch improves the assertion that requests don't get errors
informing them that their transaction has been committed from underneath
them. There is a semi-legitimate case that's hitting that assertion: a
rollback sent after a commit returned an ambiguous result to the client.
This happens easily when the commit's ctx times out - the client gets an
ambiguous error and is likely to send a rollback attempt afterwards.
This rollback attempt might find a transaction record with the COMMITTED
status, in which case the assertion was hit.
This patch makes the assertion white-list this case. We were already
white-listing this specific case in other places. For other, unexpected
cases, the assertion (re-)becomes a Fatalf (from an Errorf), since an
unepectedly-committed transaction is no joke.

This patch also adds a test for the case that was triggering the
assertion. Also, it adds tests for another two most troubling cases of
rollback-after-ambiguous-commit that we handle incorrectly. Those
behaviors are not fixed; the test just reproduces and highlights them.

Touches #48301
Touches #48302

Release note: None
  • Loading branch information
andreimatei committed May 1, 2020
1 parent 2997be0 commit 8a10574
Show file tree
Hide file tree
Showing 3 changed files with 240 additions and 4 deletions.
23 changes: 19 additions & 4 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -796,16 +796,31 @@ func (tc *TxnCoordSender) updateStateLocked(
// Update our transaction with any information the error has.
if errTxn := pErr.GetTxn(); errTxn != nil {
if errTxn.Status == roachpb.COMMITTED {
// Finding out about our transaction being committed indicates a serious
// bug. Requests are not supposed to be sent on transactions after they
// are committed.
log.Errorf(ctx, "transaction unexpectedly committed: %s. ba: %s. txn: %s.", pErr, ba, errTxn)
sanityCheckCommittedErr(ctx, pErr, ba)
}
tc.mu.txn.Update(errTxn)
}
return pErr
}

func sanityCheckCommittedErr(ctx context.Context, pErr *roachpb.Error, ba roachpb.BatchRequest) {
errTxn := pErr.GetTxn()
if errTxn == nil || errTxn.Status != roachpb.COMMITTED {
// We shouldn't have been called.
return
}
// The only case in which an error can have a COMMITTED transaction in it is
// when the request was a rollback. Rollbacks can race with commits if a
// context timeout expires while a commit request is in flight.
if ba.IsSingleAbortTxnRequest() {
return
}
// Finding out about our transaction being committed indicates a serious bug.
// Requests are not supposed to be sent on transactions after they are
// committed.
log.Fatalf(ctx, "transaction unexpectedly committed: %s. ba: %s. txn: %s.", pErr, ba, errTxn)
}

// setTxnAnchorKey sets the key at which to anchor the transaction record. The
// transaction anchor key defaults to the first key written in a transaction.
func (tc *TxnCoordSender) setTxnAnchorKeyLocked(key roachpb.Key) error {
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/security/securitytest"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
)

func init() {
Expand All @@ -26,6 +27,7 @@ func init() {

func TestMain(m *testing.M) {
serverutils.InitTestServerFactory(server.TestServerFactory)
serverutils.InitTestClusterFactory(testcluster.TestClusterFactory)
os.Exit(m.Run())
}

Expand Down
219 changes: 219 additions & 0 deletions pkg/kv/txn_external_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kv_test

import (
"context"
"sync/atomic"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils/kvclientutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/stretchr/testify/require"
)

// Test the behavior of a txn.Rollback() issued after txn.Commit() failing with
// an ambiguous error.
func TestRollbackAfterAmbiguousCommit(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()

testCases := []struct {
name string
// The status of the transaction record at the time when we issue the
// rollback.
txnStatus roachpb.TransactionStatus
// If txnStatus == COMMITTED, setting txnRecordCleanedUp will make us
// cleanup the transaction. The txn record will be deleted.
txnRecordCleanedUp bool
// The error that we expect from txn.Rollback().
expRollbackErr string
// Is the transaction expected to be committed or not after the
// txn.Rollback() call returns?
expCommitted bool
}{
{
name: "txn cleaned up",
txnStatus: roachpb.COMMITTED,
txnRecordCleanedUp: true,
// The transaction will be committed, but at the same time the rollback
// will also appear to succeed (it'll be a no-op). This behavior is
// undesired. See #48302.
expCommitted: true,
expRollbackErr: "",
},
{
name: "COMMITTED",
txnStatus: roachpb.COMMITTED,
txnRecordCleanedUp: false,
expCommitted: true,
expRollbackErr: "already committed",
},
{
name: "STAGING",
txnStatus: roachpb.STAGING,
// The rollback succeeds. This behavior is undersired. See #48301.
expCommitted: false,
expRollbackErr: "",
},
}
for _, testCase := range testCases {
if testCase.txnRecordCleanedUp {
require.Equal(t, roachpb.COMMITTED, testCase.txnStatus)
}
t.Run(testCase.name, func(t *testing.T) {
var filterSet int64
var key roachpb.Key
commitBlocked := make(chan struct{})
args := base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
// We're going to block the commit of the test's txn, letting the
// test cancel the request's ctx while the request is blocked.
TestingResponseFilter: func(ctx context.Context, ba roachpb.BatchRequest, _ *roachpb.BatchResponse) *roachpb.Error {
if atomic.LoadInt64(&filterSet) == 0 {
return nil
}
req, ok := ba.GetArg(roachpb.EndTxn)
if !ok {
return nil
}
commit := req.(*roachpb.EndTxnRequest)
if commit.Key.Equal(key) && commit.Commit {
// Inform the test that the commit is blocked.
commitBlocked <- struct{}{}
// Block until the client interrupts the commit. The client will
// cancel its context, at which point gRPC will return an error
// to the client and marshall the cancelation to the server.
<-ctx.Done()
}
return nil
},
},
},
}
tci := serverutils.StartTestCluster(t, 2, base.TestClusterArgs{ServerArgs: args})
tc := tci.(*testcluster.TestCluster)
defer tc.Stopper().Stop(ctx)

key = tc.ScratchRange(t)
atomic.StoreInt64(&filterSet, 1)

// This test uses a cluster with two nodes. It'll create a transaction
// having as a coordinator the node that's *not* the leaseholder for the
// range the txn is writing to. This is in order for the context
// cancelation scheme that we're going to employ to work: we need a
// non-local RPC such that canceling a requests context triggers an error
// for the request's sender without synchronizing with the request's
// evaluation.
rdesc := tc.LookupRangeOrFatal(t, key)
leaseHolder, err := tc.FindRangeLeaseHolder(rdesc, nil /* hint */)
require.NoError(t, err)
var db *kv.DB
if leaseHolder.NodeID == 1 {
db = tc.Servers[1].DB()
} else {
db = tc.Servers[0].DB()
}

txn := db.NewTxn(ctx, "test")
require.NoError(t, txn.Put(ctx, key, "val"))

// If the test we want the transaction to be committed and cleaned up,
// we'll do a read on the key we just wrote. That will act as a pipeline
// stall, resolving the in-flight write and eliminating the need for the
// STAGING status.
if testCase.txnStatus == roachpb.COMMITTED && testCase.txnRecordCleanedUp {
v, err := txn.Get(ctx, key)
require.NoError(t, err)
val, err := v.Value.GetBytes()
require.NoError(t, err)
require.Equal(t, "val", string(val))
}

// Send a commit request. It's going to get blocked after being evaluated,
// at which point we're going to cancel the request's ctx. The ctx
// cancelation will cause gRPC to interrupt the in-flight request, and the
// DistSender to return an ambiguous error. The transaction will be
// committed, through.
commitCtx, cancelCommit := context.WithCancel(ctx)
commitCh := make(chan error)
go func() {
commitCh <- txn.Commit(commitCtx)
}()

select {
case <-commitBlocked:
case <-time.After(10 * time.Second):
t.Fatalf("commit not blocked")
}

cancelCommit()
commitErr := <-commitCh
require.IsType(t, &roachpb.AmbiguousResultError{}, commitErr)
require.Regexp(t, `result is ambiguous \(context done during DistSender.Send: context canceled\)`,
commitErr)

// If the test wants the upcoming rollback to find a COMMITTED record,
// we'll perform transaction recovery. This will leave the transaction in
// the COMMITTED state, without cleaning it up.
if !testCase.txnRecordCleanedUp && testCase.txnStatus == roachpb.COMMITTED {
// Sanity check - verify that the txn is STAGING.
txnProto := txn.TestingCloneTxn()
queryTxn := roachpb.QueryTxnRequest{
RequestHeader: roachpb.RequestHeader{
Key: txnProto.Key,
},
Txn: txnProto.TxnMeta,
}
b := kv.Batch{}
b.AddRawRequest(&queryTxn)
require.NoError(t, db.Run(ctx, &b))
queryTxnRes := b.RawResponse().Responses[0].GetQueryTxn()
require.Equal(t, roachpb.STAGING, queryTxnRes.QueriedTxn.Status)

// Perform transaction recovery.
require.NoError(t, kvclientutils.CheckPushResult(ctx, db, *txn.TestingCloneTxn(),
kvclientutils.ExpectCommitted, kvclientutils.ExpectPusheeTxnRecovery))
}

// Attempt the rollback and check its result.
rollbackErr := txn.Rollback(ctx)
if testCase.expRollbackErr == "" {
require.NoError(t, rollbackErr)
} else {
require.Regexp(t, testCase.expRollbackErr, rollbackErr)
}

// Check the outcome of the transaction, independently from the outcome of
// the Rollback() above, by reading the value that it wrote.
committed := func() bool {
val, err := db.Get(ctx, key)
require.NoError(t, err)
if !val.Exists() {
return false
}
v, err := val.Value.GetBytes()
require.NoError(t, err)
require.Equal(t, "val", string(v))
return true
}()
require.Equal(t, testCase.expCommitted, committed)
})
}
}

0 comments on commit 8a10574

Please sign in to comment.