Skip to content

Commit

Permalink
db: strictly enforce prefix in [flushable]BatchIter.SeekPrefixGE
Browse files Browse the repository at this point in the history
Every key we avoid propagating up the iterator stack avoids a key comparison
during initialization of the merging iterator heap.

Informs #3794.
  • Loading branch information
jbowens committed Jul 26, 2024
1 parent 44e5fb4 commit ca2b210
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 4 deletions.
22 changes: 20 additions & 2 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package pebble

import (
"bytes"
"context"
"encoding/binary"
"fmt"
Expand Down Expand Up @@ -1686,7 +1687,16 @@ func (i *batchIter) SeekGE(key []byte, flags base.SeekGEFlags) *base.InternalKV
}

func (i *batchIter) SeekPrefixGE(prefix, key []byte, flags base.SeekGEFlags) *base.InternalKV {
return i.SeekGE(key, flags)
kv := i.SeekGE(key, flags)
if kv == nil {
return nil
}
// If the key doesn't have the sought prefix, return nil.
if !bytes.Equal(i.batch.comparer.Split.Prefix(kv.K.UserKey), prefix) {
i.kv = base.InternalKV{}
return nil
}
return kv
}

func (i *batchIter) SeekLT(key []byte, flags base.SeekLTFlags) *base.InternalKV {
Expand Down Expand Up @@ -2160,7 +2170,15 @@ func (i *flushableBatchIter) SeekGE(key []byte, flags base.SeekGEFlags) *base.In
func (i *flushableBatchIter) SeekPrefixGE(
prefix, key []byte, flags base.SeekGEFlags,
) *base.InternalKV {
return i.SeekGE(key, flags)
kv := i.SeekGE(key, flags)
if kv == nil {
return nil
}
// If the key doesn't have the sought prefix, return nil.
if !bytes.Equal(i.batch.comparer.Split.Prefix(kv.K.UserKey), prefix) {
return nil
}
return kv
}

// SeekLT implements internalIterator.SeekLT, as documented in the pebble
Expand Down
15 changes: 13 additions & 2 deletions data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ func printIterState(
hasPoint, hasRange := iter.HasPointAndRange()
fmt.Fprintf(b, "%s:%s (", iter.Key(), validityStateStr)
if hasPoint {
fmt.Fprintf(b, "%s, ", iter.Value())
fmt.Fprintf(b, "%s, ", formatASCIIValue(iter.Value()))
} else {
fmt.Fprint(b, "., ")
}
Expand Down Expand Up @@ -388,13 +388,24 @@ func formatASCIIKey(b []byte) string {
return string(b)
}

func formatASCIIValue(b []byte) string {
if len(b) > 1<<10 {
return fmt.Sprintf("[LARGE VALUE len=%d]", len(b))
}
if bytes.IndexFunc(b, func(r rune) bool { return r < '!' || r > 'z' }) != -1 {
// This key is not just legible ASCII characters. Quote it.
return fmt.Sprintf("%q", b)
}
return string(b)
}

func writeRangeKeys(b io.Writer, iter *Iterator) {
rangeKeys := iter.RangeKeys()
for j := 0; j < len(rangeKeys); j++ {
if j > 0 {
fmt.Fprint(b, ",")
}
fmt.Fprintf(b, " %s=%s", rangeKeys[j].Suffix, rangeKeys[j].Value)
fmt.Fprintf(b, " %s=%s", rangeKeys[j].Suffix, formatASCIIValue(rangeKeys[j].Value))
}
}

Expand Down
10 changes: 10 additions & 0 deletions iterator_histories_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,16 @@ func TestIterHistories(t *testing.T) {
return err.Error()
}
return ""
case "disable-flushes":
d.mu.Lock()
d.mu.compact.flushing = true
d.mu.Unlock()
return ""
case "enable-flushes":
d.mu.Lock()
d.mu.compact.flushing = false
d.mu.Unlock()
return ""
case "get":
var reader Reader = d
if arg, ok := td.Arg("reader"); ok {
Expand Down
73 changes: 73 additions & 0 deletions testdata/iter_histories/prefix_iteration
Original file line number Diff line number Diff line change
Expand Up @@ -382,3 +382,76 @@ stats
----
.
stats: seeked 1 times (1 internal); stepped 0 times (0 internal)

# Test that a prefix seek through a batch iterator that enforces the prefix
# strictly.

reset
----

batch name=foo
set b@1 b@1
set d@9 d@9
set g@4 g@4
set e@2 e@2
----
wrote 4 keys to batch "foo"

# The stats should indicate only 3 KVs were ever surfaced to the merging iterator.

combined-iter reader=foo name=fooiter
seek-prefix-ge b@10
seek-prefix-ge c@10
seek-prefix-ge d@10
seek-prefix-ge g@2
seek-prefix-ge e@2
stats
----
b@1: (b@1, .)
.
d@9: (d@9, .)
.
e@2: (e@2, .)
stats: seeked 5 times (5 internal); stepped 0 times (0 internal); blocks: 0B cached; points: 3 (9B keys, 9B values)

# Test the above case but with a large committed batch (which should be a
# flushableBatchIter).

define memtable-size=65536
----

# We diable flushes to avoid scheduling a flush that might race with our
# iterator. If the iterator observed the state after the large batch has been
# flushed to sstables, we would see nonzero block bytes appear in the iterator
# stats.
disable-flushes
----

batch commit
set b@1 <rand-bytes=10000>
set d@9 <rand-bytes=10000>
set g@4 <rand-bytes=10000>
set e@2 <rand-bytes=10000>
----
committed 4 keys

lsm
----

combined-iter
seek-prefix-ge b@10
seek-prefix-ge c@10
seek-prefix-ge d@10
seek-prefix-ge g@2
seek-prefix-ge e@2
stats
----
b@1: ([LARGE VALUE len=10000], .)
.
d@9: ([LARGE VALUE len=10000], .)
.
e@2: ([LARGE VALUE len=10000], .)
stats: seeked 5 times (5 internal); stepped 0 times (0 internal); blocks: 0B cached; points: 3 (9B keys, 29KB values)

enable-flushes
----

0 comments on commit ca2b210

Please sign in to comment.