Skip to content

Commit

Permalink
kvserver: use timestamp cache to mark and check txn records
Browse files Browse the repository at this point in the history
Potential 1PC transactions must currently check for an existing
transaction record on disk. This patch overloads the timestamp cache
tombstone markers to also mark heartbeated transaction records, which
allows 1PC transactions to check this in the timestamp cache instead via
`CanCreateTxnRecord()`. However, the overloading is not able to
distinguish between a failure due to an existing record and one due to a
finalized transaction, so it falls back to 2PC `EndTxn` evaluation to
make this determination by checking the record on disk and returning an
appropriate error.

Release note: None
  • Loading branch information
erikgrinaker committed Jul 21, 2021
1 parent 4225c2d commit 489edcf
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 36 deletions.
16 changes: 0 additions & 16 deletions pkg/kv/kvserver/batcheval/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,9 @@ import (
"context"
"fmt"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)
Expand Down Expand Up @@ -195,17 +193,3 @@ func SynthesizeTxnFromMeta(
}
return synth.AsTransaction()
}

// HasTxnRecord reports whether a transaction record is currently stored for
// the given transaction. The lookup key is derived from the transaction's
// anchor key and ID, and the value is fetched from reader with a zero-valued
// timestamp and default MVCC get options. The reader must come from the
// leaseholder of the Range that holds the transaction record.
//
// It returns (false, err) if the read fails, and otherwise (found, nil) where
// found is true only when a value was present under the record key.
func HasTxnRecord(
	ctx context.Context, reader storage.Reader, txn *roachpb.Transaction,
) (bool, error) {
	recordKey := keys.TransactionKey(txn.Key, txn.ID)
	// The second result of MVCCGet is not needed to decide whether a record
	// exists, so it is discarded.
	record, _, err := storage.MVCCGet(
		ctx, reader, recordKey, hlc.Timestamp{}, storage.MVCCGetOptions{},
	)
	// A failed read never reports a record as present.
	found := err == nil && record != nil
	return found, err
}
57 changes: 48 additions & 9 deletions pkg/kv/kvserver/replica_tscache.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,16 @@ func (r *Replica) updateTimestampCache(
key := transactionTombstoneMarker(start, txnID)
addToTSCache(key, nil, ts, txnID)
}
case *roachpb.HeartbeatTxnRequest:
// HeartbeatTxn requests overload tombstone markers to also act as markers
// for written transaction records. This lets potential 1PC transactions
// avoid hitting disk to check for records, but means that
// CanCreateTxnRecord cannot distinguish between a failure due to an
// existing record and one due to a finalized transaction, and so the
// returned reason is only valid if the caller already checked that no
// transaction record currently exists.
key := transactionTombstoneMarker(start, txnID)
addToTSCache(key, nil, ts, txnID)
case *roachpb.RecoverTxnRequest:
// A successful RecoverTxn request may or may not have finalized the
// transaction that it was trying to recover. If so, then we record
Expand Down Expand Up @@ -359,6 +369,14 @@ func (r *Replica) applyTimestampCache(
// ever determines that a transaction record must be rejected, it will continue
// to reject that transaction going forwards.
//
// The returned reason is only reliable if the caller has verified that a
// transaction record does not currently exist. This is due to the overloading of
// transactionTombstoneMarker to also mark that a transaction heartbeat has
// created a transaction record, such that calling this with an existing
// transaction record may return a bogus
// ABORT_REASON_ALREADY_COMMITTED_OR_ROLLED_BACK_POSSIBLE_REPLAY when the
// reason is instead that a record already exists.
//
// The method performs two critical roles:
//
// 1. It protects against replayed requests or new requests from a
Expand All @@ -375,6 +393,12 @@ func (r *Replica) applyTimestampCache(
// never needs to explicitly create the transaction record for contending
// transactions.
//
// In addition, it allows potential 1PC transactions to avoid checking for
// transaction records on disk, by overloading tombstone markers to also mark
// that HeartbeatTxn has written a transaction record. However, the cache is
// unable to distinguish between failures due to existing and finalized records,
// which callers must take into account.
//
// This is detailed in the transaction record state machine below:
//
// +----------------------------------------------------+
Expand All @@ -388,12 +412,13 @@ func (r *Replica) applyTimestampCache(
// | v -> t = forward v by timestamp t |
// +----------------------------------------------------+
//
// PushTxn(TIMESTAMP) HeartbeatTxn
// then: v1 -> push.ts then: update record
// +------+ +------+
// PushTxn(ABORT) | | HeartbeatTxn | | PushTxn(TIMESTAMP)
// then: v2 -> txn.ts | v if: v2 < txn.orig | v then: update record
// +-----------------+ then: txn.ts -> v1 +--------------------+
// HeartbeatTxn
// PushTxn(TIMESTAMP) then: update record
// then: v1 -> push.ts v2 -> txn.ts
// +------+ HeartbeatTxn +------+
// PushTxn(ABORT) | | if: v2 < txn.orig | | PushTxn(TIMESTAMP)
// then: v2 -> txn.ts | v then: txn.ts -> v1 | v then: update record
// +-----------------+ v2 -> txn.ts +--------------------+
// +----| | else: fail | |----+
// | | |------------------------->| | |
// | | no txn record | | txn record written | |
Expand Down Expand Up @@ -506,17 +531,26 @@ func (r *Replica) CanCreateTxnRecord(
// then the error will be transformed into an ambiguous one higher up.
// Otherwise, if the client is still waiting for a result, then this cannot
// be a "replay" of any sort.
tombstoneTimestamp, tombstomeTxnID := r.store.tsCache.GetMax(tombstoneKey, nil /* end */)
//
// Note that this only makes sense when the caller has already verified that
// no transaction record exists. This marker is also overloaded to mean that a
// heartbeat has written a transaction record.
tombstoneTimestamp, tombstoneTxnID := r.store.tsCache.GetMax(tombstoneKey, nil /* end */)
// Compare against the minimum timestamp that the transaction could have
// written intents at.
if txnMinTS.LessEq(tombstoneTimestamp) {
switch tombstomeTxnID {
switch tombstoneTxnID {
case txnID:
// If we find our own transaction ID then an EndTxn request sent by
// our coordinator has already been processed. We might be a replay (e.g.
// a DistSender retry), or we raced with an asynchronous abort. Either
// way, return an error.
//
// If the caller has not already verified that no transaction record
// exists, then this will also be returned for an existing transaction
// record (since the marker is overloaded to also mean that a heartbeat
// has written a record), even though the reason indicates otherwise.
//
// TODO(andrei): We could keep a bit more info in the tscache to return a
// different error for COMMITTED transactions. If the EndTxn(commit) was
// the only request in the batch, then this would be sufficient for the
Expand Down Expand Up @@ -552,11 +586,16 @@ func (r *Replica) CanCreateTxnRecord(
// guard against creating a transaction record after the transaction record has
// been cleaned up (i.e. by a BeginTxn being evaluated out of order or arriving
// after another txn Push(Abort)'ed the txn).
//
// This is also overloaded to mark that a HeartbeatTxn request has written a
// transaction record, to avoid checking for that record on disk. Thus, the
// "tombstone" sense is only valid when the CanCreateTxnRecord caller has
// already verified that no transaction record exists on disk.
func transactionTombstoneMarker(key roachpb.Key, txnID uuid.UUID) roachpb.Key {
	// The marker is the transaction record key for (key, txnID) with a fixed
	// suffix appended, which keeps it distinct from the record key itself.
	const tombstoneSuffix = "-tmbs"
	marker := keys.TransactionKey(key, txnID)
	return append(marker, tombstoneSuffix...)
}

// transactionPushMarker returns the key used by the marker indicating that a
// transactionPushMarker returns the key used as a marker indicating that a
// particular txn was pushed before writing its transaction record. It is used
// as a marker in the timestamp cache indicating that the transaction was pushed
// in case the push happens before there's a transaction record.
Expand Down
23 changes: 12 additions & 11 deletions pkg/kv/kvserver/replica_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,20 +401,21 @@ func (r *Replica) canAttempt1PCEvaluation(
ba.Timestamp, ba.Txn.WriteTimestamp)
}

// Check whether the txn record has already been created. If so, we can't
// perform a 1PC evaluation because we need to clean up the record during
// evaluation.
if ok, err := batcheval.HasTxnRecord(ctx, r.store.Engine(), ba.Txn); ok || err != nil {
return false, roachpb.NewError(err)
}

// The EndTxn checks whether the txn record can be created, but we're
// eliding the EndTxn. So, we'll do the check instead.
ok, minCommitTS, reason := r.CanCreateTxnRecord(ctx, ba.Txn.ID, ba.Txn.Key, ba.Txn.MinTimestamp)
//
// Note that we cannot rely on the returned reason here, since HeartbeatTxn
// requests overload the txn record tombstone markers in the timestamp cache
// to also mean that a heartbeat has written a record. CanCreateTxnRecord
// therefore cannot distinguish between a finalized and non-finalized
// transaction, and so the returned reason assumes that the caller has already
// checked that no transaction record exists on disk -- we don't check that
// here as a performance optimization, and therefore cannot rely on the
// reason, so we fall back to 2PC evaluation and let EndTxn do error handling
// after checking for the txn record on disk.
ok, minCommitTS, _ := r.CanCreateTxnRecord(ctx, ba.Txn.ID, ba.Txn.Key, ba.Txn.MinTimestamp)
if !ok {
newTxn := ba.Txn.Clone()
newTxn.Status = roachpb.ABORTED
return false, roachpb.NewErrorWithTxn(roachpb.NewTransactionAbortedError(reason), newTxn)
return false, nil
}
if ba.Timestamp.Less(minCommitTS) {
ba.Txn.WriteTimestamp = minCommitTS
Expand Down

0 comments on commit 489edcf

Please sign in to comment.