Skip to content

Commit

Permalink
rangefeed: don't delete txn records from the rngfeed pusher
Browse files Browse the repository at this point in the history
Before this patch, the rangefeed txn pushed was GC'ing the records of
the transactions that it found to be aborted or committed when it
PUSH_TOUCHED's them. Deleting the txn record is not a nice thing to do
if the txn's coordinator might still be running, and so this patch
terminates the practice. If the coordinator is still running and it had
STAGED the txn record, a PUSH_TOUCH might cause a transition to ABORTED
or COMMITTED. If such a transition is followed by the GC of the txn
record, this causes an ambigous commit error for the coordinator, which
can't tell whether the transaction committed (implicitly) or was rolled
back.

As the code stands, this patch doesn't actually help eliminate the
ambiguity in the ABORTED case (it does in the COMMIT case though),
because the coordinator doesn't distinguish between an ABORTED record
and a missing record; it treats both the same, as ambiguous. But we're
going to improve that, and the coordinator is gonna consider an existing
record as non-ambiguous. The GC'ed case is going to remain the ambiguous
one, and the idea is to avoid that as much as possible by only doing GC
from the gc-queue (after an hour) and from the coordinator.

Touches #52566

Release note (bug fix): Some rare AmbiguousCommitErrors happening when
CDC was used were eliminated.
  • Loading branch information
andreimatei committed Aug 20, 2020
1 parent 75dbf3d commit 5c62f6a
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 65 deletions.
12 changes: 10 additions & 2 deletions pkg/kv/kvserver/intentresolver/intent_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,8 +538,16 @@ func (ir *IntentResolver) CleanupIntents(
return resolved, nil
}

// CleanupTxnIntentsAsync asynchronously cleans up intents owned by
// a transaction on completion.
// CleanupTxnIntentsAsync asynchronously cleans up intents owned by a
// transaction on completion. When all intents have been successfully resolved,
// the txn record is GC'ed.
//
// WARNING: Since this GCs the txn record, it should only be called in response
// to requests coming from the coordinator or the GC Queue. We don't want other
// actors to GC a txn record, since that can cause ambiguities for the
// coordinator: if it had STAGED the txn, it won't be able to tell the
// difference between a txn that had been implicitly committed and one that
// someone else aborted.
func (ir *IntentResolver) CleanupTxnIntentsAsync(
ctx context.Context,
rangeID roachpb.RangeID,
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/rangefeed/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ func (p *Processor) consumeEvent(ctx context.Context, e event) {
}
close(e.syncC)
default:
panic("missing event variant")
panic(fmt.Sprintf("missing event variant: %+v", e))
}
}

