Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kv: prevent STAGING -> PENDING transition during high-priority push #62761

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 27 additions & 6 deletions pkg/kv/kvserver/batcheval/cmd_push_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,20 +185,21 @@ func PushTxn(

// If we're trying to move the timestamp forward, and it's already
// far enough forward, return success.
if args.PushType == roachpb.PUSH_TIMESTAMP && args.PushTo.LessEq(reply.PusheeTxn.WriteTimestamp) {
pushType := args.PushType
if pushType == roachpb.PUSH_TIMESTAMP && args.PushTo.LessEq(reply.PusheeTxn.WriteTimestamp) {
// Trivial noop.
return result.Result{}, nil
}

// The pusher might be aware of a newer version of the pushee.
increasedEpochOrTimestamp := false
var knownHigherTimestamp, knownHigherEpoch bool
if reply.PusheeTxn.WriteTimestamp.Less(args.PusheeTxn.WriteTimestamp) {
reply.PusheeTxn.WriteTimestamp = args.PusheeTxn.WriteTimestamp
increasedEpochOrTimestamp = true
knownHigherTimestamp = true
}
if reply.PusheeTxn.Epoch < args.PusheeTxn.Epoch {
reply.PusheeTxn.Epoch = args.PusheeTxn.Epoch
increasedEpochOrTimestamp = true
knownHigherEpoch = true
}
reply.PusheeTxn.UpgradePriority(args.PusheeTxn.Priority)

Expand All @@ -208,12 +209,32 @@ func PushTxn(
// a higher epoch than the parallel commit attempt, it should not consider
// the pushee to be performing a parallel commit. Its commit status is not
// indeterminate.
if increasedEpochOrTimestamp && reply.PusheeTxn.Status == roachpb.STAGING {
if (knownHigherTimestamp || knownHigherEpoch) && reply.PusheeTxn.Status == roachpb.STAGING {
reply.PusheeTxn.Status = roachpb.PENDING
reply.PusheeTxn.InFlightWrites = nil
// If the pusher is aware that the pushee's currently recorded attempt
// at a parallel commit failed but the transaction's epoch has not yet
// been incremented, upgrade PUSH_TIMESTAMPs to PUSH_ABORTs. We don't
// want to move the transaction back to PENDING in the same epoch, as
// this is not (currently) allowed by the recovery protocol. We also
// don't want to move the transaction to a new timestamp while retaining
// the STAGING status, as this could allow the transaction to enter an
// implicit commit state without its knowledge, leading to atomicity
// violations.
//
// This has no effect on pushes that fail with a TransactionPushError.
// Such pushes will still wait on the pushee to retry its commit and
// eventually commit or abort. It also has no effect on expired pushees,
// as they would have been aborted anyway. This only impacts pushes
// which would have succeeded due to priority mismatches. In these
// cases, the push acts the same as a short-circuited transaction
// recovery process, because the transaction recovery procedure always
// finalizes target transactions, even if initiated by a PUSH_TIMESTAMP.
if !knownHigherEpoch && pushType == roachpb.PUSH_TIMESTAMP {
pushType = roachpb.PUSH_ABORT
}
}

pushType := args.PushType
var pusherWins bool
var reason string

Expand Down
26 changes: 24 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_recover_txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,16 +174,38 @@ func TestRecoverTxnRecordChanged(t *testing.T) {
}(),
},
{
name: "transaction restart after write prevented",
name: "transaction restart (pending) after write prevented",
implicitlyCommitted: false,
changedTxn: func() roachpb.Transaction {
txnCopy := txn
txnCopy.BumpEpoch()
txnCopy.Status = roachpb.PENDING
return txnCopy
}(),
},
{
name: "transaction timestamp increase after write prevented",
name: "transaction restart (staging) after write prevented",
implicitlyCommitted: false,
changedTxn: func() roachpb.Transaction {
txnCopy := txn
txnCopy.BumpEpoch()
return txnCopy
}(),
},
{
name: "transaction timestamp increase (pending) after write prevented",
implicitlyCommitted: false,
expError: "cannot recover PENDING transaction in same epoch",
changedTxn: func() roachpb.Transaction {
txnCopy := txn
txnCopy.Status = roachpb.PENDING
txnCopy.InFlightWrites = nil
txnCopy.WriteTimestamp = txnCopy.WriteTimestamp.Add(1, 0)
return txnCopy
}(),
},
{
name: "transaction timestamp increase (staging) after write prevented",
implicitlyCommitted: false,
changedTxn: func() roachpb.Transaction {
txnCopy := txn
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11367,8 +11367,8 @@ func TestTxnRecordLifecycleTransitions(t *testing.T) {
return sendWrappedWithErr(roachpb.Header{}, &pt)
},
expTxn: func(txn *roachpb.Transaction, pushTs hlc.Timestamp) roachpb.TransactionRecord {
record := txn.AsRecord()
record.WriteTimestamp.Forward(pushTs)
record := txnWithStatus(roachpb.ABORTED)(txn, pushTs)
record.WriteTimestamp = record.WriteTimestamp.Add(0, 1)
record.Priority = pusher.Priority - 1
return record
},
Expand Down
130 changes: 130 additions & 0 deletions pkg/kv/kvserver/txn_recovery_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -197,6 +198,135 @@ func TestTxnRecoveryFromStaging(t *testing.T) {
}
}

// TestTxnRecoveryFromStagingWithHighPriority tests the transaction recovery
// process initiated by a high-priority operation which encounters a staging
// transaction. The test contains a subtest for each of the combinations of the
// following boolean options:
//
// - pushAbort: configures whether or not the high-priority operation is a
// read (false) or a write (true), which dictates the kind of push
// operation dispatched against the staging transaction.
//
// - newEpoch: configures whether or not the staging transaction wrote the
// intent which the high-priority operation conflicts with at a higher
// epoch than it is staged at. If true, the staging transaction is not
// implicitly committed.
//
// - newTimestamp: configures whether or not the staging transaction wrote the
// intent which the high-priority operation conflicts with at a higher
// timestamp than it is staged at. If true, the staging transaction is not
// implicitly committed.
//
func TestTxnRecoveryFromStagingWithHighPriority(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()

run := func(t *testing.T, pushAbort, newEpoch, newTimestamp bool) {
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
manual := hlc.NewManualClock(123)
cfg := TestStoreConfig(hlc.NewClock(manual.UnixNano, time.Nanosecond))
store := createTestStoreWithConfig(t, stopper, testStoreOpts{createSystemRanges: true}, &cfg)

// Create a transaction that will get stuck performing a parallel
// commit.
keyA, keyB := roachpb.Key("a"), roachpb.Key("b")
txn := newTransaction("txn", keyA, 1, store.Clock())

// Issue two writes, which will be considered in-flight at the time of
// the transaction's EndTxn request.
keyAVal := []byte("value")
pArgs := putArgs(keyA, keyAVal)
pArgs.Sequence = 1
h := roachpb.Header{Txn: txn}
_, pErr := kv.SendWrappedWith(ctx, store.TestSender(), h, &pArgs)
require.Nil(t, pErr, "error: %s", pErr)

// The second write may or may not be bumped.
pArgs = putArgs(keyB, []byte("value2"))
pArgs.Sequence = 2
h2 := roachpb.Header{Txn: txn.Clone()}
if newEpoch {
h2.Txn.BumpEpoch()
}
if newTimestamp {
manual.Increment(100)
h2.Txn.WriteTimestamp = store.Clock().Now()
}
_, pErr = kv.SendWrappedWith(ctx, store.TestSender(), h2, &pArgs)
require.Nil(t, pErr, "error: %s", pErr)

// Issue a parallel commit, which will put the transaction into a
// STAGING state. Include both writes as the EndTxn's in-flight writes.
et, etH := endTxnArgs(txn, true)
et.InFlightWrites = []roachpb.SequencedWrite{
{Key: keyA, Sequence: 1},
{Key: keyB, Sequence: 2},
}
etReply, pErr := kv.SendWrappedWith(ctx, store.TestSender(), etH, &et)
require.Nil(t, pErr, "error: %s", pErr)
require.Equal(t, roachpb.STAGING, etReply.Header().Txn.Status)

// Issue a conflicting, high-priority operation.
var conflictArgs roachpb.Request
if pushAbort {
pArgs = putArgs(keyB, []byte("value3"))
conflictArgs = &pArgs
} else {
gArgs := getArgs(keyB)
conflictArgs = &gArgs
}
manual.Increment(100)
conflictH := roachpb.Header{
UserPriority: roachpb.MaxUserPriority,
Timestamp: store.Clock().Now(),
}
_, pErr = kv.SendWrappedWith(ctx, store.TestSender(), conflictH, conflictArgs)
require.Nil(t, pErr, "error: %s", pErr)

// Query the transaction and verify that it has the right state.
qtArgs := queryTxnArgs(txn.TxnMeta, false /* waitForUpdate */)
qtReply, pErr := kv.SendWrapped(ctx, store.TestSender(), &qtArgs)
require.Nil(t, pErr, "error: %s", pErr)
qtTxn := qtReply.(*roachpb.QueryTxnResponse).QueriedTxn

if !newEpoch && !newTimestamp {
// The transaction was implicitly committed at its initial epoch and
// timestamp.
require.Equal(t, roachpb.COMMITTED, qtTxn.Status)
require.Equal(t, txn.Epoch, qtTxn.Epoch)
require.Equal(t, txn.WriteTimestamp, qtTxn.WriteTimestamp)
} else if newEpoch {
// The transaction is aborted if that's what the high-priority
// request wants. Otherwise, the transaction's record is bumped to
// the new epoch pulled from its intent and pushed above the
// high-priority request's timestamp.
if pushAbort {
require.Equal(t, roachpb.ABORTED, qtTxn.Status)
} else /* pushTimestamp */ {
require.Equal(t, roachpb.PENDING, qtTxn.Status)
require.Equal(t, txn.Epoch+1, qtTxn.Epoch)
require.Equal(t, conflictH.Timestamp.Next(), qtTxn.WriteTimestamp)
}
} else /* if newTimestamp */ {
// The transaction is aborted, even if the high-priority request
// only needed it to be pushed to a higher timestamp. This is
// because we don't allow a STAGING transaction record to move back
// to PENDING in the same epoch.
require.Equal(t, roachpb.ABORTED, qtTxn.Status)
}
}

testutils.RunTrueAndFalse(t, "push_abort", func(t *testing.T, pushAbort bool) {
testutils.RunTrueAndFalse(t, "new_epoch", func(t *testing.T, newEpoch bool) {
testutils.RunTrueAndFalse(t, "new_timestamp", func(t *testing.T, newTimestamp bool) {
run(t, pushAbort, newEpoch, newTimestamp)
})
})
})
}

// TestTxnClearRangeIntents tests whether a ClearRange call blindly removes
// write intents. This can cause it to remove an intent from an implicitly
// committed STAGING txn. When txn recovery kicks in, it will fail to find the
Expand Down