Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sstable: fixes to prefix replacing iterator #3344

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 119 additions & 46 deletions sstable/prefix_replacing_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,27 +23,38 @@ type prefixReplacingIterator struct {
contentPrefix []byte
syntheticPrefix []byte

// keyInRange is a valid key in the logical range that has the syntheticPrefix.
// When an argument to a seek function does not have the syntheticPrefix,
// keyInRange is used to determine if the argument key is before or after the
// range of keys produced by the iterator.
keyInRange []byte
// lower is a valid key that has syntheticPrefix and is a lower bound for all
// keys produced by i. It is used to determine if the argument key is before
// or after the range of keys produced by the iterator.
lower []byte

// arg and arg2 are buffers that are used to avoid allocations when rewriting
// keys that are provided as arguments. They always start with contentPrefix.
arg, arg2 []byte

state prefixReplacingIteratorState
// Last bounds received via SetBounds.
lowerBound, upperBound []byte
// res is used to avoid allocations when rewriting result keys. It always
// starts with syntheticPrefix.
res InternalKey
err error
// empty is set after a seek operation that returns no keys.
empty bool
}

func errInputPrefixMismatch() error {
return errors.AssertionFailedf("key argument does not have prefix required for replacement")
}
type prefixReplacingIteratorState int8

const (
// inRange indicates that the prefix replacing iterator is "in sync" with the
// underlying iterator; any Next/Prev calls can be passed through.
inRange prefixReplacingIteratorState = iota
// afterRange indicates that our iterator is positioned after the synthetic
// prefix range. A Prev() call should return the last key/span in the range.
afterRange
// beforeRange indicates that our iterator is positioned after the synthetic
// prefix range. A Next() call should return the first key/span in the range.
beforeRange
empty
)

