Skip to content

Commit

Permalink
internal/keyspan: obey Seek invariants in filteringIter
Browse files Browse the repository at this point in the history
Previously if filteringIter's FilterFunc mutated the passed-in span
to no longer be a valid return value for a SeekLT or SeekGE call,
we would still return that span even though it could be >=  the seek
key (for SeekLT), or less than it (for SeekGE).

This change updates filteringIter to guard for this case before
returning from a seek call.

Found with #3044.

Informs cockroachdb/cockroach#113973, cockroachdb/cockroach#114056.
  • Loading branch information
itsbilal committed Nov 9, 2023
1 parent 2f752b3 commit c4f530d
Show file tree
Hide file tree
Showing 6 changed files with 231 additions and 14 deletions.
25 changes: 21 additions & 4 deletions internal/keyspan/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package keyspan

import "github.com/cockroachdb/pebble/internal/base"

// FilterFunc defines a transform from the input Span into the output Span. The
// function returns true if the Span should be returned by the iterator, and
// false if the Span should be skipped. The FilterFunc is permitted to mutate
Expand All @@ -21,6 +23,7 @@ type FilterFunc func(in *Span, out *Span) (keep bool)
type filteringIter struct {
iter FragmentIterator
filterFn FilterFunc
cmp base.Compare

// span is a mutable Span passed to the filterFn. The filterFn is free to
// mutate this Span. The slice of Keys in the Span is reused with every call
Expand All @@ -32,18 +35,32 @@ var _ FragmentIterator = (*filteringIter)(nil)

// Filter returns a new filteringIter that will filter the Spans from the
// provided child iterator using the provided FilterFunc.
func Filter(iter FragmentIterator, filter FilterFunc) FragmentIterator {
return &filteringIter{iter: iter, filterFn: filter}
func Filter(iter FragmentIterator, filter FilterFunc, cmp base.Compare) FragmentIterator {
return &filteringIter{iter: iter, filterFn: filter, cmp: cmp}
}

// SeekGE implements FragmentIterator.
func (i *filteringIter) SeekGE(key []byte) *Span {
return i.filter(i.iter.SeekGE(key), +1)
span := i.filter(i.iter.SeekGE(key), +1)
// i.filter could return a span that's less than key, _if_ the filterFunc
// (which has no knowledge of the seek key) mutated the span to end at a key
// less than or equal to `key`. Detect this case and next/invalidate the iter.
if span != nil && i.cmp(span.End, key) <= 0 {
return i.Next()
}
return span
}

// SeekLT implements FragmentIterator.
func (i *filteringIter) SeekLT(key []byte) *Span {
return i.filter(i.iter.SeekLT(key), -1)
span := i.filter(i.iter.SeekLT(key), -1)
// i.filter could return a span that's >= key, _if_ the filterFunc (which has
// no knowledge of the seek key) mutated the span to start at a key greater
// than or equal to `key`. Detect this case and prev/invalidate the iter.
if span != nil && i.cmp(span.Start, key) >= 0 {
return i.Prev()
}
return span
}

// First implements FragmentIterator.
Expand Down
2 changes: 1 addition & 1 deletion internal/keyspan/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestFilteringIter(t *testing.T) {
}
}
innerIter := NewIter(cmp, spans)
iter := Filter(innerIter, filter)
iter := Filter(innerIter, filter, cmp)
defer iter.Close()
s := runFragmentIteratorCmd(iter, td.Input, nil)
return s
Expand Down
120 changes: 120 additions & 0 deletions internal/keyspan/testdata/truncate
Original file line number Diff line number Diff line change
Expand Up @@ -196,3 +196,123 @@ truncate g-g
truncate g-h
----
3: gh

# Regression test for https://github.com/cockroachdb/cockroach/issues/113973.

truncate-and-save-iter a-dd
----
ok

saved-iter
first
next
next
next
----
b-d:{(#1,RANGEDEL)}
d-dd:{(#2,RANGEDEL)}
<nil>
<nil>

saved-iter
seek-ge e
next
next
----
<nil>
<nil>
<nil>

saved-iter
seek-ge e
prev
prev
----
<nil>
d-dd:{(#2,RANGEDEL)}
b-d:{(#1,RANGEDEL)}

saved-iter
seek-lt e
prev
prev
----
d-dd:{(#2,RANGEDEL)}
b-d:{(#1,RANGEDEL)}
<nil>

saved-iter
seek-lt e
next
next
----
d-dd:{(#2,RANGEDEL)}
<nil>
<nil>

truncate-and-save-iter ee-h
----
ok

saved-iter
first
next
next
next
----
ee-f:{(#2,RANGEDEL)}
f-h:{(#3,RANGEDEL)}
<nil>
<nil>

saved-iter
seek-ge e
next
next
----
ee-f:{(#2,RANGEDEL)}
f-h:{(#3,RANGEDEL)}
<nil>

saved-iter
seek-ge e
prev
prev
----
ee-f:{(#2,RANGEDEL)}
<nil>
<nil>

saved-iter
seek-lt e
prev
prev
----
<nil>
<nil>
<nil>

saved-iter
seek-lt e
next
next
----
<nil>
ee-f:{(#2,RANGEDEL)}
f-h:{(#3,RANGEDEL)}


truncate-and-save-iter a-g
----
ok

saved-iter
seek-ge h
prev
seek-lt h
next
----
<nil>
f-g:{(#3,RANGEDEL)}
f-g:{(#3,RANGEDEL)}
<nil>
2 changes: 1 addition & 1 deletion internal/keyspan/truncate.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,5 +69,5 @@ func Truncate(
}

return !out.Empty() && cmp(out.Start, out.End) < 0
})
}, cmp)
}
94 changes: 87 additions & 7 deletions internal/keyspan/truncate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
package keyspan

import (
"bytes"
"fmt"
"io"
"strings"
"testing"

Expand All @@ -17,15 +19,16 @@ func TestTruncate(t *testing.T) {
cmp := base.DefaultComparer.Compare
fmtKey := base.DefaultComparer.FormatKey
var iter FragmentIterator
var savedIter FragmentIterator
defer func() {
if savedIter != nil {
savedIter.Close()
savedIter = nil
}
}()

datadriven.RunTest(t, "testdata/truncate", func(t *testing.T, d *datadriven.TestData) string {
switch d.Cmd {
case "build":
tombstones := buildSpans(t, cmp, fmtKey, d.Input, base.InternalKeyKindRangeDelete)
iter = NewIter(cmp, tombstones)
return formatAlphabeticSpans(tombstones)

case "truncate":
doTruncate := func() FragmentIterator {
if len(d.Input) > 0 {
t.Fatalf("unexpected input: %s", d.Input)
}
Expand Down Expand Up @@ -55,15 +58,92 @@ func TestTruncate(t *testing.T) {
tIter := Truncate(
cmp, iter, lower, upper, startKey, endKey, false,
)
return tIter
}

switch d.Cmd {
case "build":
tombstones := buildSpans(t, cmp, fmtKey, d.Input, base.InternalKeyKindRangeDelete)
iter = NewIter(cmp, tombstones)
return formatAlphabeticSpans(tombstones)

case "truncate":
tIter := doTruncate()
defer tIter.Close()
var truncated []Span
for s := tIter.First(); s != nil; s = tIter.Next() {
truncated = append(truncated, s.ShallowClone())
}
return formatAlphabeticSpans(truncated)

case "truncate-and-save-iter":
if savedIter != nil {
savedIter.Close()
}
savedIter = doTruncate()
return "ok"

case "saved-iter":
return runIterCmd(t, d, savedIter)

default:
return fmt.Sprintf("unknown command: %s", d.Cmd)
}
})
}

// runIterCmd evaluates a datadriven command controlling an internal
// keyspan.FragmentIterator, returning a string with the results of the iterator
// operations.
func runIterCmd(t *testing.T, td *datadriven.TestData, iter FragmentIterator) string {
var buf bytes.Buffer
lines := strings.Split(strings.TrimSpace(td.Input), "\n")
for i, line := range lines {
if i > 0 {
fmt.Fprintln(&buf)
}
line = strings.TrimSpace(line)
i := strings.IndexByte(line, '#')
iterCmd := line
if i > 0 {
iterCmd = string(line[:i])
}
dataDrivenRunIterOp(&buf, iter, iterCmd)
}
return buf.String()
}

func dataDrivenRunIterOp(w io.Writer, it FragmentIterator, op string) {
fields := strings.FieldsFunc(op, func(r rune) bool { return iterDelim[r] })
var s *Span
switch strings.ToLower(fields[0]) {
case "first":
s = it.First()
case "last":
s = it.Last()
case "seekge", "seek-ge":
if len(fields) == 1 {
panic(fmt.Sprintf("unable to parse iter op %q", op))
}
s = it.SeekGE([]byte(fields[1]))
case "seeklt", "seek-lt":
if len(fields) == 1 {
panic(fmt.Sprintf("unable to parse iter op %q", op))
}
s = it.SeekLT([]byte(fields[1]))
case "next":
s = it.Next()
case "prev":
s = it.Prev()
default:
panic(fmt.Sprintf("unrecognized iter op %q", fields[0]))
}
if s == nil {
fmt.Fprint(w, "<nil>")
if err := it.Error(); err != nil {
fmt.Fprintf(w, " err=<%s>", it.Error())
}
return
}
fmt.Fprint(w, s)
}
2 changes: 1 addition & 1 deletion table_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -894,7 +894,7 @@ func newCombinedDeletionKeyspanIter(
out.Keys = append(out.Keys, k)
}
return len(out.Keys) > 0
})
}, comparer.Compare)
dIter := &keyspan.DefragmentingIter{}
dIter.Init(comparer, iter, equal, reducer, new(keyspan.DefragmentingBuffers))
iter = dIter
Expand Down

0 comments on commit c4f530d

Please sign in to comment.