Skip to content

Commit

Permalink
kv: ignore missing txn record errors in txn heartbeat loop
Browse files Browse the repository at this point in the history
Related to cockroachdb#20448.

This change adds a reason enum to`TransactionStatusError`. This
enum includes a variant is returned when an operation looks up a
transaction record that it expects to exist and finds that it does
not. This is possible for `HeartbeatTxnRequests` and `EndTxnRequests`.

The change then detects this error type in the `TxnCoordSender`
heartbeat loop. Unlike for any other error type, this error does
not force the heartbeat loop to shut down. The reason for this is
that the heartbeat loop can be started before a client is certain
that a transaction record has been written. If this is ever the
case, like we saw in the YCSB exploration, then terminating the
heartbeat loop on a `TransactionStatusError` will prematurely
abort the entire transaction.

Release note (bug fix): Fixed bug where an expected transaction
heartbeat failure aborted the transaction.
  • Loading branch information
nvanbenschoten committed Apr 2, 2018
1 parent c18ad15 commit 8fe41db
Show file tree
Hide file tree
Showing 8 changed files with 270 additions and 168 deletions.
10 changes: 10 additions & 0 deletions pkg/kv/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -1099,6 +1099,16 @@ func (tc *TxnCoordSender) heartbeat(ctx context.Context) bool {
// transaction record at all, we have to assume it's been aborted as well.
if pErr != nil {
log.VEventf(ctx, 2, "heartbeat failed: %s", pErr)

// If the heartbeat request arrived to find a missing transaction record
// then we ignore the error and continue the heartbeat loop. This is
// possible if the heartbeat loop was started before a BeginTxn request
// succeeds because of ambiguity in the first write request's response.
if tse, ok := pErr.GetDetail().(*roachpb.TransactionStatusError); ok &&
tse.Reason == roachpb.TransactionStatusError_REASON_TXN_NOT_FOUND {
return true
}

if errTxn := pErr.GetTxn(); errTxn != nil {
tc.mu.Lock()
tc.mu.meta.Txn.Update(errTxn)
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,8 +436,9 @@ func TestTxnCoordSenderHeartbeat(t *testing.T) {
{
var ba roachpb.BatchRequest
ba.Add(&roachpb.EndTransactionRequest{
Commit: false,
Span: roachpb.Span{Key: initialTxn.Proto().Key},
Commit: false,
Poison: true,
})
ba.Txn = initialTxn.Proto()
if _, pErr := tc.TxnCoordSenderFactory.wrapped.Send(context.Background(), ba); pErr != nil {
Expand Down Expand Up @@ -713,8 +714,7 @@ func TestTxnCoordSenderCancel(t *testing.T) {
// path. We'll either succeed, get a "does not exist" error, or get a
// context canceled error. Anything else is unexpected.
err := txn.CommitOrCleanup(ctx)
if err != nil && err.Error() != context.Canceled.Error() &&
!testutils.IsError(err, "TransactionStatusError: does not exist") {
if err != nil && err.Error() != context.Canceled.Error() {
t.Fatal(err)
}
}
Expand Down
20 changes: 16 additions & 4 deletions pkg/roachpb/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ func (e *TransactionRetryError) Error() string {
}

func (e *TransactionRetryError) message(pErr *Error) string {
return fmt.Sprintf("TransactionRetryError: retry txn (%s): %s", e.Reason, pErr.GetTxn())
return fmt.Sprintf("%s: %s", e.Error(), pErr.GetTxn())
}

var _ ErrorDetailInterface = &TransactionRetryError{}
Expand Down Expand Up @@ -424,15 +424,27 @@ var _ ErrorDetailInterface = &TransactionReplayError{}
// NewTransactionStatusError initializes a new TransactionStatusError from
// the given message.
func NewTransactionStatusError(msg string) *TransactionStatusError {
return &TransactionStatusError{Msg: msg}
return &TransactionStatusError{
Msg: msg,
Reason: TransactionStatusError_REASON_UNKNOWN,
}
}

// NewTransactionNotFoundStatusError initializes a new TransactionStatusError with
// a REASON_TXN_NOT_FOUND reason.
func NewTransactionNotFoundStatusError() *TransactionStatusError {
return &TransactionStatusError{
Msg: "txn record not found",
Reason: TransactionStatusError_REASON_TXN_NOT_FOUND,
}
}

func (e *TransactionStatusError) Error() string {
return "TransactionStatusError: " + e.Msg
return fmt.Sprintf("TransactionStatusError: %s (%s)", e.Msg, e.Reason)
}

func (e *TransactionStatusError) message(pErr *Error) string {
return fmt.Sprintf("txn %s: %s", pErr.GetTxn(), e.Msg)
return fmt.Sprintf("%s: %s", e.Error(), pErr.GetTxn())
}

var _ ErrorDetailInterface = &TransactionStatusError{}
Expand Down
351 changes: 210 additions & 141 deletions pkg/roachpb/errors.pb.go

Large diffs are not rendered by default.

9 changes: 9 additions & 0 deletions pkg/roachpb/errors.proto
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,15 @@ message TransactionStatusError {
option (gogoproto.equal) = true;

optional string msg = 1 [(gogoproto.nullable) = false];

// Reason specifies what caused the error.
enum Reason {
// For backwards compatibility.
REASON_UNKNOWN = 0;
// The request was sent to a transaction record that does not exist.
REASON_TXN_NOT_FOUND = 1;
}
optional Reason reason = 2 [(gogoproto.nullable) = false];
}

// A WriteIntentError indicates that one or more write intent
Expand Down
3 changes: 1 addition & 2 deletions pkg/storage/batcheval/cmd_heartbeat_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/spanset"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/pkg/errors"
)

func init() {
Expand Down Expand Up @@ -66,7 +65,7 @@ func HeartbeatTxn(
// This could mean the heartbeat is a delayed relic or it could
// mean that the BeginTransaction call was delayed. In either
// case, there's no reason to persist a new transaction record.
return result.Result{}, errors.Errorf("heartbeat for transaction %s failed; record not present", h.Txn)
return result.Result{}, roachpb.NewTransactionNotFoundStatusError()
}

if txn.Status == roachpb.PENDING {
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func evalEndTransaction(
); err != nil {
return result.Result{}, err
} else if !ok {
return result.Result{}, roachpb.NewTransactionStatusError("does not exist")
return result.Result{}, roachpb.NewTransactionNotFoundStatusError()
}
// We're using existingTxn on the reply, although it can be stale
// compared to the Transaction in the request (e.g. the Sequence,
Expand Down
37 changes: 20 additions & 17 deletions pkg/storage/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2593,8 +2593,8 @@ func TestReplicaCommandQueueCancellationLocal(t *testing.T) {

t.Run("16266", func(t *testing.T) {
instrs := []cancelInstr{
{reqOverride: heartbeatBa, expErr: "record not present"},
{reqOverride: endTxnBa, expErr: "does not exist"},
{reqOverride: heartbeatBa, expErr: "txn record not found"},
{reqOverride: endTxnBa, expErr: "txn record not found"},
{reqOverride: pushBa},
{reqOverride: pushBa},
}
Expand All @@ -2604,10 +2604,10 @@ func TestReplicaCommandQueueCancellationLocal(t *testing.T) {
})
t.Run("CancelEndTxn", func(t *testing.T) {
instrs := []cancelInstr{
{reqOverride: heartbeatBa, expErr: "record not present"},
{reqOverride: endTxnBa, expErr: "does not exist"},
{reqOverride: heartbeatBa, expErr: "txn record not found"},
{reqOverride: endTxnBa, expErr: "txn record not found"},
{reqOverride: pushBa},
{reqOverride: heartbeatBa, expErr: "record not present"},
{reqOverride: heartbeatBa, expErr: "txn record not found"},
{reqOverride: resolveIntentBa},
{reqOverride: pushBa},
}
Expand Down Expand Up @@ -2657,7 +2657,7 @@ func TestReplicaCommandQueueCancellationLocal(t *testing.T) {
{reqOverride: splitBa},
{reqOverride: putKeyBa, expErr: "retry txn"},
{reqOverride: getKeyBa},
{reqOverride: endTxnBa, expErr: "does not exist"},
{reqOverride: endTxnBa, expErr: "txn record not found"},
{reqOverride: resolveIntentBa},
}

Expand Down Expand Up @@ -3593,8 +3593,10 @@ func TestReplicaAbortSpanOnlyWithIntent(t *testing.T) {
args, h := heartbeatArgs(txn, tc.Clock().Now())
// If the AbortSpan were active for this request, we'd catch a txn retry.
// Instead, we expect the error from heartbeating a nonexistent txn.
if _, pErr := tc.SendWrappedWith(h, &args); !testutils.IsPError(pErr, "record not present") {
t.Fatal(pErr)
_, pErr := tc.SendWrappedWith(h, &args)
if tse, ok := pErr.GetDetail().(*roachpb.TransactionStatusError); !ok ||
tse.Reason != roachpb.TransactionStatusError_REASON_TXN_NOT_FOUND {
t.Fatalf("expected TransactionStatusError with REASON_TXN_NOT_FOUND, found %v", pErr)
}
}

Expand Down Expand Up @@ -4213,11 +4215,11 @@ func TestEndTransactionWithErrors(t *testing.T) {
existTS hlc.Timestamp
expErrRegexp string
}{
{roachpb.Key("a"), doesNotExist, txn.Epoch, txn.Timestamp, "does not exist"},
{roachpb.Key("a"), roachpb.COMMITTED, txn.Epoch, txn.Timestamp, "txn \"test\" id=.*: already committed"},
{roachpb.Key("b"), roachpb.ABORTED, txn.Epoch, txn.Timestamp, "txn aborted \"test\" id=.*"},
{roachpb.Key("c"), roachpb.PENDING, txn.Epoch + 1, txn.Timestamp, "txn \"test\" id=.*: epoch regression: 0"},
{roachpb.Key("d"), roachpb.PENDING, txn.Epoch, regressTS, `txn "test" id=.*: timestamp regression: 0.\d+,\d+`},
{roachpb.Key("a"), doesNotExist, txn.Epoch, txn.Timestamp, "txn record not found"},
{roachpb.Key("a"), roachpb.COMMITTED, txn.Epoch, txn.Timestamp, "already committed"},
{roachpb.Key("b"), roachpb.ABORTED, txn.Epoch, txn.Timestamp, "txn aborted"},
{roachpb.Key("c"), roachpb.PENDING, txn.Epoch + 1, txn.Timestamp, "epoch regression: 0"},
{roachpb.Key("d"), roachpb.PENDING, txn.Epoch, regressTS, `timestamp regression: 0`},
}
for i, test := range testCases {
// Establish existing txn state by writing directly to range engine.
Expand Down Expand Up @@ -4523,10 +4525,11 @@ func TestRaftRetryCantCommitIntents(t *testing.T) {
t.Error(pErr)
}

// EndTransaction should fail with a status error (does not exist).
// EndTransaction should fail with a txn not found error.
_, pErr = tc.SendWrappedWith(etH, &et)
if _, ok := pErr.GetDetail().(*roachpb.TransactionStatusError); !ok {
t.Errorf("expected transaction aborted for iso=%s; got %s", iso, pErr)
if tse, ok := pErr.GetDetail().(*roachpb.TransactionStatusError); !ok ||
tse.Reason != roachpb.TransactionStatusError_REASON_TXN_NOT_FOUND {
t.Fatalf("expected TransactionStatusError with REASON_TXN_NOT_FOUND, found %v", pErr)
}

// Expect that keyB intent did not get written!
Expand Down Expand Up @@ -8746,7 +8749,7 @@ func TestNoopRequestsNotProposed(t *testing.T) {
name: "failed commit txn req",
useTxn: true,
req: commitTxnReq,
expFailure: "txn .* does not exist",
expFailure: "txn record not found",
// No-op - the request fails.
expProposal: false,
},
Expand Down

0 comments on commit 8fe41db

Please sign in to comment.