From bc2f9742b8cd10c9a4c195634be7877b0b78eb61 Mon Sep 17 00:00:00 2001 From: Andrei Matei Date: Thu, 20 Aug 2020 11:04:06 -0400 Subject: [PATCH] rangefeed: don't delete txn records from the rngfeed pusher Before this patch, the rangefeed txn pushed was GC'ing the records of the transactions that it found to be aborted or committed when it PUSH_TOUCHED's them. Deleting the txn record is not a nice thing to do if the txn's coordinator might still be running, and so this patch terminates the practice. If the coordinator is still running and it had STAGED the txn record, a PUSH_TOUCH might cause a transition to ABORTED or COMMITTED. If such a transition is followed by the GC of the txn record, this causes an ambigous commit error for the coordinator, which can't tell whether the transaction committed (implicitly) or was rolled back. As the code stands, this patch doesn't actually help eliminate the ambiguity in the ABORTED case (it does in the COMMIT case though), because the coordinator doesn't distinguish between an ABORTED record and a missing record; it treats both the same, as ambiguous. But we're going to improve that, and the coordinator is gonna consider an existing record as non-ambiguous. The GC'ed case is going to remain the ambiguous one, and the idea is to avoid that as much as possible by only doing GC from the gc-queue (after an hour) and from the coordinator. Touches #52566 Release note (bug fix): Some rare AmbiguousCommitErrors happening when CDC was used were eliminated. --- .../intentresolver/intent_resolver.go | 12 ++- pkg/kv/kvserver/rangefeed/processor.go | 2 +- pkg/kv/kvserver/rangefeed/processor_test.go | 50 +++++++------ pkg/kv/kvserver/rangefeed/task.go | 41 ++++++----- pkg/kv/kvserver/rangefeed/task_test.go | 73 ++++++++++++++----- pkg/kv/kvserver/replica_rangefeed.go | 17 ++--- 6 files changed, 124 insertions(+), 71 deletions(-) diff --git a/pkg/kv/kvserver/intentresolver/intent_resolver.go b/pkg/kv/kvserver/intentresolver/intent_resolver.go index a3c19c2a7af8..98d64f3cbae6 100644 --- a/pkg/kv/kvserver/intentresolver/intent_resolver.go +++ b/pkg/kv/kvserver/intentresolver/intent_resolver.go @@ -538,8 +538,16 @@ func (ir *IntentResolver) CleanupIntents( return resolved, nil } -// CleanupTxnIntentsAsync asynchronously cleans up intents owned by -// a transaction on completion. +// CleanupTxnIntentsAsync asynchronously cleans up intents owned by a +// transaction on completion. When all intents have been successfully resolved, +// the txn record is GC'ed. +// +// WARNING: Since this GCs the txn record, it should only be called in response +// to requests coming from the coordinator or the GC Queue. We don't want other +// actors to GC a txn record, since that can cause ambiguities for the +// coordinator: if it had STAGED the txn, it won't be able to tell the +// difference between a txn that had been implicitly committed, recovered, and +// GC'ed, and one that someone else aborted and GC'ed. func (ir *IntentResolver) CleanupTxnIntentsAsync( ctx context.Context, rangeID roachpb.RangeID, diff --git a/pkg/kv/kvserver/rangefeed/processor.go b/pkg/kv/kvserver/rangefeed/processor.go index 3079743a5267..58c5632d089a 100644 --- a/pkg/kv/kvserver/rangefeed/processor.go +++ b/pkg/kv/kvserver/rangefeed/processor.go @@ -532,7 +532,7 @@ func (p *Processor) consumeEvent(ctx context.Context, e event) { } close(e.syncC) default: - panic("missing event variant") + panic(fmt.Sprintf("missing event variant: %+v", e)) } } diff --git a/pkg/kv/kvserver/rangefeed/processor_test.go b/pkg/kv/kvserver/rangefeed/processor_test.go index e4bf0062547b..18b78c780f76 100644 --- a/pkg/kv/kvserver/rangefeed/processor_test.go +++ b/pkg/kv/kvserver/rangefeed/processor_test.go @@ -30,6 +30,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -675,25 +677,34 @@ func TestProcessorTxnPushAttempt(t *testing.T) { testNum++ switch testNum { case 1: - require.Equal(t, 3, len(txns)) - require.Equal(t, txn1Meta, txns[0]) - require.Equal(t, txn2Meta, txns[1]) - require.Equal(t, txn3Meta, txns[2]) + assert.Equal(t, 3, len(txns)) + assert.Equal(t, txn1Meta, txns[0]) + assert.Equal(t, txn2Meta, txns[1]) + assert.Equal(t, txn3Meta, txns[2]) + if t.Failed() { + return nil, errors.New("test failed") + } // Push does not succeed. Protos not at larger ts. return []*roachpb.Transaction{txn1Proto, txn2Proto, txn3Proto}, nil case 2: - require.Equal(t, 3, len(txns)) - require.Equal(t, txn1MetaT2Pre, txns[0]) - require.Equal(t, txn2Meta, txns[1]) - require.Equal(t, txn3Meta, txns[2]) + assert.Equal(t, 3, len(txns)) + assert.Equal(t, txn1MetaT2Pre, txns[0]) + assert.Equal(t, txn2Meta, txns[1]) + assert.Equal(t, txn3Meta, txns[2]) + if t.Failed() { + return nil, errors.New("test failed") + } // Push succeeds. Return new protos. return []*roachpb.Transaction{txn1ProtoT2, txn2ProtoT2, txn3ProtoT2}, nil case 3: - require.Equal(t, 2, len(txns)) - require.Equal(t, txn2MetaT2Post, txns[0]) - require.Equal(t, txn3MetaT2Post, txns[1]) + assert.Equal(t, 2, len(txns)) + assert.Equal(t, txn2MetaT2Post, txns[0]) + assert.Equal(t, txn3MetaT2Post, txns[1]) + if t.Failed() { + return nil, errors.New("test failed") + } // Push succeeds. Return new protos. return []*roachpb.Transaction{txn2ProtoT3, txn3ProtoT3}, nil @@ -701,17 +712,12 @@ func TestProcessorTxnPushAttempt(t *testing.T) { return nil, nil } }) - tp.mockCleanupTxnIntentsAsync(func(txns []*roachpb.Transaction) error { - switch testNum { - case 1: - require.Equal(t, 0, len(txns)) - case 2: - require.Equal(t, 1, len(txns)) - require.Equal(t, txn1ProtoT2, txns[0]) - case 3: - require.Equal(t, 1, len(txns)) - require.Equal(t, txn2ProtoT3, txns[0]) - default: + tp.mockResolveIntentsFn(func(ctx context.Context, intents []roachpb.LockUpdate) error { + // There's nothing to assert here. We expect the intents to correspond to + // transactions that had their LockSpans populated when we pushed them. This + // test doesn't simulate that. + + if testNum > 3 { return nil } diff --git a/pkg/kv/kvserver/rangefeed/task.go b/pkg/kv/kvserver/rangefeed/task.go index 7630089dbada..7cca67eb6a9f 100644 --- a/pkg/kv/kvserver/rangefeed/task.go +++ b/pkg/kv/kvserver/rangefeed/task.go @@ -117,9 +117,8 @@ type TxnPusher interface { // PushTxns attempts to push the specified transactions to a new // timestamp. It returns the resulting transaction protos. PushTxns(context.Context, []enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error) - // CleanupTxnIntentsAsync asynchronously cleans up intents owned - // by the specified transactions. - CleanupTxnIntentsAsync(context.Context, []*roachpb.Transaction) error + // ResolveIntents resolves the specified intents. + ResolveIntents(ctx context.Context, intents []roachpb.LockUpdate) error } // txnPushAttempt pushes all old transactions that have unresolved intents on @@ -173,10 +172,16 @@ func (a *txnPushAttempt) pushOldTxns(ctx context.Context) error { if err != nil { return err } + if len(pushedTxns) != len(a.txns) { + // We expect results for all txns. In particular, if no txns have been pushed, we'd + // crash later cause we'd be creating an invalid empty event. + return errors.AssertionFailedf("tried to push %d transactions, got response for %d", + len(a.txns), len(pushedTxns)) + } // Inform the Processor of the results of the push for each transaction. ops := make([]enginepb.MVCCLogicalOp, len(pushedTxns)) - var toCleanup []*roachpb.Transaction + var intentsToCleanup []roachpb.LockUpdate for i, txn := range pushedTxns { switch txn.Status { case roachpb.PENDING, roachpb.STAGING: @@ -198,13 +203,12 @@ func (a *txnPushAttempt) pushOldTxns(ctx context.Context) error { Timestamp: txn.WriteTimestamp, }) - // Asynchronously clean up the transaction's intents, which should - // eventually cause all unresolved intents for this transaction on the - // rangefeed's range to be resolved. We'll have to wait until the - // intents are resolved before the resolved timestamp can advance past - // the transaction's commit timestamp, so the best we can do is help - // speed up the resolution. - toCleanup = append(toCleanup, txn) + // Clean up the transaction's intents, which should eventually cause all + // unresolved intents for this transaction on the rangefeed's range to be + // resolved. We'll have to wait until the intents are resolved before the + // resolved timestamp can advance past the transaction's commit timestamp, + // so the best we can do is help speed up the resolution. + intentsToCleanup = append(intentsToCleanup, txn.LocksAsLockUpdates()...) case roachpb.ABORTED: // The transaction is aborted, so it doesn't need to be tracked // anymore nor does it need to prevent the resolved timestamp from @@ -219,19 +223,20 @@ func (a *txnPushAttempt) pushOldTxns(ctx context.Context) error { TxnID: txn.ID, }) - // While we're here, we might as well also clean up the transaction's - // intents so that no-one else needs to deal with them. However, it's - // likely that if our push caused the abort then the transaction's - // intents will be unknown and we won't be doing much good here. - toCleanup = append(toCleanup, txn) + // If the txn happens to have its LockSpans populated, then lets clean up + // the intents as an optimization helping others. If we aborted the txn, + // then it won't have this field populated. If, however, we ran into a + // transaction that its coordinator tried to rollback but didn't follow up + // with garbage collection, then LockSpans will be populated. + intentsToCleanup = append(intentsToCleanup, txn.LocksAsLockUpdates()...) } } // Inform the processor of all logical ops. a.p.sendEvent(event{ops: ops}, 0 /* timeout */) - // Clean up txns, if necessary, - return a.p.TxnPusher.CleanupTxnIntentsAsync(ctx, toCleanup) + // Resolve intents, if necessary. + return a.p.TxnPusher.ResolveIntents(ctx, intentsToCleanup) } func (a *txnPushAttempt) Cancel() { diff --git a/pkg/kv/kvserver/rangefeed/task_test.go b/pkg/kv/kvserver/rangefeed/task_test.go index 45fcd551edac..78ffd57d2823 100644 --- a/pkg/kv/kvserver/rangefeed/task_test.go +++ b/pkg/kv/kvserver/rangefeed/task_test.go @@ -249,8 +249,8 @@ func TestInitResolvedTSScan(t *testing.T) { } type testTxnPusher struct { - pushTxnsFn func([]enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error) - cleanupTxnIntentsAsyncFn func([]*roachpb.Transaction) error + pushTxnsFn func([]enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error) + resolveIntentsFn func(ctx context.Context, intents []roachpb.LockUpdate) error } func (tp *testTxnPusher) PushTxns( @@ -259,10 +259,8 @@ func (tp *testTxnPusher) PushTxns( return tp.pushTxnsFn(txns, ts) } -func (tp *testTxnPusher) CleanupTxnIntentsAsync( - ctx context.Context, txns []*roachpb.Transaction, -) error { - return tp.cleanupTxnIntentsAsyncFn(txns) +func (tp *testTxnPusher) ResolveIntents(ctx context.Context, intents []roachpb.LockUpdate) error { + return tp.resolveIntentsFn(ctx, intents) } func (tp *testTxnPusher) mockPushTxns( @@ -271,41 +269,79 @@ func (tp *testTxnPusher) mockPushTxns( tp.pushTxnsFn = fn } -func (tp *testTxnPusher) mockCleanupTxnIntentsAsync(fn func([]*roachpb.Transaction) error) { - tp.cleanupTxnIntentsAsyncFn = fn +func (tp *testTxnPusher) mockResolveIntentsFn( + fn func(context.Context, []roachpb.LockUpdate) error, +) { + tp.resolveIntentsFn = fn +} + +func (tp *testTxnPusher) intentsToTxns(intents []roachpb.LockUpdate) []enginepb.TxnMeta { + txns := make([]enginepb.TxnMeta, 0) + txnIDs := make(map[uuid.UUID]struct{}) + for _, intent := range intents { + txn := intent.Txn + if _, ok := txnIDs[txn.ID]; ok { + continue + } + txns = append(txns, txn) + txnIDs[txn.ID] = struct{}{} + } + return txns } func TestTxnPushAttempt(t *testing.T) { defer leaktest.AfterTest(t)() // Create a set of transactions. - txn1, txn2, txn3 := uuid.MakeV4(), uuid.MakeV4(), uuid.MakeV4() - ts1, ts2, ts3 := hlc.Timestamp{WallTime: 1}, hlc.Timestamp{WallTime: 2}, hlc.Timestamp{WallTime: 3} + txn1, txn2, txn3, txn4 := uuid.MakeV4(), uuid.MakeV4(), uuid.MakeV4(), uuid.MakeV4() + ts1, ts2, ts3, ts4 := hlc.Timestamp{WallTime: 1}, hlc.Timestamp{WallTime: 2}, hlc.Timestamp{WallTime: 3}, hlc.Timestamp{WallTime: 4} + txn2LockSpans := []roachpb.Span{ + {Key: roachpb.Key("a"), EndKey: roachpb.Key("b")}, + {Key: roachpb.Key("c"), EndKey: roachpb.Key("d")}, + } + txn4LockSpans := []roachpb.Span{ + {Key: roachpb.Key("e"), EndKey: roachpb.Key("f")}, + {Key: roachpb.Key("g"), EndKey: roachpb.Key("h")}, + } txn1Meta := enginepb.TxnMeta{ID: txn1, Key: keyA, WriteTimestamp: ts1, MinTimestamp: ts1} txn2Meta := enginepb.TxnMeta{ID: txn2, Key: keyB, WriteTimestamp: ts2, MinTimestamp: ts2} txn3Meta := enginepb.TxnMeta{ID: txn3, Key: keyC, WriteTimestamp: ts3, MinTimestamp: ts3} + txn4Meta := enginepb.TxnMeta{ID: txn4, Key: keyC, WriteTimestamp: ts3, MinTimestamp: ts4} txn1Proto := &roachpb.Transaction{TxnMeta: txn1Meta, Status: roachpb.PENDING} - txn2Proto := &roachpb.Transaction{TxnMeta: txn2Meta, Status: roachpb.COMMITTED} + txn2Proto := &roachpb.Transaction{TxnMeta: txn2Meta, Status: roachpb.COMMITTED, LockSpans: txn2LockSpans} txn3Proto := &roachpb.Transaction{TxnMeta: txn3Meta, Status: roachpb.ABORTED} + // txn4 has its LockSpans populated, simulated a transaction that has been + // rolled back by its coordinator (which populated the LockSpans), but then + // not GC'ed for whatever reason. + txn4Proto := &roachpb.Transaction{TxnMeta: txn4Meta, Status: roachpb.ABORTED, LockSpans: txn4LockSpans} // Run a txnPushAttempt. var tp testTxnPusher tp.mockPushTxns(func(txns []enginepb.TxnMeta, ts hlc.Timestamp) ([]*roachpb.Transaction, error) { - require.Equal(t, 3, len(txns)) + require.Equal(t, 4, len(txns)) require.Equal(t, txn1Meta, txns[0]) require.Equal(t, txn2Meta, txns[1]) require.Equal(t, txn3Meta, txns[2]) + require.Equal(t, txn4Meta, txns[3]) require.Equal(t, hlc.Timestamp{WallTime: 15}, ts) - // Return all three protos. The PENDING txn is pushed. + // Return all four protos. The PENDING txn is pushed. txn1ProtoPushed := txn1Proto.Clone() txn1ProtoPushed.WriteTimestamp = ts - return []*roachpb.Transaction{txn1ProtoPushed, txn2Proto, txn3Proto}, nil + return []*roachpb.Transaction{txn1ProtoPushed, txn2Proto, txn3Proto, txn4Proto}, nil }) - tp.mockCleanupTxnIntentsAsync(func(txns []*roachpb.Transaction) error { + tp.mockResolveIntentsFn(func(ctx context.Context, intents []roachpb.LockUpdate) error { + require.Len(t, intents, 4) + require.Equal(t, txn2LockSpans[0], intents[0].Span) + require.Equal(t, txn2LockSpans[1], intents[1].Span) + require.Equal(t, txn4LockSpans[0], intents[2].Span) + require.Equal(t, txn4LockSpans[1], intents[3].Span) + txns := tp.intentsToTxns(intents) require.Equal(t, 2, len(txns)) - require.Equal(t, txn2Proto, txns[0]) - require.Equal(t, txn3Proto, txns[1]) + require.Equal(t, txn2Meta, txns[0]) + // Note that we don't expect intents for txn3 to be resolved since that txn + // doesn't have its LockSpans populated. + require.Equal(t, txn4Meta, txns[1]) return nil }) @@ -313,7 +349,7 @@ func TestTxnPushAttempt(t *testing.T) { p := Processor{eventC: make(chan event, 100)} p.TxnPusher = &tp - txns := []enginepb.TxnMeta{txn1Meta, txn2Meta, txn3Meta} + txns := []enginepb.TxnMeta{txn1Meta, txn2Meta, txn3Meta, txn4Meta} doneC := make(chan struct{}) pushAttempt := newTxnPushAttempt(&p, txns, hlc.Timestamp{WallTime: 15}, doneC) pushAttempt.Run(context.Background()) @@ -325,6 +361,7 @@ func TestTxnPushAttempt(t *testing.T) { updateIntentOp(txn1, hlc.Timestamp{WallTime: 15}), updateIntentOp(txn2, hlc.Timestamp{WallTime: 2}), abortTxnOp(txn3), + abortTxnOp(txn4), }}, } require.Equal(t, len(expEvents), len(p.eventC)) diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index e6298f70720a..1d85069a62b9 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -17,7 +17,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/keys" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/intentresolver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" @@ -101,16 +100,14 @@ func (tp *rangefeedTxnPusher) PushTxns( return pushedTxns, nil } -// CleanupTxnIntentsAsync is part of the rangefeed.TxnPusher interface. -func (tp *rangefeedTxnPusher) CleanupTxnIntentsAsync( - ctx context.Context, txns []*roachpb.Transaction, +// ResolveIntents is part of the rangefeed.TxnPusher interface. +func (tp *rangefeedTxnPusher) ResolveIntents( + ctx context.Context, intents []roachpb.LockUpdate, ) error { - endTxns := make([]result.EndTxnIntents, len(txns)) - for i, txn := range txns { - endTxns[i].Txn = txn - endTxns[i].Poison = true - } - return tp.ir.CleanupTxnIntentsAsync(ctx, tp.r.RangeID, endTxns, true /* allowSyncProcessing */) + return tp.ir.ResolveIntents(ctx, intents, + // NB: Poison is ignored for non-ABORTED intents. + intentresolver.ResolveOptions{Poison: true}, + ).GoError() } type iteratorWithCloser struct {