From 5c62f6a5ba12e71c8a3a2665a1ea8fd0d759516e Mon Sep 17 00:00:00 2001 From: Andrei Matei Date: Thu, 20 Aug 2020 11:04:06 -0400 Subject: [PATCH] rangefeed: don't delete txn records from the rangefeed pusher Before this patch, the rangefeed txn pusher was GC'ing the records of the transactions that it found to be aborted or committed when it PUSH_TOUCHED them. Deleting the txn record is not a nice thing to do if the txn's coordinator might still be running, and so this patch terminates the practice. If the coordinator is still running and it had STAGED the txn record, a PUSH_TOUCH might cause a transition to ABORTED or COMMITTED. If such a transition is followed by the GC of the txn record, this causes an ambiguous commit error for the coordinator, which can't tell whether the transaction committed (implicitly) or was rolled back. As the code stands, this patch doesn't actually help eliminate the ambiguity in the ABORTED case (it does in the COMMITTED case though), because the coordinator doesn't distinguish between an ABORTED record and a missing record; it treats both the same, as ambiguous. But we're going to improve that, and the coordinator is going to consider an existing record as non-ambiguous. The GC'ed case is going to remain the ambiguous one, and the idea is to avoid that as much as possible by only doing GC from the gc-queue (after an hour) and from the coordinator. Touches #52566 Release note (bug fix): Some rare AmbiguousCommitErrors happening when CDC was used were eliminated. 
--- .../intentresolver/intent_resolver.go | 12 ++- pkg/kv/kvserver/rangefeed/processor.go | 2 +- pkg/kv/kvserver/rangefeed/processor_test.go | 50 +++++++------ pkg/kv/kvserver/rangefeed/task.go | 30 +++++--- pkg/kv/kvserver/rangefeed/task_test.go | 73 ++++++++++++++----- pkg/kv/kvserver/replica_rangefeed.go | 14 +--- 6 files changed, 116 insertions(+), 65 deletions(-) diff --git a/pkg/kv/kvserver/intentresolver/intent_resolver.go b/pkg/kv/kvserver/intentresolver/intent_resolver.go index 340cbd454174..c2e3ea831f87 100644 --- a/pkg/kv/kvserver/intentresolver/intent_resolver.go +++ b/pkg/kv/kvserver/intentresolver/intent_resolver.go @@ -538,8 +538,16 @@ func (ir *IntentResolver) CleanupIntents( return resolved, nil } -// CleanupTxnIntentsAsync asynchronously cleans up intents owned by -// a transaction on completion. +// CleanupTxnIntentsAsync asynchronously cleans up intents owned by a +// transaction on completion. When all intents have been successfully resolved, +// the txn record is GC'ed. +// +// WARNING: Since this GCs the txn record, it should only be called in response +// to requests coming from the coordinator or the GC Queue. We don't want other +// actors to GC a txn record, since that can cause ambiguities for the +// coordinator: if it had STAGED the txn, it won't be able to tell the +// difference between a txn that had been implicitly committed and one that +// someone else aborted. 
func (ir *IntentResolver) CleanupTxnIntentsAsync( ctx context.Context, rangeID roachpb.RangeID, diff --git a/pkg/kv/kvserver/rangefeed/processor.go b/pkg/kv/kvserver/rangefeed/processor.go index 3079743a5267..58c5632d089a 100644 --- a/pkg/kv/kvserver/rangefeed/processor.go +++ b/pkg/kv/kvserver/rangefeed/processor.go @@ -532,7 +532,7 @@ func (p *Processor) consumeEvent(ctx context.Context, e event) { } close(e.syncC) default: - panic("missing event variant") + panic(fmt.Sprintf("missing event variant: %+v", e)) } } diff --git a/pkg/kv/kvserver/rangefeed/processor_test.go b/pkg/kv/kvserver/rangefeed/processor_test.go index e4bf0062547b..d9184f79b508 100644 --- a/pkg/kv/kvserver/rangefeed/processor_test.go +++ b/pkg/kv/kvserver/rangefeed/processor_test.go @@ -30,6 +30,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -675,25 +677,34 @@ func TestProcessorTxnPushAttempt(t *testing.T) { testNum++ switch testNum { case 1: - require.Equal(t, 3, len(txns)) - require.Equal(t, txn1Meta, txns[0]) - require.Equal(t, txn2Meta, txns[1]) - require.Equal(t, txn3Meta, txns[2]) + assert.Equal(t, 3, len(txns)) + assert.Equal(t, txn1Meta, txns[0]) + assert.Equal(t, txn2Meta, txns[1]) + assert.Equal(t, txn3Meta, txns[2]) + if t.Failed() { + return nil, errors.New("test failed") + } // Push does not succeed. Protos not at larger ts. 
return []*roachpb.Transaction{txn1Proto, txn2Proto, txn3Proto}, nil case 2: - require.Equal(t, 3, len(txns)) - require.Equal(t, txn1MetaT2Pre, txns[0]) - require.Equal(t, txn2Meta, txns[1]) - require.Equal(t, txn3Meta, txns[2]) + assert.Equal(t, 3, len(txns)) + assert.Equal(t, txn1MetaT2Pre, txns[0]) + assert.Equal(t, txn2Meta, txns[1]) + assert.Equal(t, txn3Meta, txns[2]) + if t.Failed() { + return nil, errors.New("test failed") + } // Push succeeds. Return new protos. return []*roachpb.Transaction{txn1ProtoT2, txn2ProtoT2, txn3ProtoT2}, nil case 3: - require.Equal(t, 2, len(txns)) - require.Equal(t, txn2MetaT2Post, txns[0]) - require.Equal(t, txn3MetaT2Post, txns[1]) + assert.Equal(t, 2, len(txns)) + assert.Equal(t, txn2MetaT2Post, txns[0]) + assert.Equal(t, txn3MetaT2Post, txns[1]) + if t.Failed() { + return nil, errors.New("test failed") + } // Push succeeds. Return new protos. return []*roachpb.Transaction{txn2ProtoT3, txn3ProtoT3}, nil @@ -701,17 +712,12 @@ func TestProcessorTxnPushAttempt(t *testing.T) { return nil, nil } }) - tp.mockCleanupTxnIntentsAsync(func(txns []*roachpb.Transaction) error { - switch testNum { - case 1: - require.Equal(t, 0, len(txns)) - case 2: - require.Equal(t, 1, len(txns)) - require.Equal(t, txn1ProtoT2, txns[0]) - case 3: - require.Equal(t, 1, len(txns)) - require.Equal(t, txn2ProtoT3, txns[0]) - default: + tp.mockResolveIntentsFn(func(ctx context.Context, intents []roachpb.LockUpdate) error { + // There's nothing to assert here. We expect the intents to correspond to + // transactions that had theit LockSpans populated when we pushed them. This + // test doesn't simulate that. 
+ + if testNum > 3 { return nil } diff --git a/pkg/kv/kvserver/rangefeed/task.go b/pkg/kv/kvserver/rangefeed/task.go index 7630089dbada..9d93e197d466 100644 --- a/pkg/kv/kvserver/rangefeed/task.go +++ b/pkg/kv/kvserver/rangefeed/task.go @@ -117,9 +117,8 @@ type TxnPusher interface { // PushTxns attempts to push the specified transactions to a new // timestamp. It returns the resulting transaction protos. PushTxns(context.Context, []enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error) - // CleanupTxnIntentsAsync asynchronously cleans up intents owned - // by the specified transactions. - CleanupTxnIntentsAsync(context.Context, []*roachpb.Transaction) error + // ResolveIntents resolves the specified intents. + ResolveIntents(ctx context.Context, intents []roachpb.LockUpdate) error } // txnPushAttempt pushes all old transactions that have unresolved intents on @@ -173,10 +172,16 @@ func (a *txnPushAttempt) pushOldTxns(ctx context.Context) error { if err != nil { return err } + if len(pushedTxns) != len(a.txns) { + // We expect results for all txns. In particular, if no txns have been pushed, we'd + // crash later cause we'd be creating an invalid empty event. + return errors.AssertionFailedf("tried to push %d transactions, got response for %d", + len(a.txns), len(pushedTxns)) + } // Inform the Processor of the results of the push for each transaction. ops := make([]enginepb.MVCCLogicalOp, len(pushedTxns)) - var toCleanup []*roachpb.Transaction + var intentsToCleanup []roachpb.LockUpdate for i, txn := range pushedTxns { switch txn.Status { case roachpb.PENDING, roachpb.STAGING: @@ -204,7 +209,7 @@ func (a *txnPushAttempt) pushOldTxns(ctx context.Context) error { // intents are resolved before the resolved timestamp can advance past // the transaction's commit timestamp, so the best we can do is help // speed up the resolution. - toCleanup = append(toCleanup, txn) + intentsToCleanup = append(intentsToCleanup, txn.LocksAsLockUpdates()...) 
case roachpb.ABORTED: // The transaction is aborted, so it doesn't need to be tracked // anymore nor does it need to prevent the resolved timestamp from @@ -219,19 +224,20 @@ func (a *txnPushAttempt) pushOldTxns(ctx context.Context) error { TxnID: txn.ID, }) - // While we're here, we might as well also clean up the transaction's - // intents so that no-one else needs to deal with them. However, it's - // likely that if our push caused the abort then the transaction's - // intents will be unknown and we won't be doing much good here. - toCleanup = append(toCleanup, txn) + // If the txn happens to have its LockSpans populated, then lets clean up + // the intents. If we aborted the txn, then it won't have this field + // populated. If, however, we ran into a transaction that its coordinator + // tried to rollback but didn't follow up with garbage collection, then + // LockSpans will be populated. + intentsToCleanup = append(intentsToCleanup, txn.LocksAsLockUpdates()...) } } // Inform the processor of all logical ops. a.p.sendEvent(event{ops: ops}, 0 /* timeout */) - // Clean up txns, if necessary, - return a.p.TxnPusher.CleanupTxnIntentsAsync(ctx, toCleanup) + // Resolve intents, if necessary. 
+ return a.p.TxnPusher.ResolveIntents(ctx, intentsToCleanup) } func (a *txnPushAttempt) Cancel() { diff --git a/pkg/kv/kvserver/rangefeed/task_test.go b/pkg/kv/kvserver/rangefeed/task_test.go index 45fcd551edac..adfab44533ac 100644 --- a/pkg/kv/kvserver/rangefeed/task_test.go +++ b/pkg/kv/kvserver/rangefeed/task_test.go @@ -249,8 +249,8 @@ func TestInitResolvedTSScan(t *testing.T) { } type testTxnPusher struct { - pushTxnsFn func([]enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error) - cleanupTxnIntentsAsyncFn func([]*roachpb.Transaction) error + pushTxnsFn func([]enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error) + resolveIntentsFn func(ctx context.Context, intents []roachpb.LockUpdate) error } func (tp *testTxnPusher) PushTxns( @@ -259,10 +259,8 @@ func (tp *testTxnPusher) PushTxns( return tp.pushTxnsFn(txns, ts) } -func (tp *testTxnPusher) CleanupTxnIntentsAsync( - ctx context.Context, txns []*roachpb.Transaction, -) error { - return tp.cleanupTxnIntentsAsyncFn(txns) +func (tp *testTxnPusher) ResolveIntents(ctx context.Context, intents []roachpb.LockUpdate) error { + return tp.resolveIntentsFn(ctx, intents) } func (tp *testTxnPusher) mockPushTxns( @@ -271,41 +269,79 @@ func (tp *testTxnPusher) mockPushTxns( tp.pushTxnsFn = fn } -func (tp *testTxnPusher) mockCleanupTxnIntentsAsync(fn func([]*roachpb.Transaction) error) { - tp.cleanupTxnIntentsAsyncFn = fn +func (tp *testTxnPusher) intentsToTxns(intents []roachpb.LockUpdate) []enginepb.TxnMeta { + txns := make([]enginepb.TxnMeta, 0) + txnIDs := make(map[uuid.UUID]struct{}) + for _, intent := range intents { + txn := intent.Txn + if _, ok := txnIDs[txn.ID]; ok { + continue + } + txns = append(txns, txn) + txnIDs[txn.ID] = struct{}{} + } + return txns +} + +func (tp *testTxnPusher) mockResolveIntentsFn( + fn func(context.Context, []roachpb.LockUpdate) error, +) { + tp.resolveIntentsFn = fn } func TestTxnPushAttempt(t *testing.T) { defer leaktest.AfterTest(t)() // Create a set of 
transactions. - txn1, txn2, txn3 := uuid.MakeV4(), uuid.MakeV4(), uuid.MakeV4() - ts1, ts2, ts3 := hlc.Timestamp{WallTime: 1}, hlc.Timestamp{WallTime: 2}, hlc.Timestamp{WallTime: 3} + txn1, txn2, txn3, txn4 := uuid.MakeV4(), uuid.MakeV4(), uuid.MakeV4(), uuid.MakeV4() + ts1, ts2, ts3, ts4 := hlc.Timestamp{WallTime: 1}, hlc.Timestamp{WallTime: 2}, hlc.Timestamp{WallTime: 3}, hlc.Timestamp{WallTime: 4} + txn2LockSpans := []roachpb.Span{ + {Key: roachpb.Key("a"), EndKey: roachpb.Key("b")}, + {Key: roachpb.Key("c"), EndKey: roachpb.Key("d")}, + } + txn4LockSpans := []roachpb.Span{ + {Key: roachpb.Key("e"), EndKey: roachpb.Key("f")}, + {Key: roachpb.Key("g"), EndKey: roachpb.Key("h")}, + } txn1Meta := enginepb.TxnMeta{ID: txn1, Key: keyA, WriteTimestamp: ts1, MinTimestamp: ts1} txn2Meta := enginepb.TxnMeta{ID: txn2, Key: keyB, WriteTimestamp: ts2, MinTimestamp: ts2} txn3Meta := enginepb.TxnMeta{ID: txn3, Key: keyC, WriteTimestamp: ts3, MinTimestamp: ts3} + txn4Meta := enginepb.TxnMeta{ID: txn4, Key: keyC, WriteTimestamp: ts3, MinTimestamp: ts4} txn1Proto := &roachpb.Transaction{TxnMeta: txn1Meta, Status: roachpb.PENDING} - txn2Proto := &roachpb.Transaction{TxnMeta: txn2Meta, Status: roachpb.COMMITTED} + txn2Proto := &roachpb.Transaction{TxnMeta: txn2Meta, Status: roachpb.COMMITTED, LockSpans: txn2LockSpans} txn3Proto := &roachpb.Transaction{TxnMeta: txn3Meta, Status: roachpb.ABORTED} + // txn4 has its LockSpans populated, simulated a transaction that has been + // rolled back by its coordinator (which populated the LockSpans), but then + // not GC'ed for whatever reason. + txn4Proto := &roachpb.Transaction{TxnMeta: txn4Meta, Status: roachpb.ABORTED, LockSpans: txn4LockSpans} // Run a txnPushAttempt. 
var tp testTxnPusher tp.mockPushTxns(func(txns []enginepb.TxnMeta, ts hlc.Timestamp) ([]*roachpb.Transaction, error) { - require.Equal(t, 3, len(txns)) + require.Equal(t, 4, len(txns)) require.Equal(t, txn1Meta, txns[0]) require.Equal(t, txn2Meta, txns[1]) require.Equal(t, txn3Meta, txns[2]) + require.Equal(t, txn4Meta, txns[3]) require.Equal(t, hlc.Timestamp{WallTime: 15}, ts) - // Return all three protos. The PENDING txn is pushed. + // Return all four protos. The PENDING txn is pushed. txn1ProtoPushed := txn1Proto.Clone() txn1ProtoPushed.WriteTimestamp = ts - return []*roachpb.Transaction{txn1ProtoPushed, txn2Proto, txn3Proto}, nil + return []*roachpb.Transaction{txn1ProtoPushed, txn2Proto, txn3Proto, txn4Proto}, nil }) - tp.mockCleanupTxnIntentsAsync(func(txns []*roachpb.Transaction) error { + tp.mockResolveIntentsFn(func(ctx context.Context, intents []roachpb.LockUpdate) error { + require.Len(t, intents, 4) + require.Equal(t, txn2LockSpans[0], intents[0].Span) + require.Equal(t, txn2LockSpans[1], intents[1].Span) + require.Equal(t, txn4LockSpans[0], intents[2].Span) + require.Equal(t, txn4LockSpans[1], intents[3].Span) + txns := tp.intentsToTxns(intents) require.Equal(t, 2, len(txns)) - require.Equal(t, txn2Proto, txns[0]) - require.Equal(t, txn3Proto, txns[1]) + require.Equal(t, txn2Meta, txns[0]) + // Note that we don't expect intents for txn3 to be resolved since that txn + // doesn't have its LockSpans populated. 
+ require.Equal(t, txn4Meta, txns[1]) return nil }) @@ -313,7 +349,7 @@ func TestTxnPushAttempt(t *testing.T) { p := Processor{eventC: make(chan event, 100)} p.TxnPusher = &tp - txns := []enginepb.TxnMeta{txn1Meta, txn2Meta, txn3Meta} + txns := []enginepb.TxnMeta{txn1Meta, txn2Meta, txn3Meta, txn4Meta} doneC := make(chan struct{}) pushAttempt := newTxnPushAttempt(&p, txns, hlc.Timestamp{WallTime: 15}, doneC) pushAttempt.Run(context.Background()) @@ -325,6 +361,7 @@ func TestTxnPushAttempt(t *testing.T) { updateIntentOp(txn1, hlc.Timestamp{WallTime: 15}), updateIntentOp(txn2, hlc.Timestamp{WallTime: 2}), abortTxnOp(txn3), + abortTxnOp(txn4), }}, } require.Equal(t, len(expEvents), len(p.eventC)) diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index e6298f70720a..d8b4620e79de 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -17,7 +17,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/keys" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/intentresolver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" @@ -101,16 +100,11 @@ func (tp *rangefeedTxnPusher) PushTxns( return pushedTxns, nil } -// CleanupTxnIntentsAsync is part of the rangefeed.TxnPusher interface. -func (tp *rangefeedTxnPusher) CleanupTxnIntentsAsync( - ctx context.Context, txns []*roachpb.Transaction, +// ResolveIntents is part of the rangefeed.TxnPusher interface. 
+func (tp *rangefeedTxnPusher) ResolveIntents( + ctx context.Context, intents []roachpb.LockUpdate, ) error { - endTxns := make([]result.EndTxnIntents, len(txns)) - for i, txn := range txns { - endTxns[i].Txn = txn - endTxns[i].Poison = true - } - return tp.ir.CleanupTxnIntentsAsync(ctx, tp.r.RangeID, endTxns, true /* allowSyncProcessing */) + return tp.ir.ResolveIntents(ctx, intents, intentresolver.ResolveOptions{}).GoError() } type iteratorWithCloser struct {