From 4f5ec229c232c968b1e3d1f6c1b52941f562ab38 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Tue, 16 Jan 2024 12:30:42 +0000 Subject: [PATCH] batcheval: add `PushTxnResponse.AmbiguousAbort` This indicates to the caller that the `ABORTED` status of the pushed transaction is ambiguous, and the transaction may in fact have been committed and GCed already. This information is also plumbed through the `IntentResolver` txn push APIs. Epic: none Release note: None --- pkg/kv/kvserver/batcheval/BUILD.bazel | 1 + pkg/kv/kvserver/batcheval/cmd_push_txn.go | 4 + .../kvserver/batcheval/cmd_push_txn_test.go | 103 ++++++++++++++++++ .../concurrency/concurrency_manager_test.go | 22 ++-- .../kvserver/concurrency/lock_table_waiter.go | 6 +- .../concurrency/lock_table_waiter_test.go | 30 ++--- .../intentresolver/intent_resolver.go | 37 ++++--- pkg/kv/kvserver/replica_rangefeed.go | 2 +- pkg/roachpb/api.proto | 9 ++ 9 files changed, 171 insertions(+), 43 deletions(-) create mode 100644 pkg/kv/kvserver/batcheval/cmd_push_txn_test.go diff --git a/pkg/kv/kvserver/batcheval/BUILD.bazel b/pkg/kv/kvserver/batcheval/BUILD.bazel index e9daf4480e6a..5d767c46429f 100644 --- a/pkg/kv/kvserver/batcheval/BUILD.bazel +++ b/pkg/kv/kvserver/batcheval/BUILD.bazel @@ -108,6 +108,7 @@ go_test( "cmd_get_test.go", "cmd_is_span_empty_test.go", "cmd_lease_test.go", + "cmd_push_txn_test.go", "cmd_query_intent_test.go", "cmd_query_resolved_timestamp_test.go", "cmd_recover_txn_test.go", diff --git a/pkg/kv/kvserver/batcheval/cmd_push_txn.go b/pkg/kv/kvserver/batcheval/cmd_push_txn.go index cc99f4c72281..388ccfb9010c 100644 --- a/pkg/kv/kvserver/batcheval/cmd_push_txn.go +++ b/pkg/kv/kvserver/batcheval/cmd_push_txn.go @@ -169,6 +169,10 @@ func PushTxn( // then we know we're in either the second or the third case. reply.PusheeTxn = SynthesizeTxnFromMeta(ctx, cArgs.EvalCtx, args.PusheeTxn) if reply.PusheeTxn.Status == roachpb.ABORTED { + // The transaction may actually have committed and already removed its + // intents and txn record, or it may have aborted and done the same. We + // can't know, so mark the abort as ambiguous. + reply.AmbiguousAbort = true // If the transaction is uncommittable, we don't even need to // persist an ABORTED transaction record, we can just consider it // aborted. This is good because it allows us to obey the invariant diff --git a/pkg/kv/kvserver/batcheval/cmd_push_txn_test.go b/pkg/kv/kvserver/batcheval/cmd_push_txn_test.go new file mode 100644 index 000000000000..12862fb8409f --- /dev/null +++ b/pkg/kv/kvserver/batcheval/cmd_push_txn_test.go @@ -0,0 +1,103 @@ +// Copyright 2024 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package batcheval_test + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/storage/enginepb" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/stretchr/testify/require" +) + +// TestPushTxnAmbiguousAbort tests PushTxn behavior when the transaction record +// is missing. In this case, the timestamp cache can tell us whether the +// transaction record may have existed in the past -- if we know it hasn't, then +// the transaction is still pending (e.g. before the record is written), but +// otherwise the transaction record is pessimistically assumed to have aborted. +// However, this state is ambiguous, as the transaction may in fact have +// committed already and GCed its transaction record. Make sure this is +// reflected in the AmbiguousAbort field. +// +// TODO(erikgrinaker): generalize this to test PushTxn more broadly. +func TestPushTxnAmbiguousAbort(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + clock := hlc.NewClock(timeutil.NewManualTime(timeutil.Now()), 0 /* maxOffset */) + now := clock.Now() + engine := storage.NewDefaultInMemForTesting() + defer engine.Close() + + testutils.RunTrueAndFalse(t, "CanCreateTxnRecord", func(t *testing.T, canCreateTxnRecord bool) { + evalCtx := (&batcheval.MockEvalCtx{ + Clock: clock, + CanCreateTxn: func() (bool, hlc.Timestamp, roachpb.TransactionAbortedReason) { + return canCreateTxnRecord, hlc.Timestamp{}, 0 // PushTxn doesn't care about the reason + }, + }).EvalContext() + + key := roachpb.Key("foo") + pusheeTxnMeta := enginepb.TxnMeta{ + ID: uuid.MakeV4(), + Key: key, + MinTimestamp: now, + } + + resp := roachpb.PushTxnResponse{} + res, err := batcheval.PushTxn(ctx, engine, batcheval.CommandArgs{ + EvalCtx: evalCtx, + Header: roachpb.Header{ + Timestamp: clock.Now(), + }, + Args: &roachpb.PushTxnRequest{ + RequestHeader: roachpb.RequestHeader{Key: key}, + PusheeTxn: pusheeTxnMeta, + }, + }, &resp) + require.NoError(t, err) + + // There is no txn record (the engine is empty). If we can't create a txn + // record, it's because the timestamp cache can't confirm that it didn't + // exist in the past. This will return an ambiguous abort. + var expectUpdatedTxns []*roachpb.Transaction + expectTxn := roachpb.Transaction{ + TxnMeta: pusheeTxnMeta, + LastHeartbeat: pusheeTxnMeta.MinTimestamp, + } + if !canCreateTxnRecord { + expectTxn.Status = roachpb.ABORTED + expectUpdatedTxns = append(expectUpdatedTxns, &expectTxn) + } + + require.Equal(t, result.Result{ + Local: result.LocalResult{ + UpdatedTxns: expectUpdatedTxns, + }, + }, res) + require.Equal(t, roachpb.PushTxnResponse{ + PusheeTxn: expectTxn, + AmbiguousAbort: !canCreateTxnRecord, + }, resp) + }) +} diff --git a/pkg/kv/kvserver/concurrency/concurrency_manager_test.go b/pkg/kv/kvserver/concurrency/concurrency_manager_test.go index 0f731795b7d2..2faee795abca 100644 --- a/pkg/kv/kvserver/concurrency/concurrency_manager_test.go +++ b/pkg/kv/kvserver/concurrency/concurrency_manager_test.go @@ -691,22 +691,22 @@ func (c *cluster) makeConfig() concurrency.Config { // PushTransaction implements the concurrency.IntentResolver interface. func (c *cluster) PushTransaction( ctx context.Context, pushee *enginepb.TxnMeta, h roachpb.Header, pushType roachpb.PushTxnType, -) (*roachpb.Transaction, *roachpb.Error) { +) (*roachpb.Transaction, bool, *roachpb.Error) { pusheeRecord, err := c.getTxnRecord(pushee.ID) if err != nil { - return nil, roachpb.NewError(err) + return nil, false, roachpb.NewError(err) } var pusherRecord *txnRecord if h.Txn != nil { pusherID := h.Txn.ID pusherRecord, err = c.getTxnRecord(pusherID) if err != nil { - return nil, roachpb.NewError(err) + return nil, false, roachpb.NewError(err) } push, err := c.registerPush(ctx, pusherID, pushee.ID) if err != nil { - return nil, roachpb.NewError(err) + return nil, false, roachpb.NewError(err) } defer c.unregisterPush(push) } @@ -725,10 +725,10 @@ func (c *cluster) PushTransaction( switch { case pusheeTxn.Status.IsFinalized(): // Already finalized. - return pusheeTxn, nil + return pusheeTxn, false, nil case pushType == roachpb.PUSH_TIMESTAMP && pushTo.LessEq(pusheeTxn.WriteTimestamp): // Already pushed. - return pusheeTxn, nil + return pusheeTxn, false, nil case pushType == roachpb.PUSH_TOUCH: pusherWins = false case txnwait.CanPushWithPriority(pusherPriority, pusheeTxn.Priority): @@ -748,16 +748,16 @@ func (c *cluster) PushTransaction( err = errors.Errorf("unexpected push type: %s", pushType) } if err != nil { - return nil, roachpb.NewError(err) + return nil, false, roachpb.NewError(err) } pusheeTxn, _ = pusheeRecord.asTxn() - return pusheeTxn, nil + return pusheeTxn, false, nil } // If PUSH_TOUCH, return error instead of waiting. if pushType == roachpb.PUSH_TOUCH { log.Eventf(ctx, "pushee not abandoned") err := roachpb.NewTransactionPushError(*pusheeTxn) - return nil, roachpb.NewError(err) + return nil, false, roachpb.NewError(err) } // Or the pusher aborted? var pusherRecordSig chan struct{} @@ -767,7 +767,7 @@ func (c *cluster) PushTransaction( if pusherTxn.Status == roachpb.ABORTED { log.Eventf(ctx, "detected pusher aborted") err := roachpb.NewTransactionAbortedError(roachpb.ABORT_REASON_PUSHER_ABORTED) - return nil, roachpb.NewError(err) + return nil, false, roachpb.NewError(err) } } // Wait until either record is updated. @@ -775,7 +775,7 @@ func (c *cluster) PushTransaction( case <-pusheeRecordSig: case <-pusherRecordSig: case <-ctx.Done(): - return nil, roachpb.NewError(ctx.Err()) + return nil, false, roachpb.NewError(ctx.Err()) } } } diff --git a/pkg/kv/kvserver/concurrency/lock_table_waiter.go b/pkg/kv/kvserver/concurrency/lock_table_waiter.go index 82933f2a6c21..6f027a4e8e66 100644 --- a/pkg/kv/kvserver/concurrency/lock_table_waiter.go +++ b/pkg/kv/kvserver/concurrency/lock_table_waiter.go @@ -122,7 +122,7 @@ type IntentResolver interface { // pushed successfully. PushTransaction( context.Context, *enginepb.TxnMeta, roachpb.Header, roachpb.PushTxnType, - ) (*roachpb.Transaction, *Error) + ) (*roachpb.Transaction, bool, *Error) // ResolveIntent synchronously resolves the provided intent. ResolveIntent(context.Context, roachpb.LockUpdate, intentresolver.ResolveOptions) *Error @@ -516,7 +516,7 @@ func (w *lockTableWaiterImpl) pushLockTxn( log.Fatalf(ctx, "unexpected WaitPolicy: %v", req.WaitPolicy) } - pusheeTxn, err := w.ir.PushTransaction(ctx, ws.txn, h, pushType) + pusheeTxn, _, err := w.ir.PushTransaction(ctx, ws.txn, h, pushType) if err != nil { // If pushing with an Error WaitPolicy and the push fails, then the lock // holder is still active. Transform the error into a WriteIntentError. @@ -697,7 +697,7 @@ func (w *lockTableWaiterImpl) pushRequestTxn( pushType := roachpb.PUSH_ABORT log.VEventf(ctx, 3, "pushing txn %s to detect request deadlock", ws.txn.ID.Short()) - _, err := w.ir.PushTransaction(ctx, ws.txn, h, pushType) + _, _, err := w.ir.PushTransaction(ctx, ws.txn, h, pushType) if err != nil { return err } diff --git a/pkg/kv/kvserver/concurrency/lock_table_waiter_test.go b/pkg/kv/kvserver/concurrency/lock_table_waiter_test.go index 1a6bb48a6693..8d834437dae3 100644 --- a/pkg/kv/kvserver/concurrency/lock_table_waiter_test.go +++ b/pkg/kv/kvserver/concurrency/lock_table_waiter_test.go @@ -37,7 +37,7 @@ import ( ) type mockIntentResolver struct { - pushTxn func(context.Context, *enginepb.TxnMeta, roachpb.Header, roachpb.PushTxnType) (*roachpb.Transaction, *Error) + pushTxn func(context.Context, *enginepb.TxnMeta, roachpb.Header, roachpb.PushTxnType) (*roachpb.Transaction, bool, *Error) resolveIntent func(context.Context, roachpb.LockUpdate) *Error resolveIntents func(context.Context, []roachpb.LockUpdate) *Error } @@ -45,7 +45,7 @@ type mockIntentResolver struct { // mockIntentResolver implements the IntentResolver interface. func (m *mockIntentResolver) PushTransaction( ctx context.Context, txn *enginepb.TxnMeta, h roachpb.Header, pushType roachpb.PushTxnType, -) (*roachpb.Transaction, *Error) { +) (*roachpb.Transaction, bool, *Error) { return m.pushTxn(ctx, txn, h, pushType) } @@ -350,7 +350,7 @@ func testWaitPush(t *testing.T, k waitKind, makeReq func() Request, expPushTS hl pusheeArg *enginepb.TxnMeta, h roachpb.Header, pushType roachpb.PushTxnType, - ) (*roachpb.Transaction, *Error) { + ) (*roachpb.Transaction, bool, *Error) { require.Equal(t, &pusheeTxn.TxnMeta, pusheeArg) require.Equal(t, req.Txn, h.Txn) require.Equal(t, expPushTS, h.Timestamp) @@ -383,7 +383,7 @@ func testWaitPush(t *testing.T, k waitKind, makeReq func() Request, expPushTS hl g.state = waitingState{kind: doneWaiting} g.notify() } - return resp, nil + return resp, false, nil } err := w.WaitOn(ctx, req, g) @@ -535,7 +535,7 @@ func testErrorWaitPush( pusheeArg *enginepb.TxnMeta, h roachpb.Header, pushType roachpb.PushTxnType, - ) (*roachpb.Transaction, *Error) { + ) (*roachpb.Transaction, bool, *Error) { require.Equal(t, &pusheeTxn.TxnMeta, pusheeArg) require.Equal(t, req.Txn, h.Txn) require.Equal(t, expPushTS, h.Timestamp) @@ -543,7 +543,7 @@ func testErrorWaitPush( resp := &roachpb.Transaction{TxnMeta: *pusheeArg, Status: roachpb.PENDING} if pusheeActive { - return nil, roachpb.NewError(&roachpb.TransactionPushError{ + return nil, false, roachpb.NewError(&roachpb.TransactionPushError{ PusheeTxn: *resp, }) } @@ -564,7 +564,7 @@ func testErrorWaitPush( return nil } resp.Status = roachpb.ABORTED - return resp, nil + return resp, false, nil } err := w.WaitOn(ctx, req, g) @@ -707,7 +707,7 @@ func testWaitPushWithTimeout(t *testing.T, k waitKind, makeReq func() Request) { pusheeArg *enginepb.TxnMeta, h roachpb.Header, pushType roachpb.PushTxnType, - ) (*roachpb.Transaction, *Error) { + ) (*roachpb.Transaction, bool, *Error) { require.Equal(t, &pusheeTxn.TxnMeta, pusheeArg) require.Equal(t, req.Txn, h.Txn) @@ -720,7 +720,7 @@ func testWaitPushWithTimeout(t *testing.T, k waitKind, makeReq func() Request) { // Wait for the context to hit its timeout. <-ctx.Done() - return nil, roachpb.NewError(ctx.Err()) + return nil, false, roachpb.NewError(ctx.Err()) } require.Equal(t, roachpb.PUSH_TOUCH, pushType) @@ -730,7 +730,7 @@ func testWaitPushWithTimeout(t *testing.T, k waitKind, makeReq func() Request) { resp := &roachpb.Transaction{TxnMeta: *pusheeArg, Status: roachpb.PENDING} if pusheeActive { - return nil, roachpb.NewError(&roachpb.TransactionPushError{ + return nil, false, roachpb.NewError(&roachpb.TransactionPushError{ PusheeTxn: *resp, }) } @@ -751,7 +751,7 @@ func testWaitPushWithTimeout(t *testing.T, k waitKind, makeReq func() Request) { return nil } resp.Status = roachpb.ABORTED - return resp, nil + return resp, false, nil } err := w.WaitOn(ctx, req, g) @@ -807,8 +807,8 @@ func TestLockTableWaiterIntentResolverError(t *testing.T) { g.notify() ir.pushTxn = func( _ context.Context, _ *enginepb.TxnMeta, _ roachpb.Header, _ roachpb.PushTxnType, - ) (*roachpb.Transaction, *Error) { - return nil, err1 + ) (*roachpb.Transaction, bool, *Error) { + return nil, false, err1 } err := w.WaitOn(ctx, req, g) require.Equal(t, err1, err) @@ -818,8 +818,8 @@ func TestLockTableWaiterIntentResolverError(t *testing.T) { g.notify() ir.pushTxn = func( _ context.Context, _ *enginepb.TxnMeta, _ roachpb.Header, _ roachpb.PushTxnType, - ) (*roachpb.Transaction, *Error) { - return &pusheeTxn, nil + ) (*roachpb.Transaction, bool, *Error) { + return &pusheeTxn, false, nil } ir.resolveIntent = func(_ context.Context, intent roachpb.LockUpdate) *Error { return err2 diff --git a/pkg/kv/kvserver/intentresolver/intent_resolver.go b/pkg/kv/kvserver/intentresolver/intent_resolver.go index f7032e486158..a1e96c6e7f2a 100644 --- a/pkg/kv/kvserver/intentresolver/intent_resolver.go +++ b/pkg/kv/kvserver/intentresolver/intent_resolver.go @@ -291,21 +291,26 @@ func updateIntentTxnStatus( // PushTransaction takes a transaction and pushes its record using the specified // push type and request header. It returns the transaction proto corresponding -// to the pushed transaction. +// to the pushed transaction, and in the case of an ABORTED transaction, a bool +// indicating whether the abort was ambiguous (see +// PushTxnResponse.AmbiguousAbort). +// +// NB: ambiguousAbort may be false with nodes <24.1. func (ir *IntentResolver) PushTransaction( ctx context.Context, pushTxn *enginepb.TxnMeta, h roachpb.Header, pushType roachpb.PushTxnType, -) (*roachpb.Transaction, *roachpb.Error) { +) (_ *roachpb.Transaction, ambiguousAbort bool, _ *roachpb.Error) { pushTxns := make(map[uuid.UUID]*enginepb.TxnMeta, 1) pushTxns[pushTxn.ID] = pushTxn - pushedTxns, pErr := ir.MaybePushTransactions(ctx, pushTxns, h, pushType, false /* skipIfInFlight */) + pushedTxns, ambiguousAbort, pErr := ir.MaybePushTransactions( + ctx, pushTxns, h, pushType, false /* skipIfInFlight */) if pErr != nil { - return nil, pErr + return nil, false, pErr } pushedTxn, ok := pushedTxns[pushTxn.ID] if !ok { log.Fatalf(ctx, "missing PushTxn responses for %s", pushTxn) } - return pushedTxn, nil + return pushedTxn, ambiguousAbort, nil } // MaybePushTransactions tries to push the conflicting transaction(s): @@ -313,8 +318,12 @@ func (ir *IntentResolver) PushTransaction( // it on a write/write conflict, or doing nothing if the transaction is no // longer pending. // -// Returns a set of transaction protos who correspond to the pushed -// transactions and whose intents can now be resolved, and an error. +// Returns a set of transaction protos who correspond to the pushed transactions +// and whose intents can now be resolved, along with a bool indicating whether +// any of the responses were an ambiguous abort (see +// PushTxnResponse.AmbiguousAbort), and an error. +// +// NB: anyAmbiguousAbort may be false with nodes <24.1. // // If skipIfInFlight is true, then no PushTxns will be sent and no intents // will be returned for any transaction for which there is another push in @@ -339,7 +348,7 @@ func (ir *IntentResolver) MaybePushTransactions( h roachpb.Header, pushType roachpb.PushTxnType, skipIfInFlight bool, -) (map[uuid.UUID]*roachpb.Transaction, *roachpb.Error) { +) (_ map[uuid.UUID]*roachpb.Transaction, anyAmbiguousAbort bool, _ *roachpb.Error) { // Decide which transactions to push and which to ignore because // of other in-flight requests. For those transactions that we // will be pushing, increment their ref count in the in-flight @@ -370,7 +379,7 @@ func (ir *IntentResolver) MaybePushTransactions( } ir.mu.Unlock() if len(pushTxns) == 0 { - return nil, nil + return nil, false, nil } pusherTxn := getPusherTxn(h) @@ -395,7 +404,7 @@ func (ir *IntentResolver) MaybePushTransactions( err := ir.db.Run(ctx, b) cleanupInFlightPushes() if err != nil { - return nil, b.MustPErr() + return nil, false, b.MustPErr() } // TODO(nvanbenschoten): if we succeed because the transaction has already @@ -405,14 +414,16 @@ func (ir *IntentResolver) MaybePushTransactions( br := b.RawResponse() pushedTxns := make(map[uuid.UUID]*roachpb.Transaction, len(br.Responses)) for _, resp := range br.Responses { - txn := &resp.GetInner().(*roachpb.PushTxnResponse).PusheeTxn + resp := resp.GetInner().(*roachpb.PushTxnResponse) + txn := &resp.PusheeTxn + anyAmbiguousAbort = anyAmbiguousAbort || resp.AmbiguousAbort if _, ok := pushedTxns[txn.ID]; ok { log.Fatalf(ctx, "have two PushTxn responses for %s", txn.ID) } pushedTxns[txn.ID] = txn log.Eventf(ctx, "%s is now %s", txn.ID, txn.Status) } - return pushedTxns, nil + return pushedTxns, anyAmbiguousAbort, nil } // runAsyncTask semi-synchronously runs a generic task function. If @@ -516,7 +527,7 @@ func (ir *IntentResolver) CleanupIntents( } } - pushedTxns, pErr := ir.MaybePushTransactions(ctx, pushTxns, h, pushType, skipIfInFlight) + pushedTxns, _, pErr := ir.MaybePushTransactions(ctx, pushTxns, h, pushType, skipIfInFlight) if pErr != nil { return 0, errors.Wrapf(pErr.GoError(), "failed to push during intent resolution") } diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index 23a1f8fe2aeb..fdfb1660151c 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -118,7 +118,7 @@ func (tp *rangefeedTxnPusher) PushTxns( }, } - pushedTxnMap, pErr := tp.ir.MaybePushTransactions( + pushedTxnMap, _, pErr := tp.ir.MaybePushTransactions( ctx, pushTxnMap, h, roachpb.PUSH_TIMESTAMP, false, /* skipIfInFlight */ ) if pErr != nil { diff --git a/pkg/roachpb/api.proto b/pkg/roachpb/api.proto index b8262a10c681..269e87a9d0a9 100644 --- a/pkg/roachpb/api.proto +++ b/pkg/roachpb/api.proto @@ -1095,6 +1095,15 @@ message PushTxnResponse { // TODO(tschottdorf): Maybe this can be a TxnMeta instead; probably requires // factoring out the new Priority. Transaction pushee_txn = 2 [(gogoproto.nullable) = false]; + // ambiguous_abort is true if pushee_txn has status ABORTED, but the + // transaction may in fact have been committed and GCed already. Concretely, + // this means that the transaction record does not exist, but it may have + // existed in the past (according to the timestamp cache), and we can't know + // whether it committed or aborted so we pessimistically assume it aborted. + // + // NB: this field was added in a patch release, and is not guaranteed to be + // populated prior to 24.1. + bool ambiguous_abort = 3; } // A RecoverTxnRequest is arguments to the RecoverTxn() method. It is sent