Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

crl-release-23.2: internal/keyspan: obey Seek invariants in filteringIter #3049

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 21 additions & 4 deletions internal/keyspan/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package keyspan

import "github.com/cockroachdb/pebble/internal/base"

// FilterFunc defines a transform from the input Span into the output Span. The
// function returns true if the Span should be returned by the iterator, and
// false if the Span should be skipped. The FilterFunc is permitted to mutate
Expand All @@ -21,6 +23,7 @@ type FilterFunc func(in *Span, out *Span) (keep bool)
type filteringIter struct {
iter FragmentIterator
filterFn FilterFunc
cmp base.Compare

// span is a mutable Span passed to the filterFn. The filterFn is free to
// mutate this Span. The slice of Keys in the Span is reused with every call
Expand All @@ -32,18 +35,32 @@ var _ FragmentIterator = (*filteringIter)(nil)

// Filter returns a new filteringIter that will filter the Spans from the
// provided child iterator using the provided FilterFunc.
func Filter(iter FragmentIterator, filter FilterFunc) FragmentIterator {
return &filteringIter{iter: iter, filterFn: filter}
func Filter(iter FragmentIterator, filter FilterFunc, cmp base.Compare) FragmentIterator {
return &filteringIter{iter: iter, filterFn: filter, cmp: cmp}
}

// SeekGE implements FragmentIterator.
func (i *filteringIter) SeekGE(key []byte) *Span {
return i.filter(i.iter.SeekGE(key), +1)
span := i.filter(i.iter.SeekGE(key), +1)
// i.filter could return a span that's less than key, _if_ the filterFunc
// (which has no knowledge of the seek key) mutated the span to end at a key
// less than or equal to `key`. Detect this case and next/invalidate the iter.
if span != nil && i.cmp(span.End, key) <= 0 {
return i.Next()
}
return span
}

// SeekLT implements FragmentIterator.
func (i *filteringIter) SeekLT(key []byte) *Span {
return i.filter(i.iter.SeekLT(key), -1)
span := i.filter(i.iter.SeekLT(key), -1)
// i.filter could return a span that's >= key, _if_ the filterFunc (which has
// no knowledge of the seek key) mutated the span to start at a key greater
// than or equal to `key`. Detect this case and prev/invalidate the iter.
if span != nil && i.cmp(span.Start, key) >= 0 {
return i.Prev()
}
return span
}

// First implements FragmentIterator.
Expand Down
2 changes: 1 addition & 1 deletion internal/keyspan/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestFilteringIter(t *testing.T) {
}
}
innerIter := NewIter(cmp, spans)
iter := Filter(innerIter, filter)
iter := Filter(innerIter, filter, cmp)
defer iter.Close()
s := runFragmentIteratorCmd(iter, td.Input, nil)
return s
Expand Down
120 changes: 120 additions & 0 deletions internal/keyspan/testdata/truncate
Original file line number Diff line number Diff line change
Expand Up @@ -196,3 +196,123 @@ truncate g-g
truncate g-h
----
3: gh

# Regression test for https://github.com/cockroachdb/cockroach/issues/113973.

truncate-and-save-iter a-dd
----
ok

saved-iter
first
next
next
next
----
b-d:{(#1,RANGEDEL)}
d-dd:{(#2,RANGEDEL)}
<nil>
<nil>

saved-iter
seek-ge e
next
next
----
<nil>
<nil>
<nil>

saved-iter
seek-ge e
prev
prev
----
<nil>
d-dd:{(#2,RANGEDEL)}
b-d:{(#1,RANGEDEL)}

saved-iter
seek-lt e
prev
prev
----
d-dd:{(#2,RANGEDEL)}
b-d:{(#1,RANGEDEL)}
<nil>

saved-iter
seek-lt e
next
next
----
d-dd:{(#2,RANGEDEL)}
<nil>
<nil>

truncate-and-save-iter ee-h
----
ok

saved-iter
first
next
next
next
----
ee-f:{(#2,RANGEDEL)}
f-h:{(#3,RANGEDEL)}
<nil>
<nil>

saved-iter
seek-ge e
next
next
----
ee-f:{(#2,RANGEDEL)}
f-h:{(#3,RANGEDEL)}
<nil>

saved-iter
seek-ge e
prev
prev
----
ee-f:{(#2,RANGEDEL)}
<nil>
<nil>

saved-iter
seek-lt e
prev
prev
----
<nil>
<nil>
<nil>

saved-iter
seek-lt e
next
next
----
<nil>
ee-f:{(#2,RANGEDEL)}
f-h:{(#3,RANGEDEL)}


truncate-and-save-iter a-g
----
ok

saved-iter
seek-ge h
prev
seek-lt h
next
----
<nil>
f-g:{(#3,RANGEDEL)}
f-g:{(#3,RANGEDEL)}
<nil>
2 changes: 1 addition & 1 deletion internal/keyspan/truncate.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,5 +69,5 @@ func Truncate(
}

return !out.Empty() && cmp(out.Start, out.End) < 0
})
}, cmp)
}
94 changes: 87 additions & 7 deletions internal/keyspan/truncate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
package keyspan

import (
"bytes"
"fmt"
"io"
"strings"
"testing"

Expand All @@ -17,15 +19,16 @@ func TestTruncate(t *testing.T) {
cmp := base.DefaultComparer.Compare
fmtKey := base.DefaultComparer.FormatKey
var iter FragmentIterator
var savedIter FragmentIterator
defer func() {
if savedIter != nil {
savedIter.Close()
savedIter = nil
}
}()

datadriven.RunTest(t, "testdata/truncate", func(t *testing.T, d *datadriven.TestData) string {
switch d.Cmd {
case "build":
tombstones := buildSpans(t, cmp, fmtKey, d.Input, base.InternalKeyKindRangeDelete)
iter = NewIter(cmp, tombstones)
return formatAlphabeticSpans(tombstones)

case "truncate":
doTruncate := func() FragmentIterator {
if len(d.Input) > 0 {
t.Fatalf("unexpected input: %s", d.Input)
}
Expand Down Expand Up @@ -55,15 +58,92 @@ func TestTruncate(t *testing.T) {
tIter := Truncate(
cmp, iter, lower, upper, startKey, endKey, false,
)
return tIter
}

switch d.Cmd {
case "build":
tombstones := buildSpans(t, cmp, fmtKey, d.Input, base.InternalKeyKindRangeDelete)
iter = NewIter(cmp, tombstones)
return formatAlphabeticSpans(tombstones)

case "truncate":
tIter := doTruncate()
defer tIter.Close()
var truncated []Span
for s := tIter.First(); s != nil; s = tIter.Next() {
truncated = append(truncated, s.ShallowClone())
}
return formatAlphabeticSpans(truncated)

case "truncate-and-save-iter":
if savedIter != nil {
savedIter.Close()
}
savedIter = doTruncate()
return "ok"

case "saved-iter":
return runIterCmd(t, d, savedIter)

default:
return fmt.Sprintf("unknown command: %s", d.Cmd)
}
})
}

// runIterCmd evaluates a datadriven command controlling an internal
// keyspan.FragmentIterator, returning a string with the results of the iterator
// operations.
func runIterCmd(t *testing.T, td *datadriven.TestData, iter FragmentIterator) string {
var buf bytes.Buffer
lines := strings.Split(strings.TrimSpace(td.Input), "\n")
for i, line := range lines {
if i > 0 {
fmt.Fprintln(&buf)
}
line = strings.TrimSpace(line)
i := strings.IndexByte(line, '#')
iterCmd := line
if i > 0 {
iterCmd = string(line[:i])
}
dataDrivenRunIterOp(&buf, iter, iterCmd)
}
return buf.String()
}

func dataDrivenRunIterOp(w io.Writer, it FragmentIterator, op string) {
fields := strings.FieldsFunc(op, func(r rune) bool { return iterDelim[r] })
var s *Span
switch strings.ToLower(fields[0]) {
case "first":
s = it.First()
case "last":
s = it.Last()
case "seekge", "seek-ge":
if len(fields) == 1 {
panic(fmt.Sprintf("unable to parse iter op %q", op))
}
s = it.SeekGE([]byte(fields[1]))
case "seeklt", "seek-lt":
if len(fields) == 1 {
panic(fmt.Sprintf("unable to parse iter op %q", op))
}
s = it.SeekLT([]byte(fields[1]))
case "next":
s = it.Next()
case "prev":
s = it.Prev()
default:
panic(fmt.Sprintf("unrecognized iter op %q", fields[0]))
}
if s == nil {
fmt.Fprint(w, "<nil>")
if err := it.Error(); err != nil {
fmt.Fprintf(w, " err=<%s>", it.Error())
}
return
}
fmt.Fprint(w, s)
}
2 changes: 1 addition & 1 deletion table_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -894,7 +894,7 @@ func newCombinedDeletionKeyspanIter(
out.Keys = append(out.Keys, k)
}
return len(out.Keys) > 0
})
}, comparer.Compare)
dIter := &keyspan.DefragmentingIter{}
dIter.Init(comparer, iter, equal, reducer, new(keyspan.DefragmentingBuffers))
iter = dIter
Expand Down
Loading