Skip to content

Commit

Permalink
Merge pull request cockroachdb#62810 from nvanbenschoten/backport21.1…
Browse files Browse the repository at this point in the history
…-62761

release-21.1: kv: prevent STAGING -> PENDING transition during high-priority push
  • Loading branch information
nvanbenschoten authored Mar 30, 2021
2 parents b53886a + b0944da commit f902821
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 10 deletions.
33 changes: 27 additions & 6 deletions pkg/kv/kvserver/batcheval/cmd_push_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,20 +185,21 @@ func PushTxn(

// If we're trying to move the timestamp forward, and it's already
// far enough forward, return success.
if args.PushType == roachpb.PUSH_TIMESTAMP && args.PushTo.LessEq(reply.PusheeTxn.WriteTimestamp) {
pushType := args.PushType
if pushType == roachpb.PUSH_TIMESTAMP && args.PushTo.LessEq(reply.PusheeTxn.WriteTimestamp) {
// Trivial noop.
return result.Result{}, nil
}

// The pusher might be aware of a newer version of the pushee.
increasedEpochOrTimestamp := false
var knownHigherTimestamp, knownHigherEpoch bool
if reply.PusheeTxn.WriteTimestamp.Less(args.PusheeTxn.WriteTimestamp) {
reply.PusheeTxn.WriteTimestamp = args.PusheeTxn.WriteTimestamp
increasedEpochOrTimestamp = true
knownHigherTimestamp = true
}
if reply.PusheeTxn.Epoch < args.PusheeTxn.Epoch {
reply.PusheeTxn.Epoch = args.PusheeTxn.Epoch
increasedEpochOrTimestamp = true
knownHigherEpoch = true
}
reply.PusheeTxn.UpgradePriority(args.PusheeTxn.Priority)

Expand All @@ -208,12 +209,32 @@ func PushTxn(
// a higher epoch than the parallel commit attempt, it should not consider
// the pushee to be performing a parallel commit. Its commit status is not
// indeterminate.
if increasedEpochOrTimestamp && reply.PusheeTxn.Status == roachpb.STAGING {
if (knownHigherTimestamp || knownHigherEpoch) && reply.PusheeTxn.Status == roachpb.STAGING {
reply.PusheeTxn.Status = roachpb.PENDING
reply.PusheeTxn.InFlightWrites = nil
// If the pusher is aware that the pushee's currently recorded attempt
// at a parallel commit failed but the transaction's epoch has not yet
// been incremented, upgrade PUSH_TIMESTAMPs to PUSH_ABORTs. We don't
// want to move the transaction back to PENDING in the same epoch, as
// this is not (currently) allowed by the recovery protocol. We also
// don't want to move the transaction to a new timestamp while retaining
// the STAGING status, as this could allow the transaction to enter an
// implicit commit state without its knowledge, leading to atomicity
// violations.
//
// This has no effect on pushes that fail with a TransactionPushError.
// Such pushes will still wait on the pushee to retry its commit and
// eventually commit or abort. It also has no effect on expired pushees,
// as they would have been aborted anyway. This only impacts pushes
// which would have succeeded due to priority mismatches. In these
// cases, the push acts the same as a short-circuited transaction
// recovery process, because the transaction recovery procedure always
// finalizes target transactions, even if initiated by a PUSH_TIMESTAMP.
if !knownHigherEpoch && pushType == roachpb.PUSH_TIMESTAMP {
pushType = roachpb.PUSH_ABORT
}
}

pushType := args.PushType
var pusherWins bool
var reason string

Expand Down
26 changes: 24 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_recover_txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,16 +174,38 @@ func TestRecoverTxnRecordChanged(t *testing.T) {
}(),
},
{
name: "transaction restart after write prevented",
name: "transaction restart (pending) after write prevented",
implicitlyCommitted: false,
changedTxn: func() roachpb.Transaction {
txnCopy := txn
txnCopy.BumpEpoch()
txnCopy.Status = roachpb.PENDING
return txnCopy
}(),
},
{
name: "transaction timestamp increase after write prevented",
name: "transaction restart (staging) after write prevented",
implicitlyCommitted: false,
changedTxn: func() roachpb.Transaction {
txnCopy := txn
txnCopy.BumpEpoch()
return txnCopy
}(),
},
{
name: "transaction timestamp increase (pending) after write prevented",
implicitlyCommitted: false,
expError: "cannot recover PENDING transaction in same epoch",
changedTxn: func() roachpb.Transaction {
txnCopy := txn
txnCopy.Status = roachpb.PENDING
txnCopy.InFlightWrites = nil
txnCopy.WriteTimestamp = txnCopy.WriteTimestamp.Add(1, 0)
return txnCopy
}(),
},
{
name: "transaction timestamp increase (staging) after write prevented",
implicitlyCommitted: false,
changedTxn: func() roachpb.Transaction {
txnCopy := txn
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11367,8 +11367,8 @@ func TestTxnRecordLifecycleTransitions(t *testing.T) {
return sendWrappedWithErr(roachpb.Header{}, &pt)
},
expTxn: func(txn *roachpb.Transaction, pushTs hlc.Timestamp) roachpb.TransactionRecord {
record := txn.AsRecord()
record.WriteTimestamp.Forward(pushTs)
record := txnWithStatus(roachpb.ABORTED)(txn, pushTs)
record.WriteTimestamp = record.WriteTimestamp.Add(0, 1)
record.Priority = pusher.Priority - 1
return record
},
Expand Down
130 changes: 130 additions & 0 deletions pkg/kv/kvserver/txn_recovery_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -197,6 +198,135 @@ func TestTxnRecoveryFromStaging(t *testing.T) {
}
}

// TestTxnRecoveryFromStagingWithHighPriority exercises what happens to a
// STAGING transaction record when a maximum-priority request runs into one of
// the transaction's intents. Every combination of three boolean knobs is run
// as its own subtest:
//
// - pushAbort: when true the conflicting high-priority request is a write
//   (Put), otherwise it is a read (Get). This selects the kind of push that
//   is issued against the staging transaction.
//
// - newEpoch: when true the intent that the high-priority request conflicts
//   with was written at a higher epoch than the one the transaction staged
//   at, so the staged attempt is not implicitly committed.
//
// - newTimestamp: when true the intent that the high-priority request
//   conflicts with was written at a higher timestamp than the one the
//   transaction staged at, so the staged attempt is not implicitly committed.
func TestTxnRecoveryFromStagingWithHighPriority(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)
	ctx := context.Background()

	runCase := func(t *testing.T, pushAbort, newEpoch, newTimestamp bool) {
		stopper := stop.NewStopper()
		defer stopper.Stop(ctx)
		manualClock := hlc.NewManualClock(123)
		storeCfg := TestStoreConfig(hlc.NewClock(manualClock.UnixNano, time.Nanosecond))
		store := createTestStoreWithConfig(
			t, stopper, testStoreOpts{createSystemRanges: true}, &storeCfg,
		)

		// This transaction is the one that ends up stuck in a parallel commit.
		keyA := roachpb.Key("a")
		keyB := roachpb.Key("b")
		txn := newTransaction("txn", keyA, 1, store.Clock())

		// First in-flight write: always issued at the transaction's original
		// epoch and timestamp.
		firstPut := putArgs(keyA, []byte("value"))
		firstPut.Sequence = 1
		firstHeader := roachpb.Header{Txn: txn}
		_, pErr := kv.SendWrappedWith(ctx, store.TestSender(), firstHeader, &firstPut)
		require.Nil(t, pErr, "error: %s", pErr)

		// Second in-flight write: issued through a clone of the transaction
		// whose epoch and/or write timestamp is optionally bumped first.
		secondPut := putArgs(keyB, []byte("value2"))
		secondPut.Sequence = 2
		secondTxn := txn.Clone()
		if newEpoch {
			secondTxn.BumpEpoch()
		}
		if newTimestamp {
			manualClock.Increment(100)
			secondTxn.WriteTimestamp = store.Clock().Now()
		}
		secondHeader := roachpb.Header{Txn: secondTxn}
		_, pErr = kv.SendWrappedWith(ctx, store.TestSender(), secondHeader, &secondPut)
		require.Nil(t, pErr, "error: %s", pErr)

		// Parallel commit using the original (un-bumped) transaction, listing
		// both writes as in-flight. The record must come back STAGING.
		endTxn, endTxnHeader := endTxnArgs(txn, true)
		endTxn.InFlightWrites = []roachpb.SequencedWrite{
			{Key: keyA, Sequence: 1},
			{Key: keyB, Sequence: 2},
		}
		endTxnReply, pErr := kv.SendWrappedWith(ctx, store.TestSender(), endTxnHeader, &endTxn)
		require.Nil(t, pErr, "error: %s", pErr)
		require.Equal(t, roachpb.STAGING, endTxnReply.Header().Txn.Status)

		// Build the conflicting request on keyB: a Put when pushAbort is set,
		// a Get otherwise.
		var conflictReq roachpb.Request
		if pushAbort {
			conflictingPut := putArgs(keyB, []byte("value3"))
			conflictReq = &conflictingPut
		} else {
			conflictingGet := getArgs(keyB)
			conflictReq = &conflictingGet
		}

		// Advance the clock before stamping the request, then send it at the
		// maximum user priority.
		manualClock.Increment(100)
		conflictHeader := roachpb.Header{
			UserPriority: roachpb.MaxUserPriority,
			Timestamp:    store.Clock().Now(),
		}
		_, pErr = kv.SendWrappedWith(ctx, store.TestSender(), conflictHeader, conflictReq)
		require.Nil(t, pErr, "error: %s", pErr)

		// Read back the transaction record and check its final state.
		queryTxn := queryTxnArgs(txn.TxnMeta, false /* waitForUpdate */)
		queryReply, pErr := kv.SendWrapped(ctx, store.TestSender(), &queryTxn)
		require.Nil(t, pErr, "error: %s", pErr)
		queried := queryReply.(*roachpb.QueryTxnResponse).QueriedTxn

		switch {
		case !newEpoch && !newTimestamp:
			// Neither knob set: the transaction was implicitly committed at
			// its original epoch and timestamp.
			require.Equal(t, roachpb.COMMITTED, queried.Status)
			require.Equal(t, txn.Epoch, queried.Epoch)
			require.Equal(t, txn.WriteTimestamp, queried.WriteTimestamp)

		case newEpoch:
			// A timestamp push leaves the record PENDING at the epoch taken
			// from the intent, with its write timestamp just above the
			// high-priority request's timestamp. An abort push aborts it.
			if !pushAbort {
				require.Equal(t, roachpb.PENDING, queried.Status)
				require.Equal(t, txn.Epoch+1, queried.Epoch)
				require.Equal(t, conflictHeader.Timestamp.Next(), queried.WriteTimestamp)
			} else {
				require.Equal(t, roachpb.ABORTED, queried.Status)
			}

		default:
			// Only newTimestamp set: the record is aborted even when the
			// request merely needed a timestamp push, because a STAGING
			// record is not allowed to return to PENDING in the same epoch.
			require.Equal(t, roachpb.ABORTED, queried.Status)
		}
	}

	testutils.RunTrueAndFalse(t, "push_abort", func(t *testing.T, pushAbort bool) {
		testutils.RunTrueAndFalse(t, "new_epoch", func(t *testing.T, newEpoch bool) {
			testutils.RunTrueAndFalse(t, "new_timestamp", func(t *testing.T, newTimestamp bool) {
				runCase(t, pushAbort, newEpoch, newTimestamp)
			})
		})
	})
}

// TestTxnClearRangeIntents tests whether a ClearRange call blindly removes
// write intents. This can cause it to remove an intent from an implicitly
// committed STAGING txn. When txn recovery kicks in, it will fail to find the
Expand Down

0 comments on commit f902821

Please sign in to comment.