Skip to content

Commit

Permalink
storage: support no-op PUSH_TIMESTAMP pushes on STAGING transactions
Browse files Browse the repository at this point in the history
Caught while manually testing parallel commits last week.

PR cockroachdb#35763 made it an error to resolve an intent with a STAGING status.
This isn't a crazy thing to do in every circumstance though. The case
where this comes up is a PUSH_TIMESTAMP push on a STAGING transaction
whose timestamp is already sufficiently high. In this case, the pusher
can move the intent up out of its way without modifying the transaction
record or interfering with the parallel commit.

Concretely, this is allowed in pushes that hit this case:
https://github.com/cockroachdb/cockroach/blob/4ab679d978a8f566c1427b372547380ee012292f/pkg/storage/batcheval/cmd_push_txn.go#L197

To test this, the commit extends `TestStoreResolveWriteIntentPushOnRead`
to test scenarios in two different dimensions: PENDING vs. STAGING
transaction records and already pushed txns vs. not already pushed
txns. To test the latter dimensions, the commit had to refine how
far into the future transactions push conflicting intents. Previously,
transactions would push them all the way to hlc.Now() on the pushing
node so that there was no chance that they would be in their uncertainty
window after the push. This was pessimistic. The transaction only needs
to push the conflicting intent up to its observed timestamp for the
pushing node, which may be significantly lower than the current timestamp
on the pushing node. This should increase the number of no-op pushes we
see in the wild.

Release note: None
  • Loading branch information
