Skip to content

Commit

Permalink
Merge pull request #118472 from erikgrinaker/backport22.2-117969
Browse files Browse the repository at this point in the history
release-22.2: batcheval: add `PushTxnResponse.AmbiguousAbort`
  • Loading branch information
erikgrinaker authored Feb 1, 2024
2 parents 9afce9d + 4f5ec22 commit 9301a16
Show file tree
Hide file tree
Showing 9 changed files with 171 additions and 43 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/batcheval/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ go_test(
"cmd_get_test.go",
"cmd_is_span_empty_test.go",
"cmd_lease_test.go",
"cmd_push_txn_test.go",
"cmd_query_intent_test.go",
"cmd_query_resolved_timestamp_test.go",
"cmd_recover_txn_test.go",
Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_push_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,10 @@ func PushTxn(
// then we know we're in either the second or the third case.
reply.PusheeTxn = SynthesizeTxnFromMeta(ctx, cArgs.EvalCtx, args.PusheeTxn)
if reply.PusheeTxn.Status == roachpb.ABORTED {
// The transaction may actually have committed and already removed its
// intents and txn record, or it may have aborted and done the same. We
// can't know, so mark the abort as ambiguous.
reply.AmbiguousAbort = true
// If the transaction is uncommittable, we don't even need to
// persist an ABORTED transaction record, we can just consider it
// aborted. This is good because it allows us to obey the invariant
Expand Down
103 changes: 103 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_push_txn_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package batcheval_test

import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/stretchr/testify/require"
)

// TestPushTxnAmbiguousAbort tests PushTxn behavior when the transaction record
// is missing. In this case, the timestamp cache can tell us whether the
// transaction record may have existed in the past -- if we know it hasn't, then
// the transaction is still pending (e.g. before the record is written), but
// otherwise the transaction record is pessimistically assumed to have aborted.
// However, this state is ambiguous, as the transaction may in fact have
// committed already and GCed its transaction record. Make sure this is
// reflected in the AmbiguousAbort field.
//
// TODO(erikgrinaker): generalize this to test PushTxn more broadly.
func TestPushTxnAmbiguousAbort(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()

clock := hlc.NewClock(timeutil.NewManualTime(timeutil.Now()), 0 /* maxOffset */)
now := clock.Now()
engine := storage.NewDefaultInMemForTesting()
defer engine.Close()

testutils.RunTrueAndFalse(t, "CanCreateTxnRecord", func(t *testing.T, canCreateTxnRecord bool) {
evalCtx := (&batcheval.MockEvalCtx{
Clock: clock,
CanCreateTxn: func() (bool, hlc.Timestamp, roachpb.TransactionAbortedReason) {
return canCreateTxnRecord, hlc.Timestamp{}, 0 // PushTxn doesn't care about the reason
},
}).EvalContext()

key := roachpb.Key("foo")
pusheeTxnMeta := enginepb.TxnMeta{
ID: uuid.MakeV4(),
Key: key,
MinTimestamp: now,
}

resp := roachpb.PushTxnResponse{}
res, err := batcheval.PushTxn(ctx, engine, batcheval.CommandArgs{
EvalCtx: evalCtx,
Header: roachpb.Header{
Timestamp: clock.Now(),
},
Args: &roachpb.PushTxnRequest{
RequestHeader: roachpb.RequestHeader{Key: key},
PusheeTxn: pusheeTxnMeta,
},
}, &resp)
require.NoError(t, err)

// There is no txn record (the engine is empty). If we can't create a txn
// record, it's because the timestamp cache can't confirm that it didn't
// exist in the past. This will return an ambiguous abort.
var expectUpdatedTxns []*roachpb.Transaction
expectTxn := roachpb.Transaction{
TxnMeta: pusheeTxnMeta,
LastHeartbeat: pusheeTxnMeta.MinTimestamp,
}
if !canCreateTxnRecord {
expectTxn.Status = roachpb.ABORTED
expectUpdatedTxns = append(expectUpdatedTxns, &expectTxn)
}

require.Equal(t, result.Result{
Local: result.LocalResult{
UpdatedTxns: expectUpdatedTxns,
},
}, res)
require.Equal(t, roachpb.PushTxnResponse{
PusheeTxn: expectTxn,
AmbiguousAbort: !canCreateTxnRecord,
}, resp)
})
}
22 changes: 11 additions & 11 deletions pkg/kv/kvserver/concurrency/concurrency_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -691,22 +691,22 @@ func (c *cluster) makeConfig() concurrency.Config {
// PushTransaction implements the concurrency.IntentResolver interface.
func (c *cluster) PushTransaction(
ctx context.Context, pushee *enginepb.TxnMeta, h roachpb.Header, pushType roachpb.PushTxnType,
) (*roachpb.Transaction, *roachpb.Error) {
) (*roachpb.Transaction, bool, *roachpb.Error) {
pusheeRecord, err := c.getTxnRecord(pushee.ID)
if err != nil {
return nil, roachpb.NewError(err)
return nil, false, roachpb.NewError(err)
}
var pusherRecord *txnRecord
if h.Txn != nil {
pusherID := h.Txn.ID
pusherRecord, err = c.getTxnRecord(pusherID)
if err != nil {
return nil, roachpb.NewError(err)
return nil, false, roachpb.NewError(err)
}

push, err := c.registerPush(ctx, pusherID, pushee.ID)
if err != nil {
return nil, roachpb.NewError(err)
return nil, false, roachpb.NewError(err)
}
defer c.unregisterPush(push)
}
Expand All @@ -725,10 +725,10 @@ func (c *cluster) PushTransaction(
switch {
case pusheeTxn.Status.IsFinalized():
// Already finalized.
return pusheeTxn, nil
return pusheeTxn, false, nil
case pushType == roachpb.PUSH_TIMESTAMP && pushTo.LessEq(pusheeTxn.WriteTimestamp):
// Already pushed.
return pusheeTxn, nil
return pusheeTxn, false, nil
case pushType == roachpb.PUSH_TOUCH:
pusherWins = false
case txnwait.CanPushWithPriority(pusherPriority, pusheeTxn.Priority):
Expand All @@ -748,16 +748,16 @@ func (c *cluster) PushTransaction(
err = errors.Errorf("unexpected push type: %s", pushType)
}
if err != nil {
return nil, roachpb.NewError(err)
return nil, false, roachpb.NewError(err)
}
pusheeTxn, _ = pusheeRecord.asTxn()
return pusheeTxn, nil
return pusheeTxn, false, nil
}
// If PUSH_TOUCH, return error instead of waiting.
if pushType == roachpb.PUSH_TOUCH {
log.Eventf(ctx, "pushee not abandoned")
err := roachpb.NewTransactionPushError(*pusheeTxn)
return nil, roachpb.NewError(err)
return nil, false, roachpb.NewError(err)
}
// Or the pusher aborted?
var pusherRecordSig chan struct{}
Expand All @@ -767,15 +767,15 @@ func (c *cluster) PushTransaction(
if pusherTxn.Status == roachpb.ABORTED {
log.Eventf(ctx, "detected pusher aborted")
err := roachpb.NewTransactionAbortedError(roachpb.ABORT_REASON_PUSHER_ABORTED)
return nil, roachpb.NewError(err)
return nil, false, roachpb.NewError(err)
}
}
// Wait until either record is updated.
select {
case <-pusheeRecordSig:
case <-pusherRecordSig:
case <-ctx.Done():
return nil, roachpb.NewError(ctx.Err())
return nil, false, roachpb.NewError(ctx.Err())
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/concurrency/lock_table_waiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ type IntentResolver interface {
// pushed successfully.
PushTransaction(
context.Context, *enginepb.TxnMeta, roachpb.Header, roachpb.PushTxnType,
) (*roachpb.Transaction, *Error)
) (*roachpb.Transaction, bool, *Error)

// ResolveIntent synchronously resolves the provided intent.
ResolveIntent(context.Context, roachpb.LockUpdate, intentresolver.ResolveOptions) *Error
Expand Down Expand Up @@ -516,7 +516,7 @@ func (w *lockTableWaiterImpl) pushLockTxn(
log.Fatalf(ctx, "unexpected WaitPolicy: %v", req.WaitPolicy)
}

pusheeTxn, err := w.ir.PushTransaction(ctx, ws.txn, h, pushType)
pusheeTxn, _, err := w.ir.PushTransaction(ctx, ws.txn, h, pushType)
if err != nil {
// If pushing with an Error WaitPolicy and the push fails, then the lock
// holder is still active. Transform the error into a WriteIntentError.
Expand Down Expand Up @@ -697,7 +697,7 @@ func (w *lockTableWaiterImpl) pushRequestTxn(
pushType := roachpb.PUSH_ABORT
log.VEventf(ctx, 3, "pushing txn %s to detect request deadlock", ws.txn.ID.Short())

_, err := w.ir.PushTransaction(ctx, ws.txn, h, pushType)
_, _, err := w.ir.PushTransaction(ctx, ws.txn, h, pushType)
if err != nil {
return err
}
Expand Down
30 changes: 15 additions & 15 deletions pkg/kv/kvserver/concurrency/lock_table_waiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,15 @@ import (
)

type mockIntentResolver struct {
pushTxn func(context.Context, *enginepb.TxnMeta, roachpb.Header, roachpb.PushTxnType) (*roachpb.Transaction, *Error)
pushTxn func(context.Context, *enginepb.TxnMeta, roachpb.Header, roachpb.PushTxnType) (*roachpb.Transaction, bool, *Error)
resolveIntent func(context.Context, roachpb.LockUpdate) *Error
resolveIntents func(context.Context, []roachpb.LockUpdate) *Error
}

// mockIntentResolver implements the IntentResolver interface.
func (m *mockIntentResolver) PushTransaction(
ctx context.Context, txn *enginepb.TxnMeta, h roachpb.Header, pushType roachpb.PushTxnType,
) (*roachpb.Transaction, *Error) {
) (*roachpb.Transaction, bool, *Error) {
return m.pushTxn(ctx, txn, h, pushType)
}

Expand Down Expand Up @@ -350,7 +350,7 @@ func testWaitPush(t *testing.T, k waitKind, makeReq func() Request, expPushTS hl
pusheeArg *enginepb.TxnMeta,
h roachpb.Header,
pushType roachpb.PushTxnType,
) (*roachpb.Transaction, *Error) {
) (*roachpb.Transaction, bool, *Error) {
require.Equal(t, &pusheeTxn.TxnMeta, pusheeArg)
require.Equal(t, req.Txn, h.Txn)
require.Equal(t, expPushTS, h.Timestamp)
Expand Down Expand Up @@ -383,7 +383,7 @@ func testWaitPush(t *testing.T, k waitKind, makeReq func() Request, expPushTS hl
g.state = waitingState{kind: doneWaiting}
g.notify()
}
return resp, nil
return resp, false, nil
}

err := w.WaitOn(ctx, req, g)
Expand Down Expand Up @@ -535,15 +535,15 @@ func testErrorWaitPush(
pusheeArg *enginepb.TxnMeta,
h roachpb.Header,
pushType roachpb.PushTxnType,
) (*roachpb.Transaction, *Error) {
) (*roachpb.Transaction, bool, *Error) {
require.Equal(t, &pusheeTxn.TxnMeta, pusheeArg)
require.Equal(t, req.Txn, h.Txn)
require.Equal(t, expPushTS, h.Timestamp)
require.Equal(t, roachpb.PUSH_TOUCH, pushType)

resp := &roachpb.Transaction{TxnMeta: *pusheeArg, Status: roachpb.PENDING}
if pusheeActive {
return nil, roachpb.NewError(&roachpb.TransactionPushError{
return nil, false, roachpb.NewError(&roachpb.TransactionPushError{
PusheeTxn: *resp,
})
}
Expand All @@ -564,7 +564,7 @@ func testErrorWaitPush(
return nil
}
resp.Status = roachpb.ABORTED
return resp, nil
return resp, false, nil
}

err := w.WaitOn(ctx, req, g)
Expand Down Expand Up @@ -707,7 +707,7 @@ func testWaitPushWithTimeout(t *testing.T, k waitKind, makeReq func() Request) {
pusheeArg *enginepb.TxnMeta,
h roachpb.Header,
pushType roachpb.PushTxnType,
) (*roachpb.Transaction, *Error) {
) (*roachpb.Transaction, bool, *Error) {
require.Equal(t, &pusheeTxn.TxnMeta, pusheeArg)
require.Equal(t, req.Txn, h.Txn)

Expand All @@ -720,7 +720,7 @@ func testWaitPushWithTimeout(t *testing.T, k waitKind, makeReq func() Request) {

// Wait for the context to hit its timeout.
<-ctx.Done()
return nil, roachpb.NewError(ctx.Err())
return nil, false, roachpb.NewError(ctx.Err())
}

require.Equal(t, roachpb.PUSH_TOUCH, pushType)
Expand All @@ -730,7 +730,7 @@ func testWaitPushWithTimeout(t *testing.T, k waitKind, makeReq func() Request) {

resp := &roachpb.Transaction{TxnMeta: *pusheeArg, Status: roachpb.PENDING}
if pusheeActive {
return nil, roachpb.NewError(&roachpb.TransactionPushError{
return nil, false, roachpb.NewError(&roachpb.TransactionPushError{
PusheeTxn: *resp,
})
}
Expand All @@ -751,7 +751,7 @@ func testWaitPushWithTimeout(t *testing.T, k waitKind, makeReq func() Request) {
return nil
}
resp.Status = roachpb.ABORTED
return resp, nil
return resp, false, nil
}

err := w.WaitOn(ctx, req, g)
Expand Down Expand Up @@ -807,8 +807,8 @@ func TestLockTableWaiterIntentResolverError(t *testing.T) {
g.notify()
ir.pushTxn = func(
_ context.Context, _ *enginepb.TxnMeta, _ roachpb.Header, _ roachpb.PushTxnType,
) (*roachpb.Transaction, *Error) {
return nil, err1
) (*roachpb.Transaction, bool, *Error) {
return nil, false, err1
}
err := w.WaitOn(ctx, req, g)
require.Equal(t, err1, err)
Expand All @@ -818,8 +818,8 @@ func TestLockTableWaiterIntentResolverError(t *testing.T) {
g.notify()
ir.pushTxn = func(
_ context.Context, _ *enginepb.TxnMeta, _ roachpb.Header, _ roachpb.PushTxnType,
) (*roachpb.Transaction, *Error) {
return &pusheeTxn, nil
) (*roachpb.Transaction, bool, *Error) {
return &pusheeTxn, false, nil
}
ir.resolveIntent = func(_ context.Context, intent roachpb.LockUpdate) *Error {
return err2
Expand Down
Loading

0 comments on commit 9301a16

Please sign in to comment.