func errOutputPrefixMismatch() error {
return errors.AssertionFailedf("key returned does not have prefix required for replacement")
Expand All @@ -56,29 +67,28 @@ var _ Iterator = (*prefixReplacingIterator)(nil)
// `syntheticPrefix`. Every key produced by the underlying iterator must have
// `contentPrefix`.
//
// keyInRange is a valid key that starts with syntheticPrefix. When a seek
// function is called with a key that does not start with syntheticPrefix,
// keyInRange is used to determine if the key is before or after the synthetic
// prefix range.
// lower is a valid key that has syntheticPreficx and is a lower bound for all
// keys produced by i. It is used to determine if an argument key is before
// or after the range of keys produced by the iterator.
//
// INVARIANT: len(syntheticPrefix) > 0 && keyInRange stars with syntheticPrefix.
func newPrefixReplacingIterator(
i Iterator, contentPrefix, syntheticPrefix []byte, keyInRange []byte, cmp base.Compare,
i Iterator, contentPrefix, syntheticPrefix []byte, lower []byte, cmp base.Compare,
) Iterator {
if invariants.Enabled {
if len(syntheticPrefix) == 0 {
panic("newPrefixReplacingIterator called without synthetic prefix")
}
if !bytes.HasPrefix(keyInRange, syntheticPrefix) {
panic(fmt.Sprintf("keyInRange %q does not have synthetic prefix %q", keyInRange, syntheticPrefix))
if !bytes.HasPrefix(lower, syntheticPrefix) {
panic(fmt.Sprintf("lower %q does not have synthetic prefix %q", lower, syntheticPrefix))
}
}
return &prefixReplacingIterator{
i: i,
cmp: cmp,
contentPrefix: contentPrefix,
syntheticPrefix: syntheticPrefix,
keyInRange: keyInRange,
lower: lower,
arg: slices.Clone(contentPrefix),
arg2: slices.Clone(contentPrefix),
res: InternalKey{UserKey: slices.Clone(syntheticPrefix)},
Expand Down Expand Up @@ -121,14 +131,16 @@ func (p *prefixReplacingIterator) rewriteResult(
func (p *prefixReplacingIterator) SeekGE(
key []byte, flags base.SeekGEFlags,
) (*InternalKey, base.LazyValue) {
p.empty = false
p.state = inRange
if !bytes.HasPrefix(key, p.syntheticPrefix) {
if p.cmp(key, p.keyInRange) > 0 {
p.empty = true
if p.cmp(key, p.lower) > 0 {
p.state = afterRange
return nil, base.LazyValue{}
}
// Key must be before the range; use First instead.
return p.rewriteResult(p.i.First())
// Key must be before the range; seek to the lower bound instead.
// We don't use First because we may miss out on optimizations passed
// through SeekEFlags.
key = p.lower
}
return p.rewriteResult(p.i.SeekGE(p.rewriteArg(key), flags))
}
Expand All @@ -137,13 +149,15 @@ func (p *prefixReplacingIterator) SeekGE(
func (p *prefixReplacingIterator) SeekPrefixGE(
prefix, key []byte, flags base.SeekGEFlags,
) (*InternalKey, base.LazyValue) {
p.empty = false
if invariants.Enabled && !bytes.HasPrefix(key, prefix) {
panic(fmt.Sprintf("key %q does not have prefix %q", key, prefix))
}
if !bytes.HasPrefix(prefix, p.syntheticPrefix) {
p.state = inRange
if !bytes.HasPrefix(prefix, p.syntheticPrefix) || !bytes.HasPrefix(key, p.syntheticPrefix) {
// We never produce keys with this prefix; we can return nil.
p.empty = true
if p.cmp(prefix, p.lower) < 0 {
// We still want to seek the underlying iterator to potentially enable
// optimizations passed through flags.
p.i.SeekGE(p.rewriteArg(p.lower), flags)
}
p.state = empty
return nil, base.LazyValue{}
}
return p.rewriteResult(p.i.SeekPrefixGE(p.rewriteArg2(prefix), p.rewriteArg(key), flags))
Expand All @@ -153,52 +167,84 @@ func (p *prefixReplacingIterator) SeekPrefixGE(
func (p *prefixReplacingIterator) SeekLT(
key []byte, flags base.SeekLTFlags,
) (*InternalKey, base.LazyValue) {
p.empty = false
p.state = inRange
if !bytes.HasPrefix(key, p.syntheticPrefix) {
if p.cmp(key, p.keyInRange) < 0 {
p.empty = true
if p.cmp(key, p.lower) < 0 {
// Key before the range; no results.
p.state = beforeRange
return nil, base.LazyValue{}
}
// Key must be after the range. Use Last instead.
return p.rewriteResult(p.i.Last())
}

return p.rewriteResult(p.i.SeekLT(p.rewriteArg(key), flags))
}

// First implements the Iterator interface.
func (p *prefixReplacingIterator) First() (*InternalKey, base.LazyValue) {
p.empty = false
p.state = inRange
return p.rewriteResult(p.i.First())
}

// Last implements the Iterator interface.
func (p *prefixReplacingIterator) Last() (*InternalKey, base.LazyValue) {
p.empty = false
p.state = inRange
return p.rewriteResult(p.i.Last())
}

// Next implements the Iterator interface.
func (p *prefixReplacingIterator) Next() (*InternalKey, base.LazyValue) {
if p.empty {
switch p.state {
case empty, afterRange:
return nil, base.LazyValue{}
case beforeRange:
p.state = inRange
if p.lowerBound != nil {
return p.rewriteResult(p.i.SeekGE(p.lowerBound, base.SeekGEFlagsNone))
}
return p.rewriteResult(p.i.First())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be a violation of the internalIterator contract, right? We don't allow First() to be called if there's a lower bound, instead we want SeekGE(lower) to be called. Same with Last() and the upper bound. You might need to save the bounds in SetBounds so you can do the appropriate translation here instead of passing through a first/last to the child iter.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, done. TFTR!

case inRange:
return p.rewriteResult(p.i.Next())
default:
panic("invalid iterator state")
}
return p.rewriteResult(p.i.Next())
}

// NextPrefix implements the Iterator interface.
func (p *prefixReplacingIterator) NextPrefix(succKey []byte) (*InternalKey, base.LazyValue) {
if p.empty {
switch p.state {
case empty, afterRange:
return nil, base.LazyValue{}
case beforeRange:
p.state = inRange
if p.lowerBound != nil {
return p.rewriteResult(p.i.SeekGE(p.lowerBound, base.SeekGEFlagsNone))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this might need to be SeekGE(succKey) as the lower bound might not have the prefix we want. And same for the First() call below; we can actually unconditionally call SeekGE(succKey) here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need to think about this some more. What I did is incorrect, we can't just pass a lower bound below without rewriting its prefix.. We need to basically figure out the intersection of the given bounds and the vtable bounds (most of that is happening today below us).

}
return p.rewriteResult(p.i.First())
case inRange:
return p.rewriteResult(p.i.NextPrefix(succKey))
default:
panic("invalid iterator state")
}
return p.rewriteResult(p.i.NextPrefix(p.rewriteArg(succKey)))
}

// Prev implements the Iterator interface.
func (p *prefixReplacingIterator) Prev() (*InternalKey, base.LazyValue) {
if p.empty {
switch p.state {
case empty, beforeRange:
return nil, base.LazyValue{}
case afterRange:
p.state = inRange
if p.upperBound != nil {
return p.rewriteResult(p.i.SeekLT(p.upperBound, base.SeekLTFlagsNone))
}
return p.rewriteResult(p.i.Last())
case inRange:
return p.rewriteResult(p.i.Prev())
default:
panic("invalid iterator state")
}
return p.rewriteResult(p.i.Prev())
}

// Error implements the Iterator interface.
Expand All @@ -216,6 +262,8 @@ func (p *prefixReplacingIterator) Close() error {

// SetBounds implements the Iterator interface.
func (p *prefixReplacingIterator) SetBounds(lower, upper []byte) {
p.lowerBound = lower
p.upperBound = upper
// Check if the underlying iterator requires un-rewritten bounds, i.e. if it
// is going to rewrite them itself or pass them to something e.g. vState that
// will rewrite them.
Expand Down Expand Up @@ -260,6 +308,8 @@ type prefixReplacingFragmentIterator struct {

arg []byte
out1, out2 []byte

state prefixReplacingIteratorState
}

// newPrefixReplacingFragmentIterator wraps a FragmentIterator over some reader
Expand All @@ -284,9 +334,6 @@ func newPrefixReplacingFragmentIterator(
}

func (p *prefixReplacingFragmentIterator) rewriteArg(key []byte) ([]byte, error) {
if !bytes.HasPrefix(key, p.syntheticPrefix) {
return nil, errInputPrefixMismatch()
}
p.arg = append(p.arg[:len(p.contentPrefix)], key[len(p.syntheticPrefix):]...)
return p.arg, nil
}
Expand All @@ -307,8 +354,10 @@ func (p *prefixReplacingFragmentIterator) rewriteSpan(

// SeekGE implements the FragmentIterator interface.
func (p *prefixReplacingFragmentIterator) SeekGE(key []byte) (*keyspan.Span, error) {
p.state = inRange
if !bytes.HasPrefix(key, p.syntheticPrefix) {
if p.cmp(key, p.keyInRange) > 0 {
p.state = afterRange
return nil, nil
}
// Key must be before the range; use First instead.
Expand All @@ -323,8 +372,10 @@ func (p *prefixReplacingFragmentIterator) SeekGE(key []byte) (*keyspan.Span, err

// SeekLT implements the FragmentIterator interface.
func (p *prefixReplacingFragmentIterator) SeekLT(key []byte) (*keyspan.Span, error) {
p.state = inRange
if !bytes.HasPrefix(key, p.syntheticPrefix) {
if p.cmp(key, p.keyInRange) < 0 {
p.state = beforeRange
return nil, nil
}
// Key must be after the range; use Last instead.
Expand All @@ -339,22 +390,44 @@ func (p *prefixReplacingFragmentIterator) SeekLT(key []byte) (*keyspan.Span, err

// First implements the FragmentIterator interface.
func (p *prefixReplacingFragmentIterator) First() (*keyspan.Span, error) {
p.state = inRange
return p.rewriteSpan(p.i.First())
}

// Last implements the FragmentIterator interface.
func (p *prefixReplacingFragmentIterator) Last() (*keyspan.Span, error) {
p.state = inRange
return p.rewriteSpan(p.i.Last())
}

// Close implements the FragmentIterator interface.
// Next implements the FragmentIterator interface.
func (p *prefixReplacingFragmentIterator) Next() (*keyspan.Span, error) {
return p.rewriteSpan(p.i.Next())
switch p.state {
case empty, afterRange:
return nil, nil
case beforeRange:
p.state = inRange
return p.rewriteSpan(p.i.First())
case inRange:
return p.rewriteSpan(p.i.Next())
default:
panic("invalid iterator state")
}
}

// Prev implements the FragmentIterator interface.
func (p *prefixReplacingFragmentIterator) Prev() (*keyspan.Span, error) {
return p.rewriteSpan(p.i.Prev())
switch p.state {
case empty, beforeRange:
return nil, nil
case afterRange:
p.state = inRange
return p.rewriteSpan(p.i.Last())
case inRange:
return p.rewriteSpan(p.i.Prev())
default:
panic("invalid iterator state")
}
}

// Close implements the FragmentIterator interface.
Expand Down
12 changes: 12 additions & 0 deletions sstable/prefix_replacing_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,14 @@ func TestPrefixReplacingIterator(t *testing.T) {
got, _ = it.SeekGE(k(100), base.SeekGEFlagsNone)
require.Nil(t, got)

got, _ = it.Prev()
require.Equal(t, k(19), got.UserKey)

got, _ = it.SeekGE(kMax, base.SeekGEFlagsNone)
require.Nil(t, got)

got, _ = it.Prev()
require.Equal(t, k(19), got.UserKey)
})

t.Run("SeekPrefixGE", func(t *testing.T) {
Expand All @@ -112,12 +118,18 @@ func TestPrefixReplacingIterator(t *testing.T) {

got, _ = it.SeekPrefixGE(tc.to, k(100), base.SeekGEFlagsNone)
require.Nil(t, got)

got, _ = it.Prev()
require.Equal(t, k(19), got.UserKey)
})

t.Run("SeekLT", func(t *testing.T) {
got, _ = it.SeekLT(kMin, base.SeekLTFlagsNone)
require.Nil(t, got)

got, _ = it.Next()
require.Equal(t, k(0), got.UserKey)

got, _ = it.SeekLT(k(0), base.SeekLTFlagsNone)
require.Nil(t, got)

Expand Down
9 changes: 7 additions & 2 deletions sstable/reader_virtual.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,12 @@ var _ CommonReader = (*VirtualReader)(nil)

// Lightweight virtual sstable state which can be passed to sstable iterators.
type virtualState struct {
lower InternalKey
upper InternalKey
// Bounds for the virtual table. Note that when prefixChange is set, these
// bounds are the logical bounds which start with the SyntheticPrefix.
lower InternalKey
upper InternalKey

// Virtual table number, for informative purposes.
fileNum base.FileNum
Compare Compare
isSharedIngested bool
Expand Down Expand Up @@ -104,6 +108,7 @@ func (v *VirtualReader) NewCompactionIter(
i, err := v.reader.newCompactionIter(
transforms, bytesIterated, categoryAndQoS, statsCollector, rp, &v.vState, bufferPool)
if err == nil && v.vState.prefixChange != nil {

i = newPrefixReplacingIterator(
i, v.vState.prefixChange.ContentPrefix, v.vState.prefixChange.SyntheticPrefix,
v.vState.lower.UserKey, v.reader.Compare,
Expand Down
Loading