nvanbenschoten committed Apr 1, 2019
1 parent 3f988b1 commit f78c17a
Show file tree
Hide file tree
Showing 8 changed files with 227 additions and 109 deletions.
2 changes: 1 addition & 1 deletion pkg/kv/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func TestWaiterOnRejectedCommit(t *testing.T) {
}
return 0, nil
},
TxnWait: txnwait.TestingKnobs{
TxnWaitKnobs: txnwait.TestingKnobs{
OnPusherBlocked: func(ctx context.Context, push *roachpb.PushTxnRequest) {
// We'll trap a reader entering the wait queue for our txn.
v := txnID.Load()
Expand Down
4 changes: 0 additions & 4 deletions pkg/storage/batcheval/cmd_resolve_intent.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/spanset"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/pkg/errors"
)

func init() {
Expand Down Expand Up @@ -81,9 +80,6 @@ func ResolveIntent(
if h.Txn != nil {
return result.Result{}, ErrTransactionUnsupported
}
if args.Status == roachpb.STAGING {
return result.Result{}, errors.Errorf("cannot resolve intent with STAGING status")
}

intent := roachpb.Intent{
Span: args.Span(),
Expand Down
4 changes: 0 additions & 4 deletions pkg/storage/batcheval/cmd_resolve_intent_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/spanset"
"github.com/pkg/errors"
)

func init() {
Expand All @@ -46,9 +45,6 @@ func ResolveIntentRange(
if h.Txn != nil {
return result.Result{}, ErrTransactionUnsupported
}
if args.Status == roachpb.STAGING {
return result.Result{}, errors.Errorf("cannot resolve intent with STAGING status")
}

intent := roachpb.Intent{
Span: args.Span(),
Expand Down
20 changes: 10 additions & 10 deletions pkg/storage/engine/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2312,7 +2312,7 @@ func mvccResolveWriteIntent(
// | restart | |
// | write@2 | |
// | | resolve@1 |
// ============================
// =============================
//
// In this case, if we required the epochs to match, we would not push the
// intent forward, and client B would upon retrying after its successful
Expand All @@ -2323,9 +2323,15 @@ func mvccResolveWriteIntent(
// used for resolving), but that costs latency.
// TODO(tschottdorf): various epoch-related scenarios here deserve more
// testing.
pushed := intent.Status == roachpb.PENDING &&
hlc.Timestamp(meta.Timestamp).Less(intent.Txn.Timestamp) &&
meta.Txn.Epoch >= intent.Txn.Epoch
inProgress := !intent.Status.IsFinalized() && meta.Txn.Epoch >= intent.Txn.Epoch
pushed := inProgress && hlc.Timestamp(meta.Timestamp).Less(intent.Txn.Timestamp)

// There's nothing to do if meta's epoch is greater than or equal txn's
// epoch and the state is still in progress but the intent was not pushed
// to a larger timestamp.
if inProgress && !pushed {
return false, nil
}

// If we're committing, or if the commit timestamp of the intent has been moved forward, and if
// the proposed epoch matches the existing epoch: update the meta.Txn. For commit, it's set to
Expand Down Expand Up @@ -2420,12 +2426,6 @@ func mvccResolveWriteIntent(
// - writer2 dispatches ResolveIntent to key0 (with epoch 0)
// - ResolveIntent with epoch 0 aborts intent from epoch 1.

// There's nothing to do if meta's epoch is greater than or equal txn's epoch
// and the state is still PENDING.
if intent.Status == roachpb.PENDING && meta.Txn.Epoch >= intent.Txn.Epoch {
return false, nil
}

// First clear the intent value.
latestKey := MVCCKey{Key: intent.Key, Timestamp: hlc.Timestamp(meta.Timestamp)}
if err := engine.Clear(latestKey); err != nil {
Expand Down
6 changes: 2 additions & 4 deletions pkg/storage/rangefeed/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,16 +182,14 @@ func (a *txnPushAttempt) pushOldTxns(ctx context.Context) error {
var toCleanup []roachpb.Transaction
for i, txn := range pushedTxns {
switch txn.Status {
case roachpb.PENDING:
// The transaction is still pending but its timestamp was moved
case roachpb.PENDING, roachpb.STAGING:
// The transaction is still in progress but its timestamp was moved
// forward to the current time. Inform the Processor that it can
// forward the txn's timestamp in its unresolvedIntentQueue.
ops[i].SetValue(&enginepb.MVCCUpdateIntentOp{
TxnID: txn.ID,
Timestamp: txn.Timestamp,
})
case roachpb.STAGING:
log.Fatalf(ctx, "unexpected pushed txn with STAGING status: %v", txn)
case roachpb.COMMITTED:
// The transaction is committed and its timestamp may have moved
// forward since we last saw an intent. Inform the Processor
Expand Down
33 changes: 21 additions & 12 deletions pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2949,6 +2949,9 @@ func (s *Store) Send(
pErr = nil

case *roachpb.IndeterminateCommitError:
if s.cfg.TestingKnobs.DontRecoverIndeterminateCommits {
return nil, pErr
}
// On an indeterminate commit error, attempt to recover and finalize
// the stuck transaction. Retry immediately if successful.
if _, err := s.recoveryMgr.ResolveIndeterminateCommit(ctx, t); err != nil {
Expand Down Expand Up @@ -2980,18 +2983,24 @@ func (s *Store) Send(
// Make a copy of the header for the upcoming push; we will update
// the timestamp.
h := ba.Header
// We must push at least to h.Timestamp, but in fact we want to
// go all the way up to a timestamp which was taken off the HLC
// after our operation started. This allows us to not have to
// restart for uncertainty as we come back and read.
h.Timestamp.Forward(now)
// We are going to hand the header (and thus the transaction proto)
// to the RPC framework, after which it must not be changed (since
// that could race). Since the subsequent execution of the original
// request might mutate the transaction, make a copy here.
//
// See #9130.
if h.Txn != nil {
// We must push at least to h.Timestamp, but in fact we want to
// go all the way up to a timestamp which was taken off the HLC
// after our operation started. This allows us to not have to
// restart for uncertainty as we come back and read.
obsTS, ok := h.Txn.GetObservedTimestamp(ba.Replica.NodeID)
if !ok {
// This was set earlier in this method, so it's
// completely unexpected to not be found now.
log.Fatalf(ctx, "missing observed timestamp: %+v", h.Txn)
}
h.Timestamp.Forward(obsTS)
// We are going to hand the header (and thus the transaction proto)
// to the RPC framework, after which it must not be changed (since
// that could race). Since the subsequent execution of the original
// request might mutate the transaction, make a copy here.
//
// See #9130.
h.Txn = h.Txn.Clone()
}
// Handle the case where we get more than one write intent error;
Expand Down Expand Up @@ -4389,7 +4398,7 @@ func (s *Store) setScannerActive(active bool) {

// GetTxnWaitKnobs is part of txnwait.StoreInterface.
func (s *Store) GetTxnWaitKnobs() txnwait.TestingKnobs {
return s.TestingKnobs().TxnWait
return s.TestingKnobs().TxnWaitKnobs
}

// GetTxnWaitMetrics is called by txnwait.Queue instances to get a reference to
Expand Down
Loading

0 comments on commit f78c17a

Please sign in to comment.