diff --git a/sstable/prefix_replacing_iterator.go b/sstable/prefix_replacing_iterator.go index b3e93a3c7b..9db8966bc4 100644 --- a/sstable/prefix_replacing_iterator.go +++ b/sstable/prefix_replacing_iterator.go @@ -23,27 +23,38 @@ type prefixReplacingIterator struct { contentPrefix []byte syntheticPrefix []byte - // keyInRange is a valid key in the logical range that has the syntheticPrefix. - // When an argument to a seek function does not have the syntheticPrefix, - // keyInRange is used to determine if the argument key is before or after the - // range of keys produced by the iterator. - keyInRange []byte + // lower is a valid key that has syntheticPrefix and is a lower bound for all + // keys produced by i. It is used to determine if the argument key is before + // or after the range of keys produced by the iterator. + lower []byte // arg and arg2 are buffers that are used to avoid allocations when rewriting // keys that are provided as arguments. They always start with contentPrefix. arg, arg2 []byte + state prefixReplacingIteratorState + // Last bounds received via SetBounds. + lowerBound, upperBound []byte // res is used to avoid allocations when rewriting result keys. It always // starts with syntheticPrefix. res InternalKey err error - // empty is set after a seek operation that returns no keys. - empty bool } -func errInputPrefixMismatch() error { - return errors.AssertionFailedf("key argument does not have prefix required for replacement") -} +type prefixReplacingIteratorState int8 + +const ( + // inRange indicates that the prefix replacing iterator is "in sync" with the + // underlying iterator; any Next/Prev calls can be passed through. + inRange prefixReplacingIteratorState = iota + // afterRange indicates that our iterator is positioned after the synthetic + // prefix range. A Prev() call should return the last key/span in the range. + afterRange + // beforeRange indicates that our iterator is positioned before the synthetic + // prefix range. 
A Next() call should return the first key/span in the range. + beforeRange + empty +) func errOutputPrefixMismatch() error { return errors.AssertionFailedf("key returned does not have prefix required for replacement") @@ -56,21 +67,20 @@ var _ Iterator = (*prefixReplacingIterator)(nil) // `syntheticPrefix`. Every key produced by the underlying iterator must have // `contentPrefix`. // -// keyInRange is a valid key that starts with syntheticPrefix. When a seek -// function is called with a key that does not start with syntheticPrefix, -// keyInRange is used to determine if the key is before or after the synthetic -// prefix range. +// lower is a valid key that has syntheticPrefix and is a lower bound for all +// keys produced by i. It is used to determine if an argument key is before +// or after the range of keys produced by the iterator. // // INVARIANT: len(syntheticPrefix) > 0 && keyInRange stars with syntheticPrefix. func newPrefixReplacingIterator( - i Iterator, contentPrefix, syntheticPrefix []byte, keyInRange []byte, cmp base.Compare, + i Iterator, contentPrefix, syntheticPrefix []byte, lower []byte, cmp base.Compare, ) Iterator { if invariants.Enabled { if len(syntheticPrefix) == 0 { panic("newPrefixReplacingIterator called without synthetic prefix") } - if !bytes.HasPrefix(keyInRange, syntheticPrefix) { - panic(fmt.Sprintf("keyInRange %q does not have synthetic prefix %q", keyInRange, syntheticPrefix)) + if !bytes.HasPrefix(lower, syntheticPrefix) { + panic(fmt.Sprintf("lower %q does not have synthetic prefix %q", lower, syntheticPrefix)) } } return &prefixReplacingIterator{ @@ -78,7 +88,7 @@ func newPrefixReplacingIterator( cmp: cmp, contentPrefix: contentPrefix, syntheticPrefix: syntheticPrefix, - keyInRange: keyInRange, + lower: lower, arg: slices.Clone(contentPrefix), arg2: slices.Clone(contentPrefix), res: InternalKey{UserKey: slices.Clone(syntheticPrefix)}, @@ -121,14 +131,16 @@ func (p *prefixReplacingIterator) rewriteResult( func (p 
*prefixReplacingIterator) SeekGE( key []byte, flags base.SeekGEFlags, ) (*InternalKey, base.LazyValue) { - p.empty = false + p.state = inRange if !bytes.HasPrefix(key, p.syntheticPrefix) { - if p.cmp(key, p.keyInRange) > 0 { - p.empty = true + if p.cmp(key, p.lower) > 0 { + p.state = afterRange return nil, base.LazyValue{} } - // Key must be before the range; use First instead. - return p.rewriteResult(p.i.First()) + // Key must be before the range; seek to the lower bound instead. + // We don't use First because we may miss out on optimizations passed + // through SeekGEFlags. + key = p.lower } return p.rewriteResult(p.i.SeekGE(p.rewriteArg(key), flags)) } @@ -137,13 +149,15 @@ func (p *prefixReplacingIterator) SeekGE( func (p *prefixReplacingIterator) SeekPrefixGE( prefix, key []byte, flags base.SeekGEFlags, ) (*InternalKey, base.LazyValue) { - p.empty = false - if invariants.Enabled && !bytes.HasPrefix(key, prefix) { - panic(fmt.Sprintf("key %q does not have prefix %q", key, prefix)) - } - if !bytes.HasPrefix(prefix, p.syntheticPrefix) { + p.state = inRange + if !bytes.HasPrefix(prefix, p.syntheticPrefix) || !bytes.HasPrefix(key, p.syntheticPrefix) { // We never produce keys with this prefix; we can return nil. - p.empty = true + if p.cmp(prefix, p.lower) < 0 { + // We still want to seek the underlying iterator to potentially enable + // optimizations passed through flags. + p.i.SeekGE(p.rewriteArg(p.lower), flags) + } + p.state = empty return nil, base.LazyValue{} } return p.rewriteResult(p.i.SeekPrefixGE(p.rewriteArg2(prefix), p.rewriteArg(key), flags)) @@ -153,52 +167,84 @@ func (p *prefixReplacingIterator) SeekPrefixGE( func (p *prefixReplacingIterator) SeekLT( key []byte, flags base.SeekLTFlags, ) (*InternalKey, base.LazyValue) { - p.empty = false + p.state = inRange if !bytes.HasPrefix(key, p.syntheticPrefix) { - if p.cmp(key, p.keyInRange) < 0 { - p.empty = true + if p.cmp(key, p.lower) < 0 { + // Key before the range; no results. 
+ p.state = beforeRange return nil, base.LazyValue{} } // Key must be after the range. Use Last instead. return p.rewriteResult(p.i.Last()) } + return p.rewriteResult(p.i.SeekLT(p.rewriteArg(key), flags)) } // First implements the Iterator interface. func (p *prefixReplacingIterator) First() (*InternalKey, base.LazyValue) { - p.empty = false + p.state = inRange return p.rewriteResult(p.i.First()) } // Last implements the Iterator interface. func (p *prefixReplacingIterator) Last() (*InternalKey, base.LazyValue) { - p.empty = false + p.state = inRange return p.rewriteResult(p.i.Last()) } // Next implements the Iterator interface. func (p *prefixReplacingIterator) Next() (*InternalKey, base.LazyValue) { - if p.empty { + switch p.state { + case empty, afterRange: return nil, base.LazyValue{} + case beforeRange: + p.state = inRange + if p.lowerBound != nil { + return p.rewriteResult(p.i.SeekGE(p.lowerBound, base.SeekGEFlagsNone)) + } + return p.rewriteResult(p.i.First()) + case inRange: + return p.rewriteResult(p.i.Next()) + default: + panic("invalid iterator state") } - return p.rewriteResult(p.i.Next()) } // NextPrefix implements the Iterator interface. func (p *prefixReplacingIterator) NextPrefix(succKey []byte) (*InternalKey, base.LazyValue) { - if p.empty { + switch p.state { + case empty, afterRange: return nil, base.LazyValue{} + case beforeRange: + p.state = inRange + if p.lowerBound != nil { + return p.rewriteResult(p.i.SeekGE(p.lowerBound, base.SeekGEFlagsNone)) + } + return p.rewriteResult(p.i.First()) + case inRange: + return p.rewriteResult(p.i.NextPrefix(succKey)) + default: + panic("invalid iterator state") } - return p.rewriteResult(p.i.NextPrefix(p.rewriteArg(succKey))) } // Prev implements the Iterator interface. 
func (p *prefixReplacingIterator) Prev() (*InternalKey, base.LazyValue) { - if p.empty { + switch p.state { + case empty, beforeRange: return nil, base.LazyValue{} + case afterRange: + p.state = inRange + if p.upperBound != nil { + return p.rewriteResult(p.i.SeekLT(p.upperBound, base.SeekLTFlagsNone)) + } + return p.rewriteResult(p.i.Last()) + case inRange: + return p.rewriteResult(p.i.Prev()) + default: + panic("invalid iterator state") } - return p.rewriteResult(p.i.Prev()) } // Error implements the Iterator interface. @@ -216,6 +262,8 @@ func (p *prefixReplacingIterator) Close() error { // SetBounds implements the Iterator interface. func (p *prefixReplacingIterator) SetBounds(lower, upper []byte) { + p.lowerBound = lower + p.upperBound = upper // Check if the underlying iterator requires un-rewritten bounds, i.e. if it // is going to rewrite them itself or pass them to something e.g. vState that // will rewrite them. @@ -260,6 +308,8 @@ type prefixReplacingFragmentIterator struct { arg []byte out1, out2 []byte + + state prefixReplacingIteratorState } // newPrefixReplacingFragmentIterator wraps a FragmentIterator over some reader @@ -284,9 +334,6 @@ func newPrefixReplacingFragmentIterator( } func (p *prefixReplacingFragmentIterator) rewriteArg(key []byte) ([]byte, error) { - if !bytes.HasPrefix(key, p.syntheticPrefix) { - return nil, errInputPrefixMismatch() - } p.arg = append(p.arg[:len(p.contentPrefix)], key[len(p.syntheticPrefix):]...) return p.arg, nil } @@ -307,8 +354,10 @@ func (p *prefixReplacingFragmentIterator) rewriteSpan( // SeekGE implements the FragmentIterator interface. func (p *prefixReplacingFragmentIterator) SeekGE(key []byte) (*keyspan.Span, error) { + p.state = inRange if !bytes.HasPrefix(key, p.syntheticPrefix) { if p.cmp(key, p.keyInRange) > 0 { + p.state = afterRange return nil, nil } // Key must be before the range; use First instead. 
@@ -323,8 +372,10 @@ func (p *prefixReplacingFragmentIterator) SeekGE(key []byte) (*keyspan.Span, err // SeekLT implements the FragmentIterator interface. func (p *prefixReplacingFragmentIterator) SeekLT(key []byte) (*keyspan.Span, error) { + p.state = inRange if !bytes.HasPrefix(key, p.syntheticPrefix) { if p.cmp(key, p.keyInRange) < 0 { + p.state = beforeRange return nil, nil } // Key must be after the range; use Last instead. @@ -339,22 +390,44 @@ func (p *prefixReplacingFragmentIterator) SeekLT(key []byte) (*keyspan.Span, err // First implements the FragmentIterator interface. func (p *prefixReplacingFragmentIterator) First() (*keyspan.Span, error) { + p.state = inRange return p.rewriteSpan(p.i.First()) } // Last implements the FragmentIterator interface. func (p *prefixReplacingFragmentIterator) Last() (*keyspan.Span, error) { + p.state = inRange return p.rewriteSpan(p.i.Last()) } -// Close implements the FragmentIterator interface. +// Next implements the FragmentIterator interface. func (p *prefixReplacingFragmentIterator) Next() (*keyspan.Span, error) { - return p.rewriteSpan(p.i.Next()) + switch p.state { + case empty, afterRange: + return nil, nil + case beforeRange: + p.state = inRange + return p.rewriteSpan(p.i.First()) + case inRange: + return p.rewriteSpan(p.i.Next()) + default: + panic("invalid iterator state") + } } // Prev implements the FragmentIterator interface. func (p *prefixReplacingFragmentIterator) Prev() (*keyspan.Span, error) { - return p.rewriteSpan(p.i.Prev()) + switch p.state { + case empty, beforeRange: + return nil, nil + case afterRange: + p.state = inRange + return p.rewriteSpan(p.i.Last()) + case inRange: + return p.rewriteSpan(p.i.Prev()) + default: + panic("invalid iterator state") + } } // Close implements the FragmentIterator interface. 
diff --git a/sstable/prefix_replacing_iterator_test.go b/sstable/prefix_replacing_iterator_test.go index e37f420a1b..8f9d56ed9d 100644 --- a/sstable/prefix_replacing_iterator_test.go +++ b/sstable/prefix_replacing_iterator_test.go @@ -96,8 +96,14 @@ func TestPrefixReplacingIterator(t *testing.T) { got, _ = it.SeekGE(k(100), base.SeekGEFlagsNone) require.Nil(t, got) + got, _ = it.Prev() + require.Equal(t, k(19), got.UserKey) + got, _ = it.SeekGE(kMax, base.SeekGEFlagsNone) require.Nil(t, got) + + got, _ = it.Prev() + require.Equal(t, k(19), got.UserKey) }) t.Run("SeekPrefixGE", func(t *testing.T) { @@ -112,12 +118,18 @@ func TestPrefixReplacingIterator(t *testing.T) { got, _ = it.SeekPrefixGE(tc.to, k(100), base.SeekGEFlagsNone) require.Nil(t, got) + + got, _ = it.Prev() + require.Equal(t, k(19), got.UserKey) }) t.Run("SeekLT", func(t *testing.T) { got, _ = it.SeekLT(kMin, base.SeekLTFlagsNone) require.Nil(t, got) + got, _ = it.Next() + require.Equal(t, k(0), got.UserKey) + got, _ = it.SeekLT(k(0), base.SeekLTFlagsNone) require.Nil(t, got) diff --git a/sstable/reader_virtual.go b/sstable/reader_virtual.go index a86fa2d3c6..f3ff8f13f9 100644 --- a/sstable/reader_virtual.go +++ b/sstable/reader_virtual.go @@ -29,8 +29,12 @@ var _ CommonReader = (*VirtualReader)(nil) // Lightweight virtual sstable state which can be passed to sstable iterators. type virtualState struct { - lower InternalKey - upper InternalKey + // Bounds for the virtual table. Note that when prefixChange is set, these + // bounds are the logical bounds which start with the SyntheticPrefix. + lower InternalKey + upper InternalKey + + // Virtual table number, for informative purposes. 
fileNum base.FileNum Compare Compare isSharedIngested bool @@ -104,6 +108,7 @@ func (v *VirtualReader) NewCompactionIter( i, err := v.reader.newCompactionIter( transforms, bytesIterated, categoryAndQoS, statsCollector, rp, &v.vState, bufferPool) if err == nil && v.vState.prefixChange != nil { + i = newPrefixReplacingIterator( i, v.vState.prefixChange.ContentPrefix, v.vState.prefixChange.SyntheticPrefix, v.vState.lower.UserKey, v.reader.Compare,