Skip to content

Commit

Permalink
Merge pull request cockroachdb#66814 from nvanbenschoten/backport20.2…
Browse files Browse the repository at this point in the history
…-66746

release-20.2: kv: resolve only intents in rangefeed's own range from txnPushAttempt
  • Loading branch information
nvanbenschoten authored Jun 29, 2021
2 parents 8086ff5 + ea77990 commit 5e53452
Show file tree
Hide file tree
Showing 5 changed files with 163 additions and 19 deletions.
62 changes: 50 additions & 12 deletions pkg/kv/kvserver/rangefeed/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,12 +203,14 @@ func (a *txnPushAttempt) pushOldTxns(ctx context.Context) error {
Timestamp: txn.WriteTimestamp,
})

// Clean up the transaction's intents, which should eventually cause all
// unresolved intents for this transaction on the rangefeed's range to be
// resolved. We'll have to wait until the intents are resolved before the
// resolved timestamp can advance past the transaction's commit timestamp,
// so the best we can do is help speed up the resolution.
intentsToCleanup = append(intentsToCleanup, txn.LocksAsLockUpdates()...)
// Clean up the transaction's intents within the processor's range, which
// should eventually cause all unresolved intents for this transaction on
// the rangefeed's range to be resolved. We'll have to wait until the
// intents are resolved before the resolved timestamp can advance past the
// transaction's commit timestamp, so the best we can do is help speed up
// the resolution.
txnIntents := intentsInBound(txn, a.p.Span.AsRawSpanWithNoLocals())
intentsToCleanup = append(intentsToCleanup, txnIntents...)
case roachpb.ABORTED:
// The transaction is aborted, so it doesn't need to be tracked
// anymore nor does it need to prevent the resolved timestamp from
Expand All @@ -223,12 +225,19 @@ func (a *txnPushAttempt) pushOldTxns(ctx context.Context) error {
TxnID: txn.ID,
})

// If the txn happens to have its LockSpans populated, then lets clean up
// the intents as an optimization helping others. If we aborted the txn,
// then it won't have this field populated. If, however, we ran into a
// transaction that its coordinator tried to rollback but didn't follow up
// with garbage collection, then LockSpans will be populated.
intentsToCleanup = append(intentsToCleanup, txn.LocksAsLockUpdates()...)
// We just informed the Processor about this txn being aborted, so from
// its perspective, there's nothing more to do — the txn's intents are no
// longer holding up the resolved timestamp.
//
// However, if the txn happens to have its LockSpans populated, then lets
// clean up the intents within the processor's range as an optimization to
// help others and to prevent any rangefeed reconnections from needing to
// push the same txn. If we aborted the txn, then it won't have its
// LockSpans populated. If, however, we ran into a transaction that its
// coordinator tried to rollback but didn't follow up with garbage
// collection, then LockSpans will be populated.
txnIntents := intentsInBound(txn, a.p.Span.AsRawSpanWithNoLocals())
intentsToCleanup = append(intentsToCleanup, txnIntents...)
}
}