Expand Down
50 changes: 28 additions & 22 deletions pkg/kv/kvserver/rangefeed/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -675,43 +677,47 @@ func TestProcessorTxnPushAttempt(t *testing.T) {
testNum++
switch testNum {
case 1:
require.Equal(t, 3, len(txns))
require.Equal(t, txn1Meta, txns[0])
require.Equal(t, txn2Meta, txns[1])
require.Equal(t, txn3Meta, txns[2])
assert.Equal(t, 3, len(txns))
assert.Equal(t, txn1Meta, txns[0])
assert.Equal(t, txn2Meta, txns[1])
assert.Equal(t, txn3Meta, txns[2])
if t.Failed() {
return nil, errors.New("test failed")
}

// Push does not succeed. Protos not at larger ts.
return []*roachpb.Transaction{txn1Proto, txn2Proto, txn3Proto}, nil
case 2:
require.Equal(t, 3, len(txns))
require.Equal(t, txn1MetaT2Pre, txns[0])
require.Equal(t, txn2Meta, txns[1])
require.Equal(t, txn3Meta, txns[2])
assert.Equal(t, 3, len(txns))
assert.Equal(t, txn1MetaT2Pre, txns[0])
assert.Equal(t, txn2Meta, txns[1])
assert.Equal(t, txn3Meta, txns[2])
if t.Failed() {
return nil, errors.New("test failed")
}

// Push succeeds. Return new protos.
return []*roachpb.Transaction{txn1ProtoT2, txn2ProtoT2, txn3ProtoT2}, nil
case 3:
require.Equal(t, 2, len(txns))
require.Equal(t, txn2MetaT2Post, txns[0])
require.Equal(t, txn3MetaT2Post, txns[1])
assert.Equal(t, 2, len(txns))
assert.Equal(t, txn2MetaT2Post, txns[0])
assert.Equal(t, txn3MetaT2Post, txns[1])
if t.Failed() {
return nil, errors.New("test failed")
}

// Push succeeds. Return new protos.
return []*roachpb.Transaction{txn2ProtoT3, txn3ProtoT3}, nil
default:
return nil, nil
}
})
tp.mockCleanupTxnIntentsAsync(func(txns []*roachpb.Transaction) error {
switch testNum {
case 1:
require.Equal(t, 0, len(txns))
case 2:
require.Equal(t, 1, len(txns))
require.Equal(t, txn1ProtoT2, txns[0])
case 3:
require.Equal(t, 1, len(txns))
require.Equal(t, txn2ProtoT3, txns[0])
default:
tp.mockResolveIntentsFn(func(ctx context.Context, intents []roachpb.LockUpdate) error {
// There's nothing to assert here. We expect the intents to correspond to
// transactions that had theit LockSpans populated when we pushed them. This
// test doesn't simulate that.

if testNum > 3 {
return nil
}

Expand Down
30 changes: 18 additions & 12 deletions pkg/kv/kvserver/rangefeed/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,8 @@ type TxnPusher interface {
// PushTxns attempts to push the specified transactions to a new
// timestamp. It returns the resulting transaction protos.
PushTxns(context.Context, []enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error)
// CleanupTxnIntentsAsync asynchronously cleans up intents owned
// by the specified transactions.
CleanupTxnIntentsAsync(context.Context, []*roachpb.Transaction) error
// ResolveIntents resolves the specified intents.
ResolveIntents(ctx context.Context, intents []roachpb.LockUpdate) error
}

// txnPushAttempt pushes all old transactions that have unresolved intents on
Expand Down Expand Up @@ -173,10 +172,16 @@ func (a *txnPushAttempt) pushOldTxns(ctx context.Context) error {
if err != nil {
return err
}
if len(pushedTxns) != len(a.txns) {
// We expect results for all txns. In particular, if no txns have been pushed, we'd
// crash later cause we'd be creating an invalid empty event.
return errors.AssertionFailedf("tried to push %d transactions, got response for %d",
len(a.txns), len(pushedTxns))
}

// Inform the Processor of the results of the push for each transaction.
ops := make([]enginepb.MVCCLogicalOp, len(pushedTxns))
var toCleanup []*roachpb.Transaction
var intentsToCleanup []roachpb.LockUpdate
for i, txn := range pushedTxns {
switch txn.Status {
case roachpb.PENDING, roachpb.STAGING:
Expand Down Expand Up @@ -204,7 +209,7 @@ func (a *txnPushAttempt) pushOldTxns(ctx context.Context) error {
// intents are resolved before the resolved timestamp can advance past
// the transaction's commit timestamp, so the best we can do is help
// speed up the resolution.
toCleanup = append(toCleanup, txn)
intentsToCleanup = append(intentsToCleanup, txn.LocksAsLockUpdates()...)
case roachpb.ABORTED:
// The transaction is aborted, so it doesn't need to be tracked
// anymore nor does it need to prevent the resolved timestamp from
Expand All @@ -219,19 +224,20 @@ func (a *txnPushAttempt) pushOldTxns(ctx context.Context) error {
TxnID: txn.ID,
})

// While we're here, we might as well also clean up the transaction's
// intents so that no-one else needs to deal with them. However, it's
// likely that if our push caused the abort then the transaction's
// intents will be unknown and we won't be doing much good here.
toCleanup = append(toCleanup, txn)
// If the txn happens to have its LockSpans populated, then lets clean up
// the intents. If we aborted the txn, then it won't have this field
// populated. If, however, we ran into a transaction that its coordinator
// tried to rollback but didn't follow up with garbage collection, then
// LockSpans will be populated.
intentsToCleanup = append(intentsToCleanup, txn.LocksAsLockUpdates()...)
}
}

// Inform the processor of all logical ops.
a.p.sendEvent(event{ops: ops}, 0 /* timeout */)

// Clean up txns, if necessary,
return a.p.TxnPusher.CleanupTxnIntentsAsync(ctx, toCleanup)
// Resolve intents, if necessary.
return a.p.TxnPusher.ResolveIntents(ctx, intentsToCleanup)
}

func (a *txnPushAttempt) Cancel() {
Expand Down
73 changes: 55 additions & 18 deletions pkg/kv/kvserver/rangefeed/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,8 @@ func TestInitResolvedTSScan(t *testing.T) {
}

type testTxnPusher struct {
pushTxnsFn func([]enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error)
cleanupTxnIntentsAsyncFn func([]*roachpb.Transaction) error
pushTxnsFn func([]enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error)
resolveIntentsFn func(ctx context.Context, intents []roachpb.LockUpdate) error
}

func (tp *testTxnPusher) PushTxns(
Expand All @@ -259,10 +259,8 @@ func (tp *testTxnPusher) PushTxns(
return tp.pushTxnsFn(txns, ts)
}

func (tp *testTxnPusher) CleanupTxnIntentsAsync(
ctx context.Context, txns []*roachpb.Transaction,
) error {
return tp.cleanupTxnIntentsAsyncFn(txns)
func (tp *testTxnPusher) ResolveIntents(ctx context.Context, intents []roachpb.LockUpdate) error {
return tp.resolveIntentsFn(ctx, intents)
}

func (tp *testTxnPusher) mockPushTxns(
Expand All @@ -271,49 +269,87 @@ func (tp *testTxnPusher) mockPushTxns(
tp.pushTxnsFn = fn
}

func (tp *testTxnPusher) mockCleanupTxnIntentsAsync(fn func([]*roachpb.Transaction) error) {
tp.cleanupTxnIntentsAsyncFn = fn
func (tp *testTxnPusher) intentsToTxns(intents []roachpb.LockUpdate) []enginepb.TxnMeta {
txns := make([]enginepb.TxnMeta, 0)
txnIDs := make(map[uuid.UUID]struct{})
for _, intent := range intents {
txn := intent.Txn
if _, ok := txnIDs[txn.ID]; ok {
continue
}
txns = append(txns, txn)
txnIDs[txn.ID] = struct{}{}
}
return txns
}

func (tp *testTxnPusher) mockResolveIntentsFn(
fn func(context.Context, []roachpb.LockUpdate) error,
) {
tp.resolveIntentsFn = fn
}

func TestTxnPushAttempt(t *testing.T) {
defer leaktest.AfterTest(t)()

// Create a set of transactions.
txn1, txn2, txn3 := uuid.MakeV4(), uuid.MakeV4(), uuid.MakeV4()
ts1, ts2, ts3 := hlc.Timestamp{WallTime: 1}, hlc.Timestamp{WallTime: 2}, hlc.Timestamp{WallTime: 3}
txn1, txn2, txn3, txn4 := uuid.MakeV4(), uuid.MakeV4(), uuid.MakeV4(), uuid.MakeV4()
ts1, ts2, ts3, ts4 := hlc.Timestamp{WallTime: 1}, hlc.Timestamp{WallTime: 2}, hlc.Timestamp{WallTime: 3}, hlc.Timestamp{WallTime: 4}
txn2LockSpans := []roachpb.Span{
{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")},
{Key: roachpb.Key("c"), EndKey: roachpb.Key("d")},
}
txn4LockSpans := []roachpb.Span{
{Key: roachpb.Key("e"), EndKey: roachpb.Key("f")},
{Key: roachpb.Key("g"), EndKey: roachpb.Key("h")},
}
txn1Meta := enginepb.TxnMeta{ID: txn1, Key: keyA, WriteTimestamp: ts1, MinTimestamp: ts1}
txn2Meta := enginepb.TxnMeta{ID: txn2, Key: keyB, WriteTimestamp: ts2, MinTimestamp: ts2}
txn3Meta := enginepb.TxnMeta{ID: txn3, Key: keyC, WriteTimestamp: ts3, MinTimestamp: ts3}
txn4Meta := enginepb.TxnMeta{ID: txn4, Key: keyC, WriteTimestamp: ts3, MinTimestamp: ts4}
txn1Proto := &roachpb.Transaction{TxnMeta: txn1Meta, Status: roachpb.PENDING}
txn2Proto := &roachpb.Transaction{TxnMeta: txn2Meta, Status: roachpb.COMMITTED}
txn2Proto := &roachpb.Transaction{TxnMeta: txn2Meta, Status: roachpb.COMMITTED, LockSpans: txn2LockSpans}
txn3Proto := &roachpb.Transaction{TxnMeta: txn3Meta, Status: roachpb.ABORTED}
// txn4 has its LockSpans populated, simulated a transaction that has been
// rolled back by its coordinator (which populated the LockSpans), but then
// not GC'ed for whatever reason.
txn4Proto := &roachpb.Transaction{TxnMeta: txn4Meta, Status: roachpb.ABORTED, LockSpans: txn4LockSpans}

// Run a txnPushAttempt.
var tp testTxnPusher
tp.mockPushTxns(func(txns []enginepb.TxnMeta, ts hlc.Timestamp) ([]*roachpb.Transaction, error) {
require.Equal(t, 3, len(txns))
require.Equal(t, 4, len(txns))
require.Equal(t, txn1Meta, txns[0])
require.Equal(t, txn2Meta, txns[1])
require.Equal(t, txn3Meta, txns[2])
require.Equal(t, txn4Meta, txns[3])
require.Equal(t, hlc.Timestamp{WallTime: 15}, ts)

// Return all three protos. The PENDING txn is pushed.
// Return all four protos. The PENDING txn is pushed.
txn1ProtoPushed := txn1Proto.Clone()
txn1ProtoPushed.WriteTimestamp = ts
return []*roachpb.Transaction{txn1ProtoPushed, txn2Proto, txn3Proto}, nil
return []*roachpb.Transaction{txn1ProtoPushed, txn2Proto, txn3Proto, txn4Proto}, nil
})
tp.mockCleanupTxnIntentsAsync(func(txns []*roachpb.Transaction) error {
tp.mockResolveIntentsFn(func(ctx context.Context, intents []roachpb.LockUpdate) error {
require.Len(t, intents, 4)
require.Equal(t, txn2LockSpans[0], intents[0].Span)
require.Equal(t, txn2LockSpans[1], intents[1].Span)
require.Equal(t, txn4LockSpans[0], intents[2].Span)
require.Equal(t, txn4LockSpans[1], intents[3].Span)
txns := tp.intentsToTxns(intents)
require.Equal(t, 2, len(txns))
require.Equal(t, txn2Proto, txns[0])
require.Equal(t, txn3Proto, txns[1])
require.Equal(t, txn2Meta, txns[0])
// Note that we don't expect intents for txn3 to be resolved since that txn
// doesn't have its LockSpans populated.
require.Equal(t, txn4Meta, txns[1])
return nil
})

// Mock processor. We just needs its eventC.
p := Processor{eventC: make(chan event, 100)}
p.TxnPusher = &tp

txns := []enginepb.TxnMeta{txn1Meta, txn2Meta, txn3Meta}
txns := []enginepb.TxnMeta{txn1Meta, txn2Meta, txn3Meta, txn4Meta}
doneC := make(chan struct{})
pushAttempt := newTxnPushAttempt(&p, txns, hlc.Timestamp{WallTime: 15}, doneC)
pushAttempt.Run(context.Background())
Expand All @@ -325,6 +361,7 @@ func TestTxnPushAttempt(t *testing.T) {
updateIntentOp(txn1, hlc.Timestamp{WallTime: 15}),
updateIntentOp(txn2, hlc.Timestamp{WallTime: 2}),
abortTxnOp(txn3),
abortTxnOp(txn4),
}},
}
require.Equal(t, len(expEvents), len(p.eventC))
Expand Down
14 changes: 4 additions & 10 deletions pkg/kv/kvserver/replica_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/intentresolver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
Expand Down Expand Up @@ -101,16 +100,11 @@ func (tp *rangefeedTxnPusher) PushTxns(
return pushedTxns, nil
}

// CleanupTxnIntentsAsync is part of the rangefeed.TxnPusher interface.
func (tp *rangefeedTxnPusher) CleanupTxnIntentsAsync(
ctx context.Context, txns []*roachpb.Transaction,
// ResolveIntents is part of the rangefeed.TxnPusher interface.
func (tp *rangefeedTxnPusher) ResolveIntents(
ctx context.Context, intents []roachpb.LockUpdate,
) error {
endTxns := make([]result.EndTxnIntents, len(txns))
for i, txn := range txns {
endTxns[i].Txn = txn
endTxns[i].Poison = true
}
return tp.ir.CleanupTxnIntentsAsync(ctx, tp.r.RangeID, endTxns, true /* allowSyncProcessing */)
return tp.ir.ResolveIntents(ctx, intents, intentresolver.ResolveOptions{}).GoError()
}

type iteratorWithCloser struct {
Expand Down

0 comments on commit 5c62f6a

Please sign in to comment.