Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rangefeed: don't delete txn records from the rngfeed pusher #53146

Merged
merged 2 commits into from
Aug 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions pkg/kv/kvserver/intentresolver/intent_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,8 +538,16 @@ func (ir *IntentResolver) CleanupIntents(
return resolved, nil
}

// CleanupTxnIntentsAsync asynchronously cleans up intents owned by
// a transaction on completion.
// CleanupTxnIntentsAsync asynchronously cleans up intents owned by a
// transaction on completion. When all intents have been successfully resolved,
// the txn record is GC'ed.
//
// WARNING: Since this GCs the txn record, it should only be called in response
// to requests coming from the coordinator or the GC Queue. We don't want other
// actors to GC a txn record, since that can cause ambiguities for the
// coordinator: if it had STAGED the txn, it won't be able to tell the
// difference between a txn that had been implicitly committed, recovered, and
// GC'ed, and one that someone else aborted and GC'ed.
func (ir *IntentResolver) CleanupTxnIntentsAsync(
ctx context.Context,
rangeID roachpb.RangeID,
Expand Down Expand Up @@ -773,6 +781,10 @@ type ResolveOptions struct {
// ranges trying to read one of its old intents, the access will be trapped
// and the read will return an error, thus preventing the read from failing
// to see its own write.
//
// This field is ignored for intents that aren't resolved for an ABORTED txn;
// in other words, only intents from ABORTED transactions ever poison the
// abort spans.
Poison bool
// The original transaction timestamp from the earliest txn epoch; if
// supplied, resolution of intent ranges can be optimized in some cases.
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/rangefeed/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ func (p *Processor) consumeEvent(ctx context.Context, e event) {
}
close(e.syncC)
default:
panic("missing event variant")
panic(fmt.Sprintf("missing event variant: %+v", e))
}
}