Expand All @@ -242,3 +251,32 @@ func (a *txnPushAttempt) pushOldTxns(ctx context.Context) error {
func (a *txnPushAttempt) Cancel() {
close(a.doneC)
}

// intentsInBound returns LockUpdates for the provided transaction's LockSpans
// that intersect with the rangefeed Processor's range boundaries. For ranged
// LockSpans, a LockUpdate containing only the portion that overlaps with the
// range boundary will be returned.
//
// We filter a transaction's LockSpans to ensure that each rangefeed processor
// resolves only those intents that are within the bounds of its own range. This
// avoids unnecessary work, because a rangefeed processor only needs the intents
// in its own range to be resolved in order to advance its resolved timestamp.
// Additionally, it also avoids quadratic behavior if many rangefeed processors
// notice intents from the same transaction across many ranges. In its worst
// form, without filtering, this could create a pileup of ranged intent
// resolution across an entire table and starve out foreground traffic.
//
// NOTE: a rangefeed Processor is only configured to watch the global keyspace
// for a range. It is also only informed about logical operations on global keys
// (see OpLoggerBatch.logLogicalOp). So even if this transaction has LockSpans
// in the range's global and local keyspace, we only need to resolve those in
// the global keyspace.
func intentsInBound(txn *roachpb.Transaction, bound roachpb.Span) []roachpb.LockUpdate {
var ret []roachpb.LockUpdate
for _, sp := range txn.LockSpans {
if in := sp.Intersect(bound); in.Valid() {
ret = append(ret, roachpb.MakeLockUpdate(txn, in))
}
}
return ret
}
34 changes: 28 additions & 6 deletions pkg/kv/kvserver/rangefeed/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,12 +296,16 @@ func TestTxnPushAttempt(t *testing.T) {
txn1, txn2, txn3, txn4 := uuid.MakeV4(), uuid.MakeV4(), uuid.MakeV4(), uuid.MakeV4()
ts1, ts2, ts3, ts4 := hlc.Timestamp{WallTime: 1}, hlc.Timestamp{WallTime: 2}, hlc.Timestamp{WallTime: 3}, hlc.Timestamp{WallTime: 4}
txn2LockSpans := []roachpb.Span{
{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")},
{Key: roachpb.Key("c"), EndKey: roachpb.Key("d")},
{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")},
{Key: roachpb.Key("d"), EndKey: roachpb.Key("e")},
{Key: roachpb.Key("y"), EndKey: roachpb.Key("z")}, // ignored
}
txn4LockSpans := []roachpb.Span{
{Key: roachpb.Key("e"), EndKey: roachpb.Key("f")},
{Key: roachpb.Key("g"), EndKey: roachpb.Key("h")},
{Key: roachpb.Key("f"), EndKey: roachpb.Key("g")},
{Key: roachpb.Key("h"), EndKey: roachpb.Key("i")},
{Key: roachpb.Key("a"), EndKey: roachpb.Key("d")}, // truncated at beginning
{Key: roachpb.Key("j"), EndKey: roachpb.Key("q")}, // truncated at end
{Key: roachpb.Key("a"), EndKey: roachpb.Key("z")}, // truncated at beginning and end
}
txn1Meta := enginepb.TxnMeta{ID: txn1, Key: keyA, WriteTimestamp: ts1, MinTimestamp: ts1}
txn2Meta := enginepb.TxnMeta{ID: txn2, Key: keyB, WriteTimestamp: ts2, MinTimestamp: ts2}
Expand Down Expand Up @@ -331,11 +335,27 @@ func TestTxnPushAttempt(t *testing.T) {
return []*roachpb.Transaction{txn1ProtoPushed, txn2Proto, txn3Proto, txn4Proto}, nil
})
tp.mockResolveIntentsFn(func(ctx context.Context, intents []roachpb.LockUpdate) error {
require.Len(t, intents, 4)
require.Len(t, intents, 7)
require.Equal(t, txn2LockSpans[0], intents[0].Span)
require.Equal(t, txn2LockSpans[1], intents[1].Span)
require.Equal(t, txn4LockSpans[0], intents[2].Span)
require.Equal(t, txn4LockSpans[1], intents[3].Span)
require.Equal(t, func() roachpb.Span {
s := txn4LockSpans[2] // truncated at beginning
s.Key = roachpb.Key("b")
return s
}(), intents[4].Span)
require.Equal(t, func() roachpb.Span {
s := txn4LockSpans[3] // truncated at end
s.EndKey = roachpb.Key("m")
return s
}(), intents[5].Span)
require.Equal(t, func() roachpb.Span {
s := txn4LockSpans[4] // truncated at beginning and end
s.Key = roachpb.Key("b")
s.EndKey = roachpb.Key("m")
return s
}(), intents[6].Span)
txns := tp.intentsToTxns(intents)
require.Equal(t, 2, len(txns))
require.Equal(t, txn2Meta, txns[0])
Expand All @@ -345,8 +365,10 @@ func TestTxnPushAttempt(t *testing.T) {
return nil
})

// Mock processor. We just needs its eventC.
// Mock processor. We configure its key span to exclude one of txn2's lock
// spans and a portion of three of txn4's lock spans.
p := Processor{eventC: make(chan *event, 100)}
p.Span = roachpb.RSpan{Key: roachpb.RKey("b"), EndKey: roachpb.RKey("m")}
p.TxnPusher = &tp

txns := []enginepb.TxnMeta{txn1Meta, txn2Meta, txn3Meta, txn4Meta}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_application_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -773,7 +773,7 @@ func (b *replicaAppBatch) runPreApplyTriggersAfterStagingWriteBatch(
// it will shut down with an error. If the WriteBatch is nil then we expect
// the logical operation log to also be nil. We don't want to trigger a
// shutdown of the rangefeed in that situation, so we don't pass anything to
// the rangefed. If no rangefeed is running at all, this call will be a noop.
// the rangefeed. If no rangefeed is running at all, this call will be a noop.
if ops := cmd.raftCmd.LogicalOpLog; cmd.raftCmd.WriteBatch != nil {
b.r.handleLogicalOpLogRaftMuLocked(ctx, ops, b.batch)
} else if ops != nil {
Expand Down
31 changes: 31 additions & 0 deletions pkg/roachpb/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -2050,6 +2050,37 @@ func (s Span) Overlaps(o Span) bool {
return bytes.Compare(s.EndKey, o.Key) > 0 && bytes.Compare(s.Key, o.EndKey) < 0
}

// Intersect returns the intersection of the key space covered by the two spans.
// If there is no intersection between the two spans, an invalid span (see Valid)
// is returned.
func (s Span) Intersect(o Span) Span {
// If two spans do not overlap, there is no intersection between them.
if !s.Overlaps(o) {
return Span{}
}

// An empty end key means this span contains a single key. Overlaps already
// has special code for the single-key cases, so here we return whichever key
// is the single key, if any. If they are both a single key, we know they are
// equal anyway so the order doesn't matter.
if len(s.EndKey) == 0 {
return s
}
if len(o.EndKey) == 0 {
return o
}

key := s.Key
if key.Compare(o.Key) < 0 {
key = o.Key
}
endKey := s.EndKey
if endKey.Compare(o.EndKey) > 0 {
endKey = o.EndKey
}
return Span{key, endKey}
}

// Combine creates a new span containing the full union of the key
// space covered by the two spans. This includes any key space not
// covered by either span, but between them if the spans are disjoint.
Expand Down
53 changes: 53 additions & 0 deletions pkg/roachpb/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1091,6 +1091,59 @@ func TestSpanOverlaps(t *testing.T) {
}
}

func TestSpanIntersect(t *testing.T) {
sA := Span{Key: []byte("a")}
sD := Span{Key: []byte("d")}
sAtoC := Span{Key: []byte("a"), EndKey: []byte("c")}
sAtoD := Span{Key: []byte("a"), EndKey: []byte("d")}
sBtoC := Span{Key: []byte("b"), EndKey: []byte("c")}
sBtoD := Span{Key: []byte("b"), EndKey: []byte("d")}
sCtoD := Span{Key: []byte("c"), EndKey: []byte("d")}
// Invalid spans.
sCtoA := Span{Key: []byte("c"), EndKey: []byte("a")}
sDtoB := Span{Key: []byte("d"), EndKey: []byte("b")}

testData := []struct {
s1, s2 Span
expect Span
}{
{sA, sA, sA},
{sA, sAtoC, sA},
{sAtoC, sA, sA},
{sAtoC, sAtoC, sAtoC},
{sAtoC, sAtoD, sAtoC},
{sAtoD, sAtoC, sAtoC},
{sAtoC, sBtoC, sBtoC},
{sBtoC, sAtoC, sBtoC},
{sAtoC, sBtoD, sBtoC},
{sBtoD, sAtoC, sBtoC},
{sAtoD, sBtoC, sBtoC},
{sBtoC, sAtoD, sBtoC},
// Empty intersections.
{sA, sD, Span{}},
{sA, sBtoD, Span{}},
{sBtoD, sA, Span{}},
{sD, sBtoD, Span{}},
{sBtoD, sD, Span{}},
{sAtoC, sCtoD, Span{}},
{sCtoD, sAtoC, Span{}},
// Invalid spans.
{sAtoC, sDtoB, Span{}},
{sDtoB, sAtoC, Span{}},
{sBtoD, sCtoA, Span{}},
{sCtoA, sBtoD, Span{}},
}
for _, test := range testData {
in := test.s1.Intersect(test.s2)
if test.expect.Valid() {
require.True(t, in.Valid())
require.Equal(t, test.expect, in)
} else {
require.False(t, in.Valid())
}
}
}

func TestSpanCombine(t *testing.T) {
sA := Span{Key: []byte("a")}
sD := Span{Key: []byte("d")}
Expand Down

0 comments on commit 5e53452

Please sign in to comment.