Skip to content

Commit

Permalink
db: prefix replacement fixes
Browse files Browse the repository at this point in the history
The prefix replacing iterators did not support arguments that don't
start with the synthetic prefix. We fix this by adding a `keyInRange`
argument that we can compare against when we are outside the synthetic
prefix range to figure out which side we are on.

We also fix assertions in `maybeVerifyKey` which were not correct when
prefix replacement is used.
  • Loading branch information
RaduBerinde authored and dt committed Feb 21, 2024
1 parent 31b39d8 commit e2459be
Show file tree
Hide file tree
Showing 6 changed files with 236 additions and 83 deletions.
188 changes: 138 additions & 50 deletions sstable/prefix_replacing_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"context"
"encoding/hex"
"fmt"
"slices"

"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/base"
Expand All @@ -17,35 +18,65 @@ import (
)

type prefixReplacingIterator struct {
i Iterator
cmp base.Compare
src, dst []byte
i Iterator
cmp base.Compare
contentPrefix []byte
syntheticPrefix []byte

// keyInRange is a valid key in the logical range that has the syntheticPrefix.
// When an argument to a seek function does not have the syntheticPrefix,
// keyInRange is used to determine if the argument key is before or after the
// range of keys produced by the iterator.
keyInRange []byte

// arg and arg2 are buffers that are used to avoid allocations when rewriting
// keys that are provided as arguments. They always start with contentPrefix.
arg, arg2 []byte
res InternalKey
err error

// res is used to avoid allocations when rewriting result keys. It always
// starts with syntheticPrefix.
res InternalKey
err error
// empty is set after a seek operation that returns no keys.
empty bool
}

var errInputPrefixMismatch = errors.New("key argument does not have prefix required for replacement")
var errOutputPrefixMismatch = errors.New("key returned does not have prefix required for replacement")

var _ Iterator = (*prefixReplacingIterator)(nil)