Expand Down
50 changes: 28 additions & 22 deletions pkg/kv/kvserver/rangefeed/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -675,43 +677,47 @@ func TestProcessorTxnPushAttempt(t *testing.T) {
testNum++
switch testNum {
case 1:
require.Equal(t, 3, len(txns))
require.Equal(t, txn1Meta, txns[0])
require.Equal(t, txn2Meta, txns[1])
require.Equal(t, txn3Meta, txns[2])
assert.Equal(t, 3, len(txns))
assert.Equal(t, txn1Meta, txns[0])
assert.Equal(t, txn2Meta, txns[1])
assert.Equal(t, txn3Meta, txns[2])
if t.Failed() {
return nil, errors.New("test failed")
}

// Push does not succeed. Protos not at larger ts.
return []*roachpb.Transaction{txn1Proto, txn2Proto, txn3Proto}, nil
case 2:
require.Equal(t, 3, len(txns))
require.Equal(t, txn1MetaT2Pre, txns[0])
require.Equal(t, txn2Meta, txns[1])
require.Equal(t, txn3Meta, txns[2])
assert.Equal(t, 3, len(txns))
assert.Equal(t, txn1MetaT2Pre, txns[0])
assert.Equal(t, txn2Meta, txns[1])
assert.Equal(t, txn3Meta, txns[2])
if t.Failed() {
return nil, errors.New("test failed")
}

// Push succeeds. Return new protos.
return []*roachpb.Transaction{txn1ProtoT2, txn2ProtoT2, txn3ProtoT2}, nil
case 3:
require.Equal(t, 2, len(txns))
require.Equal(t, txn2MetaT2Post, txns[0])
require.Equal(t, txn3MetaT2Post, txns[1])
assert.Equal(t, 2, len(txns))
assert.Equal(t, txn2MetaT2Post, txns[0])
assert.Equal(t, txn3MetaT2Post, txns[1])
if t.Failed() {
return nil, errors.New("test failed")
}

// Push succeeds. Return new protos.
return []*roachpb.Transaction{txn2ProtoT3, txn3ProtoT3}, nil
default:
return nil, nil
}
})
tp.mockCleanupTxnIntentsAsync(func(txns []*roachpb.Transaction) error {
switch testNum {
case 1:
require.Equal(t, 0, len(txns))
case 2:
require.Equal(t, 1, len(txns))
require.Equal(t, txn1ProtoT2, txns[0])
case 3:
require.Equal(t, 1, len(txns))
require.Equal(t, txn2ProtoT3, txns[0])
default:
tp.mockResolveIntentsFn(func(ctx context.Context, intents []roachpb.LockUpdate) error {
// There's nothing to assert here. We expect the intents to correspond to
// transactions that had their LockSpans populated when we pushed them. This
// test doesn't simulate that.

if testNum > 3 {
return nil
}

Expand Down
41 changes: 23 additions & 18 deletions pkg/kv/kvserver/rangefeed/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,8 @@ type TxnPusher interface {
// PushTxns attempts to push the specified transactions to a new
// timestamp. It returns the resulting transaction protos.
PushTxns(context.Context, []enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error)
// CleanupTxnIntentsAsync asynchronously cleans up intents owned
// by the specified transactions.
CleanupTxnIntentsAsync(context.Context, []*roachpb.Transaction) error
// ResolveIntents resolves the specified intents.
ResolveIntents(ctx context.Context, intents []roachpb.LockUpdate) error
}

// txnPushAttempt pushes all old transactions that have unresolved intents on
Expand Down Expand Up @@ -173,10 +172,16 @@ func (a *txnPushAttempt) pushOldTxns(ctx context.Context) error {
if err != nil {
return err
}
if len(pushedTxns) != len(a.txns) {
// We expect results for all txns. In particular, if no txns have been pushed, we'd
// crash later because we'd be creating an invalid empty event.
return errors.AssertionFailedf("tried to push %d transactions, got response for %d",
len(a.txns), len(pushedTxns))
}

// Inform the Processor of the results of the push for each transaction.
ops := make([]enginepb.MVCCLogicalOp, len(pushedTxns))
var toCleanup []*roachpb.Transaction
var intentsToCleanup []roachpb.LockUpdate
for i, txn := range pushedTxns {
switch txn.Status {
case roachpb.PENDING, roachpb.STAGING:
Expand All @@ -198,13 +203,12 @@ func (a *txnPushAttempt) pushOldTxns(ctx context.Context) error {
Timestamp: txn.WriteTimestamp,
})

// Asynchronously clean up the transaction's intents, which should
// eventually cause all unresolved intents for this transaction on the
// rangefeed's range to be resolved. We'll have to wait until the
// intents are resolved before the resolved timestamp can advance past
// the transaction's commit timestamp, so the best we can do is help
// speed up the resolution.
toCleanup = append(toCleanup, txn)
// Clean up the transaction's intents, which should eventually cause all
// unresolved intents for this transaction on the rangefeed's range to be
// resolved. We'll have to wait until the intents are resolved before the
// resolved timestamp can advance past the transaction's commit timestamp,
// so the best we can do is help speed up the resolution.
intentsToCleanup = append(intentsToCleanup, txn.LocksAsLockUpdates()...)
case roachpb.ABORTED:
// The transaction is aborted, so it doesn't need to be tracked
// anymore nor does it need to prevent the resolved timestamp from
Expand All @@ -219,19 +223,20 @@ func (a *txnPushAttempt) pushOldTxns(ctx context.Context) error {
TxnID: txn.ID,
})

// While we're here, we might as well also clean up the transaction's
// intents so that no-one else needs to deal with them. However, it's
// likely that if our push caused the abort then the transaction's
// intents will be unknown and we won't be doing much good here.
toCleanup = append(toCleanup, txn)
// If the txn happens to have its LockSpans populated, then let's clean up
// the intents as an optimization helping others. If we aborted the txn,
// then it won't have this field populated. If, however, we ran into a
// transaction that its coordinator tried to roll back but didn't follow up
// with garbage collection, then LockSpans will be populated.
intentsToCleanup = append(intentsToCleanup, txn.LocksAsLockUpdates()...)
}
}

// Inform the processor of all logical ops.
a.p.sendEvent(event{ops: ops}, 0 /* timeout */)

// Clean up txns, if necessary,
return a.p.TxnPusher.CleanupTxnIntentsAsync(ctx, toCleanup)
// Resolve intents, if necessary.
return a.p.TxnPusher.ResolveIntents(ctx, intentsToCleanup)
}

func (a *txnPushAttempt) Cancel() {
Expand Down
73 changes: 55 additions & 18 deletions pkg/kv/kvserver/rangefeed/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,8 @@ func TestInitResolvedTSScan(t *testing.T) {
}

type testTxnPusher struct {
pushTxnsFn func([]enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error)
cleanupTxnIntentsAsyncFn func([]*roachpb.Transaction) error
pushTxnsFn func([]enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error)
resolveIntentsFn func(ctx context.Context, intents []roachpb.LockUpdate) error
}

func (tp *testTxnPusher) PushTxns(
Expand All @@ -259,10 +259,8 @@ func (tp *testTxnPusher) PushTxns(
return tp.pushTxnsFn(txns, ts)
}

func (tp *testTxnPusher) CleanupTxnIntentsAsync(
ctx context.Context, txns []*roachpb.Transaction,
) error {
return tp.cleanupTxnIntentsAsyncFn(txns)
// ResolveIntents is part of the TxnPusher interface. It forwards the call,
// unchanged, to the resolveIntentsFn installed on the test pusher.
func (tp *testTxnPusher) ResolveIntents(ctx context.Context, intents []roachpb.LockUpdate) error {
return tp.resolveIntentsFn(ctx, intents)
}

func (tp *testTxnPusher) mockPushTxns(
Expand All @@ -271,49 +269,87 @@ func (tp *testTxnPusher) mockPushTxns(
tp.pushTxnsFn = fn
}

func (tp *testTxnPusher) mockCleanupTxnIntentsAsync(fn func([]*roachpb.Transaction) error) {
tp.cleanupTxnIntentsAsyncFn = fn
// mockResolveIntentsFn installs fn as the function that ResolveIntents
// delegates to, replacing any previously installed function.
func (tp *testTxnPusher) mockResolveIntentsFn(
fn func(context.Context, []roachpb.LockUpdate) error,
) {
tp.resolveIntentsFn = fn
}

// intentsToTxns returns the TxnMeta of each distinct transaction referenced by
// intents, in order of first appearance. Transactions are deduplicated by ID,
// and the TxnMeta from the first intent seen for a given ID is the one kept.
// The result is always non-nil, even when intents is empty.
func (tp *testTxnPusher) intentsToTxns(intents []roachpb.LockUpdate) []enginepb.TxnMeta {
	seen := make(map[uuid.UUID]struct{})
	metas := make([]enginepb.TxnMeta, 0)
	for i := range intents {
		meta := intents[i].Txn
		if _, dup := seen[meta.ID]; !dup {
			// First intent for this transaction: record it and remember its ID.
			seen[meta.ID] = struct{}{}
			metas = append(metas, meta)
		}
	}
	return metas
}

func TestTxnPushAttempt(t *testing.T) {
defer leaktest.AfterTest(t)()

// Create a set of transactions.
txn1, txn2, txn3 := uuid.MakeV4(), uuid.MakeV4(), uuid.MakeV4()
ts1, ts2, ts3 := hlc.Timestamp{WallTime: 1}, hlc.Timestamp{WallTime: 2}, hlc.Timestamp{WallTime: 3}
txn1, txn2, txn3, txn4 := uuid.MakeV4(), uuid.MakeV4(), uuid.MakeV4(), uuid.MakeV4()
ts1, ts2, ts3, ts4 := hlc.Timestamp{WallTime: 1}, hlc.Timestamp{WallTime: 2}, hlc.Timestamp{WallTime: 3}, hlc.Timestamp{WallTime: 4}
txn2LockSpans := []roachpb.Span{
{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")},
{Key: roachpb.Key("c"), EndKey: roachpb.Key("d")},
}
txn4LockSpans := []roachpb.Span{
{Key: roachpb.Key("e"), EndKey: roachpb.Key("f")},
{Key: roachpb.Key("g"), EndKey: roachpb.Key("h")},
}
txn1Meta := enginepb.TxnMeta{ID: txn1, Key: keyA, WriteTimestamp: ts1, MinTimestamp: ts1}
txn2Meta := enginepb.TxnMeta{ID: txn2, Key: keyB, WriteTimestamp: ts2, MinTimestamp: ts2}
txn3Meta := enginepb.TxnMeta{ID: txn3, Key: keyC, WriteTimestamp: ts3, MinTimestamp: ts3}
txn4Meta := enginepb.TxnMeta{ID: txn4, Key: keyC, WriteTimestamp: ts3, MinTimestamp: ts4}
txn1Proto := &roachpb.Transaction{TxnMeta: txn1Meta, Status: roachpb.PENDING}
txn2Proto := &roachpb.Transaction{TxnMeta: txn2Meta, Status: roachpb.COMMITTED}
txn2Proto := &roachpb.Transaction{TxnMeta: txn2Meta, Status: roachpb.COMMITTED, LockSpans: txn2LockSpans}
txn3Proto := &roachpb.Transaction{TxnMeta: txn3Meta, Status: roachpb.ABORTED}
// txn4 has its LockSpans populated, simulating a transaction that has been
// rolled back by its coordinator (which populated the LockSpans), but then
// not GC'ed for whatever reason.
txn4Proto := &roachpb.Transaction{TxnMeta: txn4Meta, Status: roachpb.ABORTED, LockSpans: txn4LockSpans}

// Run a txnPushAttempt.
var tp testTxnPusher
tp.mockPushTxns(func(txns []enginepb.TxnMeta, ts hlc.Timestamp) ([]*roachpb.Transaction, error) {
require.Equal(t, 3, len(txns))
require.Equal(t, 4, len(txns))
require.Equal(t, txn1Meta, txns[0])
require.Equal(t, txn2Meta, txns[1])
require.Equal(t, txn3Meta, txns[2])
require.Equal(t, txn4Meta, txns[3])
require.Equal(t, hlc.Timestamp{WallTime: 15}, ts)

// Return all three protos. The PENDING txn is pushed.
// Return all four protos. The PENDING txn is pushed.
txn1ProtoPushed := txn1Proto.Clone()
txn1ProtoPushed.WriteTimestamp = ts
return []*roachpb.Transaction{txn1ProtoPushed, txn2Proto, txn3Proto}, nil
return []*roachpb.Transaction{txn1ProtoPushed, txn2Proto, txn3Proto, txn4Proto}, nil
})
tp.mockCleanupTxnIntentsAsync(func(txns []*roachpb.Transaction) error {
tp.mockResolveIntentsFn(func(ctx context.Context, intents []roachpb.LockUpdate) error {
require.Len(t, intents, 4)
require.Equal(t, txn2LockSpans[0], intents[0].Span)
require.Equal(t, txn2LockSpans[1], intents[1].Span)
require.Equal(t, txn4LockSpans[0], intents[2].Span)
require.Equal(t, txn4LockSpans[1], intents[3].Span)
txns := tp.intentsToTxns(intents)
require.Equal(t, 2, len(txns))
require.Equal(t, txn2Proto, txns[0])
require.Equal(t, txn3Proto, txns[1])
require.Equal(t, txn2Meta, txns[0])
// Note that we don't expect intents for txn3 to be resolved since that txn
// doesn't have its LockSpans populated.
require.Equal(t, txn4Meta, txns[1])
return nil
})

// Mock processor. We just need its eventC.
p := Processor{eventC: make(chan event, 100)}
p.TxnPusher = &tp

txns := []enginepb.TxnMeta{txn1Meta, txn2Meta, txn3Meta}
txns := []enginepb.TxnMeta{txn1Meta, txn2Meta, txn3Meta, txn4Meta}
doneC := make(chan struct{})
pushAttempt := newTxnPushAttempt(&p, txns, hlc.Timestamp{WallTime: 15}, doneC)
pushAttempt.Run(context.Background())
Expand All @@ -325,6 +361,7 @@ func TestTxnPushAttempt(t *testing.T) {
updateIntentOp(txn1, hlc.Timestamp{WallTime: 15}),
updateIntentOp(txn2, hlc.Timestamp{WallTime: 2}),
abortTxnOp(txn3),
abortTxnOp(txn4),
}},
}
require.Equal(t, len(expEvents), len(p.eventC))
Expand Down
17 changes: 7 additions & 10 deletions pkg/kv/kvserver/replica_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/intentresolver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
Expand Down Expand Up @@ -101,16 +100,14 @@ func (tp *rangefeedTxnPusher) PushTxns(
return pushedTxns, nil
}

// CleanupTxnIntentsAsync is part of the rangefeed.TxnPusher interface.
func (tp *rangefeedTxnPusher) CleanupTxnIntentsAsync(
ctx context.Context, txns []*roachpb.Transaction,
// ResolveIntents is part of the rangefeed.TxnPusher interface.
func (tp *rangefeedTxnPusher) ResolveIntents(
ctx context.Context, intents []roachpb.LockUpdate,
) error {
endTxns := make([]result.EndTxnIntents, len(txns))
for i, txn := range txns {
endTxns[i].Txn = txn
endTxns[i].Poison = true
}
return tp.ir.CleanupTxnIntentsAsync(ctx, tp.r.RangeID, endTxns, true /* allowSyncProcessing */)
return tp.ir.ResolveIntents(ctx, intents,
// NB: Poison is ignored for non-ABORTED intents.
intentresolver.ResolveOptions{Poison: true},
).GoError()
}

type iteratorWithCloser struct {
Expand Down
Loading