diff --git a/pkg/kv/kvserver/intentresolver/intent_resolver.go b/pkg/kv/kvserver/intentresolver/intent_resolver.go index a3c19c2a7af8..98d64f3cbae6 100644 --- a/pkg/kv/kvserver/intentresolver/intent_resolver.go +++ b/pkg/kv/kvserver/intentresolver/intent_resolver.go @@ -538,8 +538,16 @@ func (ir *IntentResolver) CleanupIntents( return resolved, nil } -// CleanupTxnIntentsAsync asynchronously cleans up intents owned by -// a transaction on completion. +// CleanupTxnIntentsAsync asynchronously cleans up intents owned by a +// transaction on completion. When all intents have been successfully resolved, +// the txn record is GC'ed. +// +// WARNING: Since this GCs the txn record, it should only be called in response +// to requests coming from the coordinator or the GC Queue. We don't want other +// actors to GC a txn record, since that can cause ambiguities for the +// coordinator: if it had STAGED the txn, it won't be able to tell the +// difference between a txn that had been implicitly committed, recovered, and +// GC'ed, and one that someone else aborted and GC'ed. 
func (ir *IntentResolver) CleanupTxnIntentsAsync( ctx context.Context, rangeID roachpb.RangeID, diff --git a/pkg/kv/kvserver/rangefeed/processor.go b/pkg/kv/kvserver/rangefeed/processor.go index 3079743a5267..58c5632d089a 100644 --- a/pkg/kv/kvserver/rangefeed/processor.go +++ b/pkg/kv/kvserver/rangefeed/processor.go @@ -532,7 +532,7 @@ func (p *Processor) consumeEvent(ctx context.Context, e event) { } close(e.syncC) default: - panic("missing event variant") + panic(fmt.Sprintf("missing event variant: %+v", e)) } } diff --git a/pkg/kv/kvserver/rangefeed/processor_test.go b/pkg/kv/kvserver/rangefeed/processor_test.go index e4bf0062547b..18b78c780f76 100644 --- a/pkg/kv/kvserver/rangefeed/processor_test.go +++ b/pkg/kv/kvserver/rangefeed/processor_test.go @@ -30,6 +30,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -675,25 +677,34 @@ func TestProcessorTxnPushAttempt(t *testing.T) { testNum++ switch testNum { case 1: - require.Equal(t, 3, len(txns)) - require.Equal(t, txn1Meta, txns[0]) - require.Equal(t, txn2Meta, txns[1]) - require.Equal(t, txn3Meta, txns[2]) + assert.Equal(t, 3, len(txns)) + assert.Equal(t, txn1Meta, txns[0]) + assert.Equal(t, txn2Meta, txns[1]) + assert.Equal(t, txn3Meta, txns[2]) + if t.Failed() { + return nil, errors.New("test failed") + } // Push does not succeed. Protos not at larger ts. 
return []*roachpb.Transaction{txn1Proto, txn2Proto, txn3Proto}, nil case 2: - require.Equal(t, 3, len(txns)) - require.Equal(t, txn1MetaT2Pre, txns[0]) - require.Equal(t, txn2Meta, txns[1]) - require.Equal(t, txn3Meta, txns[2]) + assert.Equal(t, 3, len(txns)) + assert.Equal(t, txn1MetaT2Pre, txns[0]) + assert.Equal(t, txn2Meta, txns[1]) + assert.Equal(t, txn3Meta, txns[2]) + if t.Failed() { + return nil, errors.New("test failed") + } // Push succeeds. Return new protos. return []*roachpb.Transaction{txn1ProtoT2, txn2ProtoT2, txn3ProtoT2}, nil case 3: - require.Equal(t, 2, len(txns)) - require.Equal(t, txn2MetaT2Post, txns[0]) - require.Equal(t, txn3MetaT2Post, txns[1]) + assert.Equal(t, 2, len(txns)) + assert.Equal(t, txn2MetaT2Post, txns[0]) + assert.Equal(t, txn3MetaT2Post, txns[1]) + if t.Failed() { + return nil, errors.New("test failed") + } // Push succeeds. Return new protos. return []*roachpb.Transaction{txn2ProtoT3, txn3ProtoT3}, nil @@ -701,17 +712,12 @@ func TestProcessorTxnPushAttempt(t *testing.T) { return nil, nil } }) - tp.mockCleanupTxnIntentsAsync(func(txns []*roachpb.Transaction) error { - switch testNum { - case 1: - require.Equal(t, 0, len(txns)) - case 2: - require.Equal(t, 1, len(txns)) - require.Equal(t, txn1ProtoT2, txns[0]) - case 3: - require.Equal(t, 1, len(txns)) - require.Equal(t, txn2ProtoT3, txns[0]) - default: + tp.mockResolveIntentsFn(func(ctx context.Context, intents []roachpb.LockUpdate) error { + // There's nothing to assert here. We expect the intents to correspond to + // transactions that had their LockSpans populated when we pushed them. This + // test doesn't simulate that. 
+ + if testNum > 3 { return nil } diff --git a/pkg/kv/kvserver/rangefeed/task.go b/pkg/kv/kvserver/rangefeed/task.go index 7630089dbada..7cca67eb6a9f 100644 --- a/pkg/kv/kvserver/rangefeed/task.go +++ b/pkg/kv/kvserver/rangefeed/task.go @@ -117,9 +117,8 @@ type TxnPusher interface { // PushTxns attempts to push the specified transactions to a new // timestamp. It returns the resulting transaction protos. PushTxns(context.Context, []enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error) - // CleanupTxnIntentsAsync asynchronously cleans up intents owned - // by the specified transactions. - CleanupTxnIntentsAsync(context.Context, []*roachpb.Transaction) error + // ResolveIntents resolves the specified intents. + ResolveIntents(ctx context.Context, intents []roachpb.LockUpdate) error } // txnPushAttempt pushes all old transactions that have unresolved intents on @@ -173,10 +172,16 @@ func (a *txnPushAttempt) pushOldTxns(ctx context.Context) error { if err != nil { return err } + if len(pushedTxns) != len(a.txns) { + // We expect results for all txns. In particular, if no txns have been pushed, we'd + // crash later because we'd be creating an invalid empty event. + return errors.AssertionFailedf("tried to push %d transactions, got response for %d", + len(a.txns), len(pushedTxns)) + } // Inform the Processor of the results of the push for each transaction. ops := make([]enginepb.MVCCLogicalOp, len(pushedTxns)) - var toCleanup []*roachpb.Transaction + var intentsToCleanup []roachpb.LockUpdate for i, txn := range pushedTxns { switch txn.Status { case roachpb.PENDING, roachpb.STAGING: @@ -198,13 +203,12 @@ func (a *txnPushAttempt) pushOldTxns(ctx context.Context) error { Timestamp: txn.WriteTimestamp, }) - // Asynchronously clean up the transaction's intents, which should - // eventually cause all unresolved intents for this transaction on the - // rangefeed's range to be resolved. 
We'll have to wait until the - // intents are resolved before the resolved timestamp can advance past - // the transaction's commit timestamp, so the best we can do is help - // speed up the resolution. - toCleanup = append(toCleanup, txn) + // Clean up the transaction's intents, which should eventually cause all + // unresolved intents for this transaction on the rangefeed's range to be + // resolved. We'll have to wait until the intents are resolved before the + // resolved timestamp can advance past the transaction's commit timestamp, + // so the best we can do is help speed up the resolution. + intentsToCleanup = append(intentsToCleanup, txn.LocksAsLockUpdates()...) case roachpb.ABORTED: // The transaction is aborted, so it doesn't need to be tracked // anymore nor does it need to prevent the resolved timestamp from @@ -219,19 +223,20 @@ func (a *txnPushAttempt) pushOldTxns(ctx context.Context) error { TxnID: txn.ID, }) - // While we're here, we might as well also clean up the transaction's - // intents so that no-one else needs to deal with them. However, it's - // likely that if our push caused the abort then the transaction's - // intents will be unknown and we won't be doing much good here. - toCleanup = append(toCleanup, txn) + // If the txn happens to have its LockSpans populated, then let's clean up + // the intents as an optimization helping others. If we aborted the txn, + // then it won't have this field populated. If, however, we ran into a + // transaction that its coordinator tried to roll back but didn't follow up + // with garbage collection, then LockSpans will be populated. + intentsToCleanup = append(intentsToCleanup, txn.LocksAsLockUpdates()...) } } // Inform the processor of all logical ops. a.p.sendEvent(event{ops: ops}, 0 /* timeout */) - // Clean up txns, if necessary, - return a.p.TxnPusher.CleanupTxnIntentsAsync(ctx, toCleanup) + // Resolve intents, if necessary. 
+ return a.p.TxnPusher.ResolveIntents(ctx, intentsToCleanup) } func (a *txnPushAttempt) Cancel() { diff --git a/pkg/kv/kvserver/rangefeed/task_test.go b/pkg/kv/kvserver/rangefeed/task_test.go index 45fcd551edac..78ffd57d2823 100644 --- a/pkg/kv/kvserver/rangefeed/task_test.go +++ b/pkg/kv/kvserver/rangefeed/task_test.go @@ -249,8 +249,8 @@ func TestInitResolvedTSScan(t *testing.T) { } type testTxnPusher struct { - pushTxnsFn func([]enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error) - cleanupTxnIntentsAsyncFn func([]*roachpb.Transaction) error + pushTxnsFn func([]enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error) + resolveIntentsFn func(ctx context.Context, intents []roachpb.LockUpdate) error } func (tp *testTxnPusher) PushTxns( @@ -259,10 +259,8 @@ func (tp *testTxnPusher) PushTxns( return tp.pushTxnsFn(txns, ts) } -func (tp *testTxnPusher) CleanupTxnIntentsAsync( - ctx context.Context, txns []*roachpb.Transaction, -) error { - return tp.cleanupTxnIntentsAsyncFn(txns) +func (tp *testTxnPusher) ResolveIntents(ctx context.Context, intents []roachpb.LockUpdate) error { + return tp.resolveIntentsFn(ctx, intents) } func (tp *testTxnPusher) mockPushTxns( @@ -271,41 +269,79 @@ func (tp *testTxnPusher) mockPushTxns( tp.pushTxnsFn = fn } -func (tp *testTxnPusher) mockCleanupTxnIntentsAsync(fn func([]*roachpb.Transaction) error) { - tp.cleanupTxnIntentsAsyncFn = fn +func (tp *testTxnPusher) mockResolveIntentsFn( + fn func(context.Context, []roachpb.LockUpdate) error, +) { + tp.resolveIntentsFn = fn +} + +func (tp *testTxnPusher) intentsToTxns(intents []roachpb.LockUpdate) []enginepb.TxnMeta { + txns := make([]enginepb.TxnMeta, 0) + txnIDs := make(map[uuid.UUID]struct{}) + for _, intent := range intents { + txn := intent.Txn + if _, ok := txnIDs[txn.ID]; ok { + continue + } + txns = append(txns, txn) + txnIDs[txn.ID] = struct{}{} + } + return txns } func TestTxnPushAttempt(t *testing.T) { defer leaktest.AfterTest(t)() // Create a set of 
transactions. - txn1, txn2, txn3 := uuid.MakeV4(), uuid.MakeV4(), uuid.MakeV4() - ts1, ts2, ts3 := hlc.Timestamp{WallTime: 1}, hlc.Timestamp{WallTime: 2}, hlc.Timestamp{WallTime: 3} + txn1, txn2, txn3, txn4 := uuid.MakeV4(), uuid.MakeV4(), uuid.MakeV4(), uuid.MakeV4() + ts1, ts2, ts3, ts4 := hlc.Timestamp{WallTime: 1}, hlc.Timestamp{WallTime: 2}, hlc.Timestamp{WallTime: 3}, hlc.Timestamp{WallTime: 4} + txn2LockSpans := []roachpb.Span{ + {Key: roachpb.Key("a"), EndKey: roachpb.Key("b")}, + {Key: roachpb.Key("c"), EndKey: roachpb.Key("d")}, + } + txn4LockSpans := []roachpb.Span{ + {Key: roachpb.Key("e"), EndKey: roachpb.Key("f")}, + {Key: roachpb.Key("g"), EndKey: roachpb.Key("h")}, + } txn1Meta := enginepb.TxnMeta{ID: txn1, Key: keyA, WriteTimestamp: ts1, MinTimestamp: ts1} txn2Meta := enginepb.TxnMeta{ID: txn2, Key: keyB, WriteTimestamp: ts2, MinTimestamp: ts2} txn3Meta := enginepb.TxnMeta{ID: txn3, Key: keyC, WriteTimestamp: ts3, MinTimestamp: ts3} + txn4Meta := enginepb.TxnMeta{ID: txn4, Key: keyC, WriteTimestamp: ts3, MinTimestamp: ts4} txn1Proto := &roachpb.Transaction{TxnMeta: txn1Meta, Status: roachpb.PENDING} - txn2Proto := &roachpb.Transaction{TxnMeta: txn2Meta, Status: roachpb.COMMITTED} + txn2Proto := &roachpb.Transaction{TxnMeta: txn2Meta, Status: roachpb.COMMITTED, LockSpans: txn2LockSpans} txn3Proto := &roachpb.Transaction{TxnMeta: txn3Meta, Status: roachpb.ABORTED} + // txn4 has its LockSpans populated, simulating a transaction that has been + // rolled back by its coordinator (which populated the LockSpans), but then + // not GC'ed for whatever reason. + txn4Proto := &roachpb.Transaction{TxnMeta: txn4Meta, Status: roachpb.ABORTED, LockSpans: txn4LockSpans} // Run a txnPushAttempt. 
var tp testTxnPusher tp.mockPushTxns(func(txns []enginepb.TxnMeta, ts hlc.Timestamp) ([]*roachpb.Transaction, error) { - require.Equal(t, 3, len(txns)) + require.Equal(t, 4, len(txns)) require.Equal(t, txn1Meta, txns[0]) require.Equal(t, txn2Meta, txns[1]) require.Equal(t, txn3Meta, txns[2]) + require.Equal(t, txn4Meta, txns[3]) require.Equal(t, hlc.Timestamp{WallTime: 15}, ts) - // Return all three protos. The PENDING txn is pushed. + // Return all four protos. The PENDING txn is pushed. txn1ProtoPushed := txn1Proto.Clone() txn1ProtoPushed.WriteTimestamp = ts - return []*roachpb.Transaction{txn1ProtoPushed, txn2Proto, txn3Proto}, nil + return []*roachpb.Transaction{txn1ProtoPushed, txn2Proto, txn3Proto, txn4Proto}, nil }) - tp.mockCleanupTxnIntentsAsync(func(txns []*roachpb.Transaction) error { + tp.mockResolveIntentsFn(func(ctx context.Context, intents []roachpb.LockUpdate) error { + require.Len(t, intents, 4) + require.Equal(t, txn2LockSpans[0], intents[0].Span) + require.Equal(t, txn2LockSpans[1], intents[1].Span) + require.Equal(t, txn4LockSpans[0], intents[2].Span) + require.Equal(t, txn4LockSpans[1], intents[3].Span) + txns := tp.intentsToTxns(intents) require.Equal(t, 2, len(txns)) - require.Equal(t, txn2Proto, txns[0]) - require.Equal(t, txn3Proto, txns[1]) + require.Equal(t, txn2Meta, txns[0]) + // Note that we don't expect intents for txn3 to be resolved since that txn + // doesn't have its LockSpans populated. 
+ require.Equal(t, txn4Meta, txns[1]) return nil }) @@ -313,7 +349,7 @@ func TestTxnPushAttempt(t *testing.T) { p := Processor{eventC: make(chan event, 100)} p.TxnPusher = &tp - txns := []enginepb.TxnMeta{txn1Meta, txn2Meta, txn3Meta} + txns := []enginepb.TxnMeta{txn1Meta, txn2Meta, txn3Meta, txn4Meta} doneC := make(chan struct{}) pushAttempt := newTxnPushAttempt(&p, txns, hlc.Timestamp{WallTime: 15}, doneC) pushAttempt.Run(context.Background()) @@ -325,6 +361,7 @@ func TestTxnPushAttempt(t *testing.T) { updateIntentOp(txn1, hlc.Timestamp{WallTime: 15}), updateIntentOp(txn2, hlc.Timestamp{WallTime: 2}), abortTxnOp(txn3), + abortTxnOp(txn4), }}, } require.Equal(t, len(expEvents), len(p.eventC)) diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index e6298f70720a..1d85069a62b9 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -17,7 +17,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/keys" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/intentresolver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" @@ -101,16 +100,14 @@ func (tp *rangefeedTxnPusher) PushTxns( return pushedTxns, nil } -// CleanupTxnIntentsAsync is part of the rangefeed.TxnPusher interface. -func (tp *rangefeedTxnPusher) CleanupTxnIntentsAsync( - ctx context.Context, txns []*roachpb.Transaction, +// ResolveIntents is part of the rangefeed.TxnPusher interface. 
+func (tp *rangefeedTxnPusher) ResolveIntents( + ctx context.Context, intents []roachpb.LockUpdate, ) error { - endTxns := make([]result.EndTxnIntents, len(txns)) - for i, txn := range txns { - endTxns[i].Txn = txn - endTxns[i].Poison = true - } - return tp.ir.CleanupTxnIntentsAsync(ctx, tp.r.RangeID, endTxns, true /* allowSyncProcessing */) + return tp.ir.ResolveIntents(ctx, intents, + // NB: Poison is ignored for non-ABORTED intents. + intentresolver.ResolveOptions{Poison: true}, + ).GoError() } type iteratorWithCloser struct {