Skip to content

Commit

Permalink
sstable: fixes to prefix replacing iterator
Browse files Browse the repository at this point in the history
We add more logic inside the prefix replacing iterators:
 - pass through `SeekGE` calls even when not strictly necessary, to
   enable optimizations indicated by the flags.
 - keep track of whether we are positioned before/after the synthetic
   prefix range, to enable expected behaviors like `SeekGE` after the
   range followed by `Prev` (or `SeekLT` to before the range followed
   by `Next`)
  • Loading branch information
RaduBerinde committed Feb 24, 2024
1 parent 0b94619 commit bd7b217
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 46 deletions.
153 changes: 107 additions & 46 deletions sstable/prefix_replacing_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,27 +23,36 @@ type prefixReplacingIterator struct {
contentPrefix []byte
syntheticPrefix []byte

// keyInRange is a valid key in the logical range that has the syntheticPrefix.
// When an argument to a seek function does not have the syntheticPrefix,
// keyInRange is used to determine if the argument key is before or after the
// range of keys produced by the iterator.
keyInRange []byte
// lower is a valid key that has syntheticPrefix and is a lower bound for all
// keys produced by i. It is used to determine if the argument key is before
// or after the range of keys produced by the iterator.
lower []byte

// arg and arg2 are buffers that are used to avoid allocations when rewriting
// keys that are provided as arguments. They always start with contentPrefix.
arg, arg2 []byte

// res is used to avoid allocations when rewriting result keys. It always
// starts with syntheticPrefix.
res InternalKey
err error
// empty is set after a seek operation that returns no keys.
empty bool
}

func errInputPrefixMismatch() error {
return errors.AssertionFailedf("key argument does not have prefix required for replacement")
}
res InternalKey
err error
state iteratorState
}

type iteratorState int8

const (
// inSync indicates that the prefix replacing iterator is "in sync" with the
// underlying iterator; any Next/Prev calls can be passed through.
inSync iteratorState = iota
// afterRange indicates that our iterator is positioned after the synthetic
// prefix range. A Prev() call should return the last key/span in the range.
afterRange
// beforeRange indicates that our iterator is positioned before the synthetic
// prefix range. A Next() call should return the first key/span in the range.
beforeRange
// empty indicates that the last seek could not match any key (for example a
// SeekPrefixGE whose prefix or key does not have the synthetic prefix).
// Next, NextPrefix and Prev return nothing until the iterator is
// repositioned by another seek, First or Last.
empty
)

func errOutputPrefixMismatch() error {
return errors.AssertionFailedf("key returned does not have prefix required for replacement")
Expand All @@ -63,22 +72,22 @@ var _ Iterator = (*prefixReplacingIterator)(nil)
//
// INVARIANT: len(syntheticPrefix) > 0 && lower starts with syntheticPrefix.
func newPrefixReplacingIterator(
i Iterator, contentPrefix, syntheticPrefix []byte, keyInRange []byte, cmp base.Compare,
i Iterator, contentPrefix, syntheticPrefix []byte, lower []byte, cmp base.Compare,
) Iterator {
if invariants.Enabled {
if len(syntheticPrefix) == 0 {
panic("newPrefixReplacingIterator called without synthetic prefix")
}
if !bytes.HasPrefix(keyInRange, syntheticPrefix) {
panic(fmt.Sprintf("keyInRange %q does not have synthetic prefix %q", keyInRange, syntheticPrefix))
if !bytes.HasPrefix(lower, syntheticPrefix) {
panic(fmt.Sprintf("lower %q does not have synthetic prefix %q", lower, syntheticPrefix))
}
}
return &prefixReplacingIterator{
i: i,
cmp: cmp,
contentPrefix: contentPrefix,
syntheticPrefix: syntheticPrefix,
keyInRange: keyInRange,
lower: lower,
arg: slices.Clone(contentPrefix),
arg2: slices.Clone(contentPrefix),
res: InternalKey{UserKey: slices.Clone(syntheticPrefix)},
Expand Down Expand Up @@ -121,14 +130,16 @@ func (p *prefixReplacingIterator) rewriteResult(
func (p *prefixReplacingIterator) SeekGE(
key []byte, flags base.SeekGEFlags,
) (*InternalKey, base.LazyValue) {
p.empty = false
p.state = inSync
if !bytes.HasPrefix(key, p.syntheticPrefix) {
if p.cmp(key, p.keyInRange) > 0 {
p.empty = true
if p.cmp(key, p.lower) > 0 {
p.state = afterRange
return nil, base.LazyValue{}
}
// Key must be before the range; use First instead.
return p.rewriteResult(p.i.First())
// Key must be before the range; seek to the lower bound instead.
// We don't use First because we may miss out on optimizations passed
// through SeekGEFlags.
key = p.lower
}
return p.rewriteResult(p.i.SeekGE(p.rewriteArg(key), flags))
}
Expand All @@ -137,13 +148,15 @@ func (p *prefixReplacingIterator) SeekGE(
func (p *prefixReplacingIterator) SeekPrefixGE(
prefix, key []byte, flags base.SeekGEFlags,
) (*InternalKey, base.LazyValue) {
p.empty = false
if invariants.Enabled && !bytes.HasPrefix(key, prefix) {
panic(fmt.Sprintf("key %q does not have prefix %q", key, prefix))
}
if !bytes.HasPrefix(prefix, p.syntheticPrefix) {
p.state = inSync
if !bytes.HasPrefix(prefix, p.syntheticPrefix) || !bytes.HasPrefix(key, p.syntheticPrefix) {
// We never produce keys with this prefix; we can return nil.
p.empty = true
if p.cmp(prefix, p.lower) < 0 {
// We still want to seek the underlying iterator to potentially enable
// optimizations passed through flags.
p.i.SeekGE(p.rewriteArg(p.lower), flags)
}
p.state = empty
return nil, base.LazyValue{}
}
return p.rewriteResult(p.i.SeekPrefixGE(p.rewriteArg2(prefix), p.rewriteArg(key), flags))
Expand All @@ -153,52 +166,75 @@ func (p *prefixReplacingIterator) SeekPrefixGE(
func (p *prefixReplacingIterator) SeekLT(
key []byte, flags base.SeekLTFlags,
) (*InternalKey, base.LazyValue) {
p.empty = false
p.state = inSync
if !bytes.HasPrefix(key, p.syntheticPrefix) {
if p.cmp(key, p.keyInRange) < 0 {
p.empty = true
if p.cmp(key, p.lower) < 0 {
// Key before the range; no results.
p.state = beforeRange
return nil, base.LazyValue{}
}
// Key must be after the range. Use Last instead.
return p.rewriteResult(p.i.Last())
}

return p.rewriteResult(p.i.SeekLT(p.rewriteArg(key), flags))
}

// First implements the Iterator interface.
func (p *prefixReplacingIterator) First() (*InternalKey, base.LazyValue) {
p.empty = false
p.state = inSync
return p.rewriteResult(p.i.First())
}

// Last implements the Iterator interface.
func (p *prefixReplacingIterator) Last() (*InternalKey, base.LazyValue) {
p.empty = false
p.state = inSync
return p.rewriteResult(p.i.Last())
}

// Next implements the Iterator interface.
func (p *prefixReplacingIterator) Next() (*InternalKey, base.LazyValue) {
if p.empty {
switch p.state {
case empty, afterRange:
return nil, base.LazyValue{}
case beforeRange:
p.state = inSync
return p.rewriteResult(p.i.First())
case inSync:
return p.rewriteResult(p.i.Next())
default:
panic("invalid iterator state")
}
return p.rewriteResult(p.i.Next())
}

// NextPrefix implements the Iterator interface.
func (p *prefixReplacingIterator) NextPrefix(succKey []byte) (*InternalKey, base.LazyValue) {
if p.empty {
switch p.state {
case empty, afterRange:
return nil, base.LazyValue{}
case beforeRange:
p.state = inSync
return p.rewriteResult(p.i.First())
case inSync:
return p.rewriteResult(p.i.NextPrefix(succKey))
default:
panic("invalid iterator state")
}
return p.rewriteResult(p.i.NextPrefix(p.rewriteArg(succKey)))
}

// Prev implements the Iterator interface.
func (p *prefixReplacingIterator) Prev() (*InternalKey, base.LazyValue) {
if p.empty {
switch p.state {
case empty, beforeRange:
return nil, base.LazyValue{}
case afterRange:
p.state = inSync
return p.rewriteResult(p.i.Last())
case inSync:
return p.rewriteResult(p.i.Prev())
default:
panic("invalid iterator state")
}
return p.rewriteResult(p.i.Prev())
}

// Error implements the Iterator interface.
Expand Down Expand Up @@ -260,6 +296,8 @@ type prefixReplacingFragmentIterator struct {

arg []byte
out1, out2 []byte

state iteratorState
}

// newPrefixReplacingFragmentIterator wraps a FragmentIterator over some reader
Expand All @@ -284,9 +322,6 @@ func newPrefixReplacingFragmentIterator(
}

func (p *prefixReplacingFragmentIterator) rewriteArg(key []byte) ([]byte, error) {
if !bytes.HasPrefix(key, p.syntheticPrefix) {
return nil, errInputPrefixMismatch()
}
p.arg = append(p.arg[:len(p.contentPrefix)], key[len(p.syntheticPrefix):]...)
return p.arg, nil
}
Expand All @@ -307,8 +342,10 @@ func (p *prefixReplacingFragmentIterator) rewriteSpan(

// SeekGE implements the FragmentIterator interface.
func (p *prefixReplacingFragmentIterator) SeekGE(key []byte) (*keyspan.Span, error) {
p.state = inSync
if !bytes.HasPrefix(key, p.syntheticPrefix) {
if p.cmp(key, p.keyInRange) > 0 {
p.state = afterRange
return nil, nil
}
// Key must be before the range; use First instead.
Expand All @@ -323,8 +360,10 @@ func (p *prefixReplacingFragmentIterator) SeekGE(key []byte) (*keyspan.Span, err

// SeekLT implements the FragmentIterator interface.
func (p *prefixReplacingFragmentIterator) SeekLT(key []byte) (*keyspan.Span, error) {
p.state = inSync
if !bytes.HasPrefix(key, p.syntheticPrefix) {
if p.cmp(key, p.keyInRange) < 0 {
p.state = beforeRange
return nil, nil
}
// Key must be after the range; use Last instead.
Expand All @@ -339,22 +378,44 @@ func (p *prefixReplacingFragmentIterator) SeekLT(key []byte) (*keyspan.Span, err

// First implements the FragmentIterator interface.
func (p *prefixReplacingFragmentIterator) First() (*keyspan.Span, error) {
p.state = inSync
return p.rewriteSpan(p.i.First())
}

// Last implements the FragmentIterator interface.
func (p *prefixReplacingFragmentIterator) Last() (*keyspan.Span, error) {
p.state = inSync
return p.rewriteSpan(p.i.Last())
}

// Close implements the FragmentIterator interface.
// Next implements the FragmentIterator interface.
func (p *prefixReplacingFragmentIterator) Next() (*keyspan.Span, error) {
return p.rewriteSpan(p.i.Next())
switch p.state {
case empty, afterRange:
return nil, nil
case beforeRange:
p.state = inSync
return p.rewriteSpan(p.i.First())
case inSync:
return p.rewriteSpan(p.i.Next())
default:
panic("invalid iterator state")
}
}

// Prev implements the FragmentIterator interface.
func (p *prefixReplacingFragmentIterator) Prev() (*keyspan.Span, error) {
return p.rewriteSpan(p.i.Prev())
switch p.state {
case empty, beforeRange:
return nil, nil
case afterRange:
p.state = inSync
return p.rewriteSpan(p.i.Last())
case inSync:
return p.rewriteSpan(p.i.Prev())
default:
panic("invalid iterator state")
}
}

// Close implements the FragmentIterator interface.
Expand Down
12 changes: 12 additions & 0 deletions sstable/prefix_replacing_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,14 @@ func TestPrefixReplacingIterator(t *testing.T) {
got, _ = it.SeekGE(k(100), base.SeekGEFlagsNone)
require.Nil(t, got)

got, _ = it.Prev()
require.Equal(t, k(19), got.UserKey)

got, _ = it.SeekGE(kMax, base.SeekGEFlagsNone)
require.Nil(t, got)

got, _ = it.Prev()
require.Equal(t, k(19), got.UserKey)
})

t.Run("SeekPrefixGE", func(t *testing.T) {
Expand All @@ -112,12 +118,18 @@ func TestPrefixReplacingIterator(t *testing.T) {

got, _ = it.SeekPrefixGE(tc.to, k(100), base.SeekGEFlagsNone)
require.Nil(t, got)

got, _ = it.Prev()
require.Equal(t, k(19), got.UserKey)
})

t.Run("SeekLT", func(t *testing.T) {
got, _ = it.SeekLT(kMin, base.SeekLTFlagsNone)
require.Nil(t, got)

got, _ = it.Next()
require.Equal(t, k(0), got.UserKey)

got, _ = it.SeekLT(k(0), base.SeekLTFlagsNone)
require.Nil(t, got)

Expand Down
4 changes: 4 additions & 0 deletions sstable/reader_virtual.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,12 @@ var _ CommonReader = (*VirtualReader)(nil)

// Lightweight virtual sstable state which can be passed to sstable iterators.
type virtualState struct {
// Bounds for the virtual table. Note that when prefixChange is set, these
// bounds are the logical bounds which start with the SyntheticPrefix.
lower InternalKey
upper InternalKey

// Virtual table number, for informative purposes.
fileNum base.FileNum
Compare Compare
isSharedIngested bool
Expand Down

0 comments on commit bd7b217

Please sign in to comment.