Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-22.2: batcheval: add PushTxnResponse.AmbiguousAbort #118472

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/kv/kvserver/batcheval/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ go_test(
"cmd_get_test.go",
"cmd_is_span_empty_test.go",
"cmd_lease_test.go",
"cmd_push_txn_test.go",
"cmd_query_intent_test.go",
"cmd_query_resolved_timestamp_test.go",
"cmd_recover_txn_test.go",
Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_push_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,10 @@ func PushTxn(
// then we know we're in either the second or the third case.
reply.PusheeTxn = SynthesizeTxnFromMeta(ctx, cArgs.EvalCtx, args.PusheeTxn)
if reply.PusheeTxn.Status == roachpb.ABORTED {
// The transaction may actually have committed and already removed its
// intents and txn record, or it may have aborted and done the same. We
// can't know, so mark the abort as ambiguous.
reply.AmbiguousAbort = true
// If the transaction is uncommittable, we don't even need to
// persist an ABORTED transaction record, we can just consider it
// aborted. This is good because it allows us to obey the invariant
Expand Down
103 changes: 103 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_push_txn_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package batcheval_test

import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/stretchr/testify/require"
)

// TestPushTxnAmbiguousAbort tests PushTxn behavior when the transaction record
// is missing. In this case, the timestamp cache can tell us whether the
// transaction record may have existed in the past -- if we know it hasn't, then
// the transaction is still pending (e.g. before the record is written), but
// otherwise the transaction record is pessimistically assumed to have aborted.
// However, this state is ambiguous, as the transaction may in fact have
// committed already and GCed its transaction record. Make sure this is
// reflected in the AmbiguousAbort field.
//
// TODO(erikgrinaker): generalize this to test PushTxn more broadly.
func TestPushTxnAmbiguousAbort(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()

clock := hlc.NewClock(timeutil.NewManualTime(timeutil.Now()), 0 /* maxOffset */)
now := clock.Now()
engine := storage.NewDefaultInMemForTesting()
defer engine.Close()

testutils.RunTrueAndFalse(t, "CanCreateTxnRecord", func(t *testing.T, canCreateTxnRecord bool) {
evalCtx := (&batcheval.MockEvalCtx{
Clock: clock,
CanCreateTxn: func() (bool, hlc.Timestamp, roachpb.TransactionAbortedReason) {
return canCreateTxnRecord, hlc.Timestamp{}, 0 // PushTxn doesn't care about the reason
},
}).EvalContext()

key := roachpb.Key("foo")
pusheeTxnMeta := enginepb.TxnMeta{
ID: uuid.MakeV4(),
Key: key,
MinTimestamp: now,
}

resp := roachpb.PushTxnResponse{}
res, err := batcheval.PushTxn(ctx, engine, batcheval.CommandArgs{
EvalCtx: evalCtx,
Header: roachpb.Header{
Timestamp: clock.Now(),
},
Args: &roachpb.PushTxnRequest{
RequestHeader: roachpb.RequestHeader{Key: key},
PusheeTxn: pusheeTxnMeta,
},
}, &resp)
require.NoError(t, err)

// There is no txn record (the engine is empty). If we can't create a txn
// record, it's because the timestamp cache can't confirm that it didn't
// exist in the past. This will return an ambiguous abort.
var expectUpdatedTxns []*roachpb.Transaction
expectTxn := roachpb.Transaction{
TxnMeta: pusheeTxnMeta,
LastHeartbeat: pusheeTxnMeta.MinTimestamp,
}
if !canCreateTxnRecord {
expectTxn.Status = roachpb.ABORTED
expectUpdatedTxns = append(expectUpdatedTxns, &expectTxn)
}

require.Equal(t, result.Result{
Local: result.LocalResult{
UpdatedTxns: expectUpdatedTxns,
},
}, res)
require.Equal(t, roachpb.PushTxnResponse{
PusheeTxn: expectTxn,
AmbiguousAbort: !canCreateTxnRecord,
}, resp)
})
}
22 changes: 11 additions & 11 deletions pkg/kv/kvserver/concurrency/concurrency_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -691,22 +691,22 @@ func (c *cluster) makeConfig() concurrency.Config {
// PushTransaction implements the concurrency.IntentResolver interface.
func (c *cluster) PushTransaction(
ctx context.Context, pushee *enginepb.TxnMeta, h roachpb.Header, pushType roachpb.PushTxnType,
) (*roachpb.Transaction, *roachpb.Error) {
) (*roachpb.Transaction, bool, *roachpb.Error) {
pusheeRecord, err := c.getTxnRecord(pushee.ID)
if err != nil {
return nil, roachpb.NewError(err)
return nil, false, roachpb.NewError(err)
}
var pusherRecord *txnRecord
if h.Txn != nil {
pusherID := h.Txn.ID
pusherRecord, err = c.getTxnRecord(pusherID)
if err != nil {
return nil, roachpb.NewError(err)
return nil, false, roachpb.NewError(err)
}

push, err := c.registerPush(ctx, pusherID, pushee.ID)
if err != nil {
return nil, roachpb.NewError(err)
return nil, false, roachpb.NewError(err)
}
defer c.unregisterPush(push)
}
Expand All @@ -725,10 +725,10 @@ func (c *cluster) PushTransaction(
switch {
case pusheeTxn.Status.IsFinalized():
// Already finalized.
return pusheeTxn, nil
return pusheeTxn, false, nil
case pushType == roachpb.PUSH_TIMESTAMP && pushTo.LessEq(pusheeTxn.WriteTimestamp):
// Already pushed.
return pusheeTxn, nil
return pusheeTxn, false, nil
case pushType == roachpb.PUSH_TOUCH:
pusherWins = false
case txnwait.CanPushWithPriority(pusherPriority, pusheeTxn.Priority):
Expand All @@ -748,16 +748,16 @@ func (c *cluster) PushTransaction(
err = errors.Errorf("unexpected push type: %s", pushType)
}
if err != nil {
return nil, roachpb.NewError(err)
return nil, false, roachpb.NewError(err)
}
pusheeTxn, _ = pusheeRecord.asTxn()
return pusheeTxn, nil
return pusheeTxn, false, nil
}
// If PUSH_TOUCH, return error instead of waiting.
if pushType == roachpb.PUSH_TOUCH {
log.Eventf(ctx, "pushee not abandoned")
err := roachpb.NewTransactionPushError(*pusheeTxn)
return nil, roachpb.NewError(err)
return nil, false, roachpb.NewError(err)
}
// Or the pusher aborted?
var pusherRecordSig chan struct{}
Expand All @@ -767,15 +767,15 @@ func (c *cluster) PushTransaction(
if pusherTxn.Status == roachpb.ABORTED {
log.Eventf(ctx, "detected pusher aborted")
err := roachpb.NewTransactionAbortedError(roachpb.ABORT_REASON_PUSHER_ABORTED)
return nil, roachpb.NewError(err)
return nil, false, roachpb.NewError(err)
}
}
// Wait until either record is updated.
select {
case <-pusheeRecordSig:
case <-pusherRecordSig:
case <-ctx.Done():
return nil, roachpb.NewError(ctx.Err())
return nil, false, roachpb.NewError(ctx.Err())
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/concurrency/lock_table_waiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ type IntentResolver interface {
// pushed successfully.
PushTransaction(
context.Context, *enginepb.TxnMeta, roachpb.Header, roachpb.PushTxnType,
) (*roachpb.Transaction, *Error)
) (*roachpb.Transaction, bool, *Error)

// ResolveIntent synchronously resolves the provided intent.
ResolveIntent(context.Context, roachpb.LockUpdate, intentresolver.ResolveOptions) *Error
Expand Down Expand Up @@ -516,7 +516,7 @@ func (w *lockTableWaiterImpl) pushLockTxn(
log.Fatalf(ctx, "unexpected WaitPolicy: %v", req.WaitPolicy)
}

pusheeTxn, err := w.ir.PushTransaction(ctx, ws.txn, h, pushType)
pusheeTxn, _, err := w.ir.PushTransaction(ctx, ws.txn, h, pushType)
if err != nil {
// If pushing with an Error WaitPolicy and the push fails, then the lock
// holder is still active. Transform the error into a WriteIntentError.
Expand Down Expand Up @@ -697,7 +697,7 @@ func (w *lockTableWaiterImpl) pushRequestTxn(
pushType := roachpb.PUSH_ABORT
log.VEventf(ctx, 3, "pushing txn %s to detect request deadlock", ws.txn.ID.Short())

_, err := w.ir.PushTransaction(ctx, ws.txn, h, pushType)
_, _, err := w.ir.PushTransaction(ctx, ws.txn, h, pushType)
if err != nil {
return err
}
Expand Down
30 changes: 15 additions & 15 deletions pkg/kv/kvserver/concurrency/lock_table_waiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,15 @@ import (
)

type mockIntentResolver struct {
pushTxn func(context.Context, *enginepb.TxnMeta, roachpb.Header, roachpb.PushTxnType) (*roachpb.Transaction, *Error)
pushTxn func(context.Context, *enginepb.TxnMeta, roachpb.Header, roachpb.PushTxnType) (*roachpb.Transaction, bool, *Error)
resolveIntent func(context.Context, roachpb.LockUpdate) *Error
resolveIntents func(context.Context, []roachpb.LockUpdate) *Error
}

// mockIntentResolver implements the IntentResolver interface.
func (m *mockIntentResolver) PushTransaction(
ctx context.Context, txn *enginepb.TxnMeta, h roachpb.Header, pushType roachpb.PushTxnType,
) (*roachpb.Transaction, *Error) {
) (*roachpb.Transaction, bool, *Error) {
return m.pushTxn(ctx, txn, h, pushType)
}

Expand Down Expand Up @@ -350,7 +350,7 @@ func testWaitPush(t *testing.T, k waitKind, makeReq func() Request, expPushTS hl
pusheeArg *enginepb.TxnMeta,
h roachpb.Header,
pushType roachpb.PushTxnType,
) (*roachpb.Transaction, *Error) {
) (*roachpb.Transaction, bool, *Error) {
require.Equal(t, &pusheeTxn.TxnMeta, pusheeArg)
require.Equal(t, req.Txn, h.Txn)
require.Equal(t, expPushTS, h.Timestamp)
Expand Down Expand Up @@ -383,7 +383,7 @@ func testWaitPush(t *testing.T, k waitKind, makeReq func() Request, expPushTS hl
g.state = waitingState{kind: doneWaiting}
g.notify()
}
return resp, nil
return resp, false, nil
}

err := w.WaitOn(ctx, req, g)
Expand Down Expand Up @@ -535,15 +535,15 @@ func testErrorWaitPush(
pusheeArg *enginepb.TxnMeta,
h roachpb.Header,
pushType roachpb.PushTxnType,
) (*roachpb.Transaction, *Error) {
) (*roachpb.Transaction, bool, *Error) {
require.Equal(t, &pusheeTxn.TxnMeta, pusheeArg)
require.Equal(t, req.Txn, h.Txn)
require.Equal(t, expPushTS, h.Timestamp)
require.Equal(t, roachpb.PUSH_TOUCH, pushType)

resp := &roachpb.Transaction{TxnMeta: *pusheeArg, Status: roachpb.PENDING}
if pusheeActive {
return nil, roachpb.NewError(&roachpb.TransactionPushError{
return nil, false, roachpb.NewError(&roachpb.TransactionPushError{
PusheeTxn: *resp,
})
}
Expand All @@ -564,7 +564,7 @@ func testErrorWaitPush(
return nil
}
resp.Status = roachpb.ABORTED
return resp, nil
return resp, false, nil
}

err := w.WaitOn(ctx, req, g)
Expand Down Expand Up @@ -707,7 +707,7 @@ func testWaitPushWithTimeout(t *testing.T, k waitKind, makeReq func() Request) {
pusheeArg *enginepb.TxnMeta,
h roachpb.Header,
pushType roachpb.PushTxnType,
) (*roachpb.Transaction, *Error) {
) (*roachpb.Transaction, bool, *Error) {
require.Equal(t, &pusheeTxn.TxnMeta, pusheeArg)
require.Equal(t, req.Txn, h.Txn)

Expand All @@ -720,7 +720,7 @@ func testWaitPushWithTimeout(t *testing.T, k waitKind, makeReq func() Request) {

// Wait for the context to hit its timeout.
<-ctx.Done()
return nil, roachpb.NewError(ctx.Err())
return nil, false, roachpb.NewError(ctx.Err())
}

require.Equal(t, roachpb.PUSH_TOUCH, pushType)
Expand All @@ -730,7 +730,7 @@ func testWaitPushWithTimeout(t *testing.T, k waitKind, makeReq func() Request) {

resp := &roachpb.Transaction{TxnMeta: *pusheeArg, Status: roachpb.PENDING}
if pusheeActive {
return nil, roachpb.NewError(&roachpb.TransactionPushError{
return nil, false, roachpb.NewError(&roachpb.TransactionPushError{
PusheeTxn: *resp,
})
}
Expand All @@ -751,7 +751,7 @@ func testWaitPushWithTimeout(t *testing.T, k waitKind, makeReq func() Request) {
return nil
}
resp.Status = roachpb.ABORTED
return resp, nil
return resp, false, nil
}

err := w.WaitOn(ctx, req, g)
Expand Down Expand Up @@ -807,8 +807,8 @@ func TestLockTableWaiterIntentResolverError(t *testing.T) {
g.notify()
ir.pushTxn = func(
_ context.Context, _ *enginepb.TxnMeta, _ roachpb.Header, _ roachpb.PushTxnType,
) (*roachpb.Transaction, *Error) {
return nil, err1
) (*roachpb.Transaction, bool, *Error) {
return nil, false, err1
}
err := w.WaitOn(ctx, req, g)
require.Equal(t, err1, err)
Expand All @@ -818,8 +818,8 @@ func TestLockTableWaiterIntentResolverError(t *testing.T) {
g.notify()
ir.pushTxn = func(
_ context.Context, _ *enginepb.TxnMeta, _ roachpb.Header, _ roachpb.PushTxnType,
) (*roachpb.Transaction, *Error) {
return &pusheeTxn, nil
) (*roachpb.Transaction, bool, *Error) {
return &pusheeTxn, false, nil
}
ir.resolveIntent = func(_ context.Context, intent roachpb.LockUpdate) *Error {
return err2
Expand Down
Loading