// newPrefixReplacingIterator wraps an iterator over keys that have prefix `src`
// in an iterator that will make them appear to have prefix `dst`. Every key
// passed as an argument to methods on this iterator must have prefix `dst`, and
// every key produced by the underlying iterator must have prefix `src`.
// newPrefixReplacingIterator wraps an iterator over keys that have
// `contentPrefix` in an iterator that will make them appear to have
// `syntheticPrefix`. Every key produced by the underlying iterator must have
// `contentPrefix`.
//
// keyInRange is a valid key that starts with syntheticPrefix. When a seek
// function is called with a key that does not start with syntheticPrefix,
// keyInRange is used to determine if the key is before or after the synthetic
// prefix range.
//
// INVARIANT: len(dst) > 0.
func newPrefixReplacingIterator(i Iterator, src, dst []byte, cmp base.Compare) Iterator {
if invariants.Enabled && len(dst) == 0 {
panic("newPrefixReplacingIterator called without synthetic prefix")
// INVARIANT: len(syntheticPrefix) > 0 && keyInRange starts with syntheticPrefix.
func newPrefixReplacingIterator(
i Iterator, contentPrefix, syntheticPrefix []byte, keyInRange []byte, cmp base.Compare,
) Iterator {
if invariants.Enabled {
if len(syntheticPrefix) == 0 {
panic("newPrefixReplacingIterator called without synthetic prefix")
}
if !bytes.HasPrefix(keyInRange, syntheticPrefix) {
panic(fmt.Sprintf("keyInRange %q does not have synthetic prefix %q", keyInRange, syntheticPrefix))
}
}
return &prefixReplacingIterator{
i: i,
cmp: cmp,
src: src, dst: dst,
arg: append([]byte{}, src...), arg2: append([]byte{}, src...),
res: InternalKey{UserKey: append([]byte{}, dst...)},
i: i,
cmp: cmp,
contentPrefix: contentPrefix,
syntheticPrefix: syntheticPrefix,
keyInRange: keyInRange,
arg: slices.Clone(contentPrefix),
arg2: slices.Clone(contentPrefix),
res: InternalKey{UserKey: slices.Clone(syntheticPrefix)},
}
}

Expand All @@ -54,20 +85,12 @@ func (p *prefixReplacingIterator) SetContext(ctx context.Context) {
}

func (p *prefixReplacingIterator) rewriteArg(key []byte) []byte {
if !bytes.HasPrefix(key, p.dst) {
p.err = errInputPrefixMismatch
return key
}
p.arg = append(p.arg[:len(p.src)], key[len(p.dst):]...)
p.arg = append(p.arg[:len(p.contentPrefix)], key[len(p.syntheticPrefix):]...)
return p.arg
}

func (p *prefixReplacingIterator) rewriteArg2(key []byte) []byte {
if !bytes.HasPrefix(key, p.dst) {
p.err = errInputPrefixMismatch
return key
}
p.arg2 = append(p.arg2[:len(p.src)], key[len(p.dst):]...)
p.arg2 = append(p.arg2[:len(p.contentPrefix)], key[len(p.syntheticPrefix):]...)
return p.arg2
}

Expand All @@ -77,67 +100,99 @@ func (p *prefixReplacingIterator) rewriteResult(
if k == nil {
return k, v
}
if !bytes.HasPrefix(k.UserKey, p.src) {
if !bytes.HasPrefix(k.UserKey, p.contentPrefix) {
p.err = errOutputPrefixMismatch
if invariants.Enabled {
panic(p.err)
}
return nil, base.LazyValue{}
}
p.res.Trailer = k.Trailer
p.res.UserKey = append(p.res.UserKey[:len(p.dst)], k.UserKey[len(p.src):]...)
p.res.UserKey = append(p.res.UserKey[:len(p.syntheticPrefix)], k.UserKey[len(p.contentPrefix):]...)
return &p.res, v
}

// SeekGE implements the Iterator interface.
//
// A key that carries the synthetic prefix is rewritten and forwarded to the
// wrapped iterator. A key without the synthetic prefix is compared against
// keyInRange to decide whether it sorts before or after the range of keys this
// iterator produces.
func (p *prefixReplacingIterator) SeekGE(
	key []byte, flags base.SeekGEFlags,
) (*InternalKey, base.LazyValue) {
	p.empty = false
	if bytes.HasPrefix(key, p.syntheticPrefix) {
		return p.rewriteResult(p.i.SeekGE(p.rewriteArg(key), flags))
	}
	if p.cmp(key, p.keyInRange) <= 0 {
		// The key sorts before the range, so position at the first key.
		return p.rewriteResult(p.i.First())
	}
	// The key sorts after the range: there is nothing to return. Record that
	// so Next, NextPrefix and Prev return nil as well.
	p.empty = true
	return nil, base.LazyValue{}
}

// SeekPrefixGE implements the Iterator interface.
//
// If prefix does not start with the synthetic prefix, the iterator never
// produces a matching key, so nil is returned without consulting the wrapped
// iterator. Otherwise prefix and key are both rewritten and forwarded.
func (p *prefixReplacingIterator) SeekPrefixGE(
	prefix, key []byte, flags base.SeekGEFlags,
) (*InternalKey, base.LazyValue) {
	p.empty = false
	if invariants.Enabled {
		if !bytes.HasPrefix(key, prefix) {
			panic(fmt.Sprintf("key %q does not have prefix %q", key, prefix))
		}
	}
	if bytes.HasPrefix(prefix, p.syntheticPrefix) {
		// The two arguments are rewritten into distinct scratch buffers (arg2
		// and arg), so neither rewrite overwrites the other.
		rewrittenPrefix := p.rewriteArg2(prefix)
		rewrittenKey := p.rewriteArg(key)
		return p.rewriteResult(p.i.SeekPrefixGE(rewrittenPrefix, rewrittenKey, flags))
	}
	// We never produce keys with this prefix; we can return nil.
	p.empty = true
	return nil, base.LazyValue{}
}

// SeekLT implements the Iterator interface.
func (p *prefixReplacingIterator) SeekLT(
key []byte, flags base.SeekLTFlags,
) (*InternalKey, base.LazyValue) {
cmp := p.cmp(key, p.dst)
if cmp < 0 {
// Exhaust the iterator by Prev()ing before the First key.
p.i.First()
return p.rewriteResult(p.i.Prev())
p.empty = false
if !bytes.HasPrefix(key, p.syntheticPrefix) {
if p.cmp(key, p.keyInRange) < 0 {
p.empty = true
return nil, base.LazyValue{}
}
// Key must be after the range. Use Last instead.
return p.rewriteResult(p.i.Last())
}
return p.rewriteResult(p.i.SeekLT(p.rewriteArg(key), flags))
}

// First implements the Iterator interface. It clears the empty flag, positions
// the wrapped iterator at its first key, and returns that key rewritten to
// carry the synthetic prefix.
func (p *prefixReplacingIterator) First() (*InternalKey, base.LazyValue) {
	p.empty = false
	k, v := p.i.First()
	return p.rewriteResult(k, v)
}

// Last implements the Iterator interface. It clears the empty flag, positions
// the wrapped iterator at its last key, and returns that key rewritten to
// carry the synthetic prefix.
func (p *prefixReplacingIterator) Last() (*InternalKey, base.LazyValue) {
	p.empty = false
	k, v := p.i.Last()
	return p.rewriteResult(k, v)
}

// Next implements the Iterator interface. If the preceding seek marked the
// iterator as empty, nil is returned without touching the wrapped iterator.
func (p *prefixReplacingIterator) Next() (*InternalKey, base.LazyValue) {
	if !p.empty {
		return p.rewriteResult(p.i.Next())
	}
	return nil, base.LazyValue{}
}

// NextPrefix implements the Iterator interface. If the preceding seek marked
// the iterator as empty, nil is returned without touching the wrapped
// iterator; otherwise succKey is rewritten and the call is forwarded.
func (p *prefixReplacingIterator) NextPrefix(succKey []byte) (*InternalKey, base.LazyValue) {
	if !p.empty {
		rewrittenSucc := p.rewriteArg(succKey)
		return p.rewriteResult(p.i.NextPrefix(rewrittenSucc))
	}
	return nil, base.LazyValue{}
}

// Prev implements the Iterator interface. If the preceding seek marked the
// iterator as empty, nil is returned without touching the wrapped iterator.
func (p *prefixReplacingIterator) Prev() (*InternalKey, base.LazyValue) {
	if !p.empty {
		return p.rewriteResult(p.i.Prev())
	}
	return nil, base.LazyValue{}
}

Expand Down Expand Up @@ -178,16 +233,26 @@ func (p *prefixReplacingIterator) MaybeFilteredKeys() bool {

// String implements the Iterator interface.
func (p *prefixReplacingIterator) String() string {
return fmt.Sprintf("%s [%s->%s]", p.i.String(), hex.EncodeToString(p.src), hex.EncodeToString(p.dst))
return fmt.Sprintf("%s [%s->%s]", p.i.String(), hex.EncodeToString(p.contentPrefix), hex.EncodeToString(p.syntheticPrefix))
}

// SetCloseHook forwards fn to the wrapped iterator's SetCloseHook.
func (p *prefixReplacingIterator) SetCloseHook(fn func(i Iterator) error) {
	wrapped := p.i
	wrapped.SetCloseHook(fn)
}

type prefixReplacingFragmentIterator struct {
i keyspan.FragmentIterator
src, dst []byte
i keyspan.FragmentIterator
cmp base.Compare

contentPrefix []byte
syntheticPrefix []byte

// keyInRange is a valid key in the logical range that has the syntheticPrefix.
// When an argument to a seek function does not have the syntheticPrefix,
// keyInRange is used to determine if the argument key is before or after the
// range of keys produced by the iterator.
keyInRange []byte

arg []byte
out1, out2 []byte
}
Expand All @@ -196,38 +261,54 @@ type prefixReplacingFragmentIterator struct {
// that contains range keys in some key span to make those range keys appear to
// be remapped into some other key-span.
func newPrefixReplacingFragmentIterator(
i keyspan.FragmentIterator, src, dst []byte,
i keyspan.FragmentIterator,
contentPrefix, syntheticPrefix []byte,
keyInRange []byte,
cmp base.Compare,
) keyspan.FragmentIterator {
return &prefixReplacingFragmentIterator{
i: i,
src: src, dst: dst,
arg: append([]byte{}, src...),
out1: append([]byte(nil), dst...),
out2: append([]byte(nil), dst...),
i: i,
cmp: cmp,
contentPrefix: contentPrefix,
syntheticPrefix: syntheticPrefix,
keyInRange: keyInRange,
arg: slices.Clone(contentPrefix),
out1: slices.Clone(syntheticPrefix),
out2: slices.Clone(syntheticPrefix),
}
}

func (p *prefixReplacingFragmentIterator) rewriteArg(key []byte) ([]byte, error) {
if !bytes.HasPrefix(key, p.dst) {
if !bytes.HasPrefix(key, p.syntheticPrefix) {
return nil, errInputPrefixMismatch
}
p.arg = append(p.arg[:len(p.src)], key[len(p.dst):]...)
p.arg = append(p.arg[:len(p.contentPrefix)], key[len(p.syntheticPrefix):]...)
return p.arg, nil
}

func (p *prefixReplacingFragmentIterator) rewriteSpan(
sp *keyspan.Span, err error,
) (*keyspan.Span, error) {
if !bytes.HasPrefix(sp.Start, p.src) || !bytes.HasPrefix(sp.End, p.src) {
return nil, errInputPrefixMismatch
if sp == nil {
return sp, err
}
if !bytes.HasPrefix(sp.Start, p.contentPrefix) || !bytes.HasPrefix(sp.End, p.contentPrefix) {
return nil, errOutputPrefixMismatch
}
sp.Start = append(p.out1[:len(p.dst)], sp.Start[len(p.src):]...)
sp.End = append(p.out2[:len(p.dst)], sp.End[len(p.src):]...)
sp.Start = append(p.out1[:len(p.syntheticPrefix)], sp.Start[len(p.contentPrefix):]...)
sp.End = append(p.out2[:len(p.syntheticPrefix)], sp.End[len(p.contentPrefix):]...)
return sp, nil
}

// SeekGE implements the FragmentIterator interface.
func (p *prefixReplacingFragmentIterator) SeekGE(key []byte) (*keyspan.Span, error) {
if !bytes.HasPrefix(key, p.syntheticPrefix) {
if p.cmp(key, p.keyInRange) > 0 {
return nil, nil
}
// Key must be before the range; use First instead.
return p.First()
}
rewrittenKey, err := p.rewriteArg(key)
if err != nil {
return nil, err
Expand All @@ -237,6 +318,13 @@ func (p *prefixReplacingFragmentIterator) SeekGE(key []byte) (*keyspan.Span, err

// SeekLT implements the FragmentIterator interface.
func (p *prefixReplacingFragmentIterator) SeekLT(key []byte) (*keyspan.Span, error) {
if !bytes.HasPrefix(key, p.syntheticPrefix) {
if p.cmp(key, p.keyInRange) < 0 {
return nil, nil
}
// Key must be after the range; use Last instead.
return p.Last()
}
rewrittenKey, err := p.rewriteArg(key)
if err != nil {
return nil, err
Expand Down
12 changes: 4 additions & 8 deletions sstable/prefix_replacing_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@ func TestPrefixReplacingIterator(t *testing.T) {

raw := rawIter.(*singleLevelIterator)

it := newPrefixReplacingIterator(raw, tc.from, tc.to, DefaultComparer.Compare)
it := newPrefixReplacingIterator(raw, tc.from, tc.to, tc.to, DefaultComparer.Compare)

kMin, kMax, k := []byte{0}, []byte("~"), func(i uint64) []byte {
kMin := []byte{0}
kMax := []byte("~")
k := func(i uint64) []byte {
return binary.BigEndian.AppendUint64(tc.to[:len(tc.to):len(tc.to)], i)
}

Expand Down Expand Up @@ -99,9 +101,6 @@ func TestPrefixReplacingIterator(t *testing.T) {
})

t.Run("SeekPrefixGE", func(t *testing.T) {
got, _ = it.SeekPrefixGE(tc.to, kMin, base.SeekGEFlagsNone)
require.Equal(t, k(0), got.UserKey)

got, _ = it.SeekPrefixGE(tc.to, k(0), base.SeekGEFlagsNone)
require.Equal(t, k(0), got.UserKey)

Expand All @@ -113,9 +112,6 @@ func TestPrefixReplacingIterator(t *testing.T) {

got, _ = it.SeekPrefixGE(tc.to, k(100), base.SeekGEFlagsNone)
require.Nil(t, got)

got, _ = it.SeekPrefixGE(tc.to, kMax, base.SeekGEFlagsNone)
require.Nil(t, got)
})

t.Run("SeekLT", func(t *testing.T) {
Expand Down
Loading

0 comments on commit e2459be

Please sign in to comment.