kv: resolve only intents in rangefeed's own range from txnPushAttempt
Fixes cockroachdb#66741.

Before this change, `(*txnPushAttempt).pushOldTxns` would resolve all of
a committed or aborted transaction's intents that it discovered during a
push, instead of just those in the current range.

This was unnecessary for the purposes of the rangefeed, which only needs
a pushed transaction's intents in its range resolved. Worse, it could
result in quadratic behavior in cases where a large transaction wrote
many intents across many ranges that each had an active rangefeed
processor. In such cases, the rangefeed processor on each of the ranges
that the transaction touched would try to resolve its intents across all
ranges that the transaction touched. This could lead to a herd of
redundant intent resolution, which was especially disruptive if the
transaction had exceeded its precise intent span tracking and degraded
to ranged intent resolution.

This commit fixes this issue by having each rangefeed processor resolve
only those intents that are within the bounds of its own range. This
avoids the quadratic behavior that, in its worst form, could create a
pileup of ranged intent resolution across an entire table and starve out
foreground traffic.

Release note (bug fix): Changefeeds no longer interact poorly with
large, abandoned transactions. It was previously possible for this
combination to result in a cascade of work during transaction cleanup
that could starve out foreground traffic.
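
The quadratic behavior described above is easiest to see as arithmetic. A back-of-the-envelope sketch in Go (not code from this commit; resolutionWork and its cost model are invented for illustration):

// resolutionWork is an invented cost model: with nRanges ranges, one
// abandoned txn whose ranged lock spans cover all of them, and an active
// rangefeed processor on each range, unfiltered cleanup has every one of
// the nRanges processors resolve intents across all nRanges ranges, while
// filtered cleanup has each processor resolve only its own range.
func resolutionWork(nRanges int, filtered bool) int {
	if filtered {
		return nRanges // one local resolution per processor
	}
	return nRanges * nRanges // every processor touches every range
}

For a 1,000-range table, that is the difference between 1,000 resolutions and 1,000,000.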
nvanbenschoten committed Jun 24, 2021
1 parent 53c1ebe commit ea77990
Showing 5 changed files with 163 additions and 19 deletions.
pkg/kv/kvserver/rangefeed/task.go (50 additions & 12 deletions)
@@ -203,12 +203,14 @@ func (a *txnPushAttempt) pushOldTxns(ctx context.Context) error {
Timestamp: txn.WriteTimestamp,
})

-// Clean up the transaction's intents, which should eventually cause all
-// unresolved intents for this transaction on the rangefeed's range to be
-// resolved. We'll have to wait until the intents are resolved before the
-// resolved timestamp can advance past the transaction's commit timestamp,
-// so the best we can do is help speed up the resolution.
-intentsToCleanup = append(intentsToCleanup, txn.LocksAsLockUpdates()...)
+// Clean up the transaction's intents within the processor's range, which
+// should eventually cause all unresolved intents for this transaction on
+// the rangefeed's range to be resolved. We'll have to wait until the
+// intents are resolved before the resolved timestamp can advance past the
+// transaction's commit timestamp, so the best we can do is help speed up
+// the resolution.
+txnIntents := intentsInBound(txn, a.p.Span.AsRawSpanWithNoLocals())
+intentsToCleanup = append(intentsToCleanup, txnIntents...)
case roachpb.ABORTED:
// The transaction is aborted, so it doesn't need to be tracked
// anymore nor does it need to prevent the resolved timestamp from
@@ -223,12 +225,19 @@ func (a *txnPushAttempt) pushOldTxns(ctx context.Context) error {
TxnID: txn.ID,
})

-// If the txn happens to have its LockSpans populated, then lets clean up
-// the intents as an optimization helping others. If we aborted the txn,
-// then it won't have this field populated. If, however, we ran into a
-// transaction that its coordinator tried to rollback but didn't follow up
-// with garbage collection, then LockSpans will be populated.
-intentsToCleanup = append(intentsToCleanup, txn.LocksAsLockUpdates()...)
+// We just informed the Processor about this txn being aborted, so from
+// its perspective, there's nothing more to do: the txn's intents are no
+// longer holding up the resolved timestamp.
+//
+// However, if the txn happens to have its LockSpans populated, then let's
+// clean up the intents within the processor's range as an optimization to
+// help others and to prevent any rangefeed reconnections from needing to
+// push the same txn. If we aborted the txn, then it won't have its
+// LockSpans populated. If, however, we ran into a transaction whose
+// coordinator tried to roll it back but didn't follow up with garbage
+// collection, then LockSpans will be populated.
+txnIntents := intentsInBound(txn, a.p.Span.AsRawSpanWithNoLocals())
+intentsToCleanup = append(intentsToCleanup, txnIntents...)
}
}

@@ -242,3 +251,32 @@ func (a *txnPushAttempt) pushOldTxns(ctx context.Context) error {
func (a *txnPushAttempt) Cancel() {
close(a.doneC)
}

+// intentsInBound returns LockUpdates for the provided transaction's LockSpans
+// that intersect with the rangefeed Processor's range boundaries. For ranged
+// LockSpans, a LockUpdate containing only the portion that overlaps with the
+// range boundary will be returned.
+//
+// We filter a transaction's LockSpans to ensure that each rangefeed processor
+// resolves only those intents that are within the bounds of its own range. This
+// avoids unnecessary work, because a rangefeed processor only needs the intents
+// in its own range to be resolved in order to advance its resolved timestamp.
+// It also avoids quadratic behavior if many rangefeed processors notice intents
+// from the same transaction across many ranges. In its worst form, without
+// filtering, this could create a pileup of ranged intent resolution across an
+// entire table and starve out foreground traffic.
+//
+// NOTE: a rangefeed Processor is only configured to watch the global keyspace
+// for a range. It is also only informed about logical operations on global keys
+// (see OpLoggerBatch.logLogicalOp). So even if this transaction has LockSpans
+// in the range's global and local keyspace, we only need to resolve those in
+// the global keyspace.
+func intentsInBound(txn *roachpb.Transaction, bound roachpb.Span) []roachpb.LockUpdate {
+	var ret []roachpb.LockUpdate
+	for _, sp := range txn.LockSpans {
+		if in := sp.Intersect(bound); in.Valid() {
+			ret = append(ret, roachpb.MakeLockUpdate(txn, in))
+		}
+	}
+	return ret
+}
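
As a usage sketch of the new helper (the transaction and spans below are invented for illustration, not part of this commit): a processor watching [b, m) keeps only the portions of the transaction's lock spans that fall inside its range.

// Hypothetical inputs for intentsInBound; invented for illustration.
txn := &roachpb.Transaction{}
txn.LockSpans = []roachpb.Span{
	{Key: roachpb.Key("a"), EndKey: roachpb.Key("d")}, // straddles the range start
	{Key: roachpb.Key("p"), EndKey: roachpb.Key("q")}, // entirely outside the range
}
bound := roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("m")}
updates := intentsInBound(txn, bound)
// updates contains a single LockUpdate covering [b, d); the [p, q) span
// lies outside the processor's range and is dropped.
_ = updates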
pkg/kv/kvserver/rangefeed/task_test.go (28 additions & 6 deletions)
@@ -296,12 +296,16 @@ func TestTxnPushAttempt(t *testing.T) {
txn1, txn2, txn3, txn4 := uuid.MakeV4(), uuid.MakeV4(), uuid.MakeV4(), uuid.MakeV4()
ts1, ts2, ts3, ts4 := hlc.Timestamp{WallTime: 1}, hlc.Timestamp{WallTime: 2}, hlc.Timestamp{WallTime: 3}, hlc.Timestamp{WallTime: 4}
txn2LockSpans := []roachpb.Span{
-{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")},
-{Key: roachpb.Key("c"), EndKey: roachpb.Key("d")},
+{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")},
+{Key: roachpb.Key("d"), EndKey: roachpb.Key("e")},
+{Key: roachpb.Key("y"), EndKey: roachpb.Key("z")}, // ignored
}
txn4LockSpans := []roachpb.Span{
-{Key: roachpb.Key("e"), EndKey: roachpb.Key("f")},
-{Key: roachpb.Key("g"), EndKey: roachpb.Key("h")},
+{Key: roachpb.Key("f"), EndKey: roachpb.Key("g")},
+{Key: roachpb.Key("h"), EndKey: roachpb.Key("i")},
+{Key: roachpb.Key("a"), EndKey: roachpb.Key("d")}, // truncated at beginning
+{Key: roachpb.Key("j"), EndKey: roachpb.Key("q")}, // truncated at end
+{Key: roachpb.Key("a"), EndKey: roachpb.Key("z")}, // truncated at beginning and end
}
txn1Meta := enginepb.TxnMeta{ID: txn1, Key: keyA, WriteTimestamp: ts1, MinTimestamp: ts1}
txn2Meta := enginepb.TxnMeta{ID: txn2, Key: keyB, WriteTimestamp: ts2, MinTimestamp: ts2}
@@ -331,11 +335,27 @@ func TestTxnPushAttempt(t *testing.T) {
return []*roachpb.Transaction{txn1ProtoPushed, txn2Proto, txn3Proto, txn4Proto}, nil
})
tp.mockResolveIntentsFn(func(ctx context.Context, intents []roachpb.LockUpdate) error {
-require.Len(t, intents, 4)
+require.Len(t, intents, 7)
require.Equal(t, txn2LockSpans[0], intents[0].Span)
require.Equal(t, txn2LockSpans[1], intents[1].Span)
require.Equal(t, txn4LockSpans[0], intents[2].Span)
require.Equal(t, txn4LockSpans[1], intents[3].Span)
+require.Equal(t, func() roachpb.Span {
+	s := txn4LockSpans[2] // truncated at beginning
+	s.Key = roachpb.Key("b")
+	return s
+}(), intents[4].Span)
+require.Equal(t, func() roachpb.Span {
+	s := txn4LockSpans[3] // truncated at end
+	s.EndKey = roachpb.Key("m")
+	return s
+}(), intents[5].Span)
+require.Equal(t, func() roachpb.Span {
+	s := txn4LockSpans[4] // truncated at beginning and end
+	s.Key = roachpb.Key("b")
+	s.EndKey = roachpb.Key("m")
+	return s
+}(), intents[6].Span)
txns := tp.intentsToTxns(intents)
require.Equal(t, 2, len(txns))
require.Equal(t, txn2Meta, txns[0])
@@ -345,8 +365,10 @@ func TestTxnPushAttempt(t *testing.T) {
return nil
})

-// Mock processor. We just needs its eventC.
+// Mock processor. We configure its key span to exclude one of txn2's lock
+// spans and a portion of three of txn4's lock spans.
p := Processor{eventC: make(chan *event, 100)}
+p.Span = roachpb.RSpan{Key: roachpb.RKey("b"), EndKey: roachpb.RKey("m")}
p.TxnPusher = &tp

txns := []enginepb.TxnMeta{txn1Meta, txn2Meta, txn3Meta, txn4Meta}
pkg/kv/kvserver/replica_application_state_machine.go (1 addition & 1 deletion)
@@ -773,7 +773,7 @@ func (b *replicaAppBatch) runPreApplyTriggersAfterStagingWriteBatch(
// it will shut down with an error. If the WriteBatch is nil then we expect
// the logical operation log to also be nil. We don't want to trigger a
// shutdown of the rangefeed in that situation, so we don't pass anything to
-// the rangefed. If no rangefeed is running at all, this call will be a noop.
+// the rangefeed. If no rangefeed is running at all, this call will be a noop.
if ops := cmd.raftCmd.LogicalOpLog; cmd.raftCmd.WriteBatch != nil {
b.r.handleLogicalOpLogRaftMuLocked(ctx, ops, b.batch)
} else if ops != nil {
pkg/roachpb/data.go (31 additions & 0 deletions)
@@ -2050,6 +2050,37 @@ func (s Span) Overlaps(o Span) bool {
return bytes.Compare(s.EndKey, o.Key) > 0 && bytes.Compare(s.Key, o.EndKey) < 0
}

+// Intersect returns the intersection of the key space covered by the two spans.
+// If there is no intersection between the two spans, an invalid span (see Valid)
+// is returned.
+func (s Span) Intersect(o Span) Span {
+	// If two spans do not overlap, there is no intersection between them.
+	if !s.Overlaps(o) {
+		return Span{}
+	}
+
+	// An empty end key means this span contains a single key. Overlaps already
+	// has special code for the single-key cases, so here we return whichever key
+	// is the single key, if any. If they are both a single key, we know they are
+	// equal anyway so the order doesn't matter.
+	if len(s.EndKey) == 0 {
+		return s
+	}
+	if len(o.EndKey) == 0 {
+		return o
+	}
+
+	key := s.Key
+	if key.Compare(o.Key) < 0 {
+		key = o.Key
+	}
+	endKey := s.EndKey
+	if endKey.Compare(o.EndKey) > 0 {
+		endKey = o.EndKey
+	}
+	return Span{key, endKey}
+}

// Combine creates a new span containing the full union of the key
// space covered by the two spans. This includes any key space not
// covered by either span, but between them if the spans are disjoint.
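To make the new method's contract concrete, a short hedged example (spans invented, written as if inside package roachpb):

// Sketch of Span.Intersect semantics; these spans do not appear in the commit.
s1 := Span{Key: []byte("a"), EndKey: []byte("c")}
s2 := Span{Key: []byte("b"), EndKey: []byte("d")}
in := s1.Intersect(s2) // [b, c): the overlapping portion of the two spans
pt := s1.Intersect(Span{Key: []byte("b")}) // single-key span: returns [b] itself
no := s1.Intersect(Span{Key: []byte("x"), EndKey: []byte("z")})
_, _, _ = in, pt, no // no overlap in the last case: no is a zero, invalid Span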
pkg/roachpb/data_test.go (53 additions & 0 deletions)
@@ -1091,6 +1091,59 @@ func TestSpanOverlaps(t *testing.T) {
}
}

+func TestSpanIntersect(t *testing.T) {
+	sA := Span{Key: []byte("a")}
+	sD := Span{Key: []byte("d")}
+	sAtoC := Span{Key: []byte("a"), EndKey: []byte("c")}
+	sAtoD := Span{Key: []byte("a"), EndKey: []byte("d")}
+	sBtoC := Span{Key: []byte("b"), EndKey: []byte("c")}
+	sBtoD := Span{Key: []byte("b"), EndKey: []byte("d")}
+	sCtoD := Span{Key: []byte("c"), EndKey: []byte("d")}
+	// Invalid spans.
+	sCtoA := Span{Key: []byte("c"), EndKey: []byte("a")}
+	sDtoB := Span{Key: []byte("d"), EndKey: []byte("b")}
+
+	testData := []struct {
+		s1, s2 Span
+		expect Span
+	}{
+		{sA, sA, sA},
+		{sA, sAtoC, sA},
+		{sAtoC, sA, sA},
+		{sAtoC, sAtoC, sAtoC},
+		{sAtoC, sAtoD, sAtoC},
+		{sAtoD, sAtoC, sAtoC},
+		{sAtoC, sBtoC, sBtoC},
+		{sBtoC, sAtoC, sBtoC},
+		{sAtoC, sBtoD, sBtoC},
+		{sBtoD, sAtoC, sBtoC},
+		{sAtoD, sBtoC, sBtoC},
+		{sBtoC, sAtoD, sBtoC},
+		// Empty intersections.
+		{sA, sD, Span{}},
+		{sA, sBtoD, Span{}},
+		{sBtoD, sA, Span{}},
+		{sD, sBtoD, Span{}},
+		{sBtoD, sD, Span{}},
+		{sAtoC, sCtoD, Span{}},
+		{sCtoD, sAtoC, Span{}},
+		// Invalid spans.
+		{sAtoC, sDtoB, Span{}},
+		{sDtoB, sAtoC, Span{}},
+		{sBtoD, sCtoA, Span{}},
+		{sCtoA, sBtoD, Span{}},
+	}
+	for _, test := range testData {
+		in := test.s1.Intersect(test.s2)
+		if test.expect.Valid() {
+			require.True(t, in.Valid())
+			require.Equal(t, test.expect, in)
+		} else {
+			require.False(t, in.Valid())
+		}
+	}
+}

func TestSpanCombine(t *testing.T) {
sA := Span{Key: []byte("a")}
sD := Span{Key: []byte("d")}