diff --git a/sstable/prefix_replacing_iterator.go b/sstable/prefix_replacing_iterator.go index caab3ebb13..cf223c9879 100644 --- a/sstable/prefix_replacing_iterator.go +++ b/sstable/prefix_replacing_iterator.go @@ -9,6 +9,7 @@ import ( "context" "encoding/hex" "fmt" + "slices" "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/internal/base" @@ -17,12 +18,27 @@ import ( ) type prefixReplacingIterator struct { - i Iterator - cmp base.Compare - src, dst []byte + i Iterator + cmp base.Compare + contentPrefix []byte + syntheticPrefix []byte + + // keyInRange is a valid key in the logical range that has the syntheticPrefix. + // When an argument to a seek function does not have the syntheticPrefix, + // keyInRange is used to determine if the argument key is before or after the + // range of keys produced by the iterator. + keyInRange []byte + + // arg and arg2 are buffers that are used to avoid allocations when rewriting + // keys that are provided as arguments. They always start with contentPrefix. arg, arg2 []byte - res InternalKey - err error + + // res is used to avoid allocations when rewriting result keys. It always + // starts with syntheticPrefix. + res InternalKey + err error + // empty is set after a seek operation that returns no keys. + empty bool } var errInputPrefixMismatch = errors.New("key argument does not have prefix required for replacement") @@ -30,22 +46,37 @@ var errOutputPrefixMismatch = errors.New("key returned does not have prefix requ var _ Iterator = (*prefixReplacingIterator)(nil) -// newPrefixReplacingIterator wraps an iterator over keys that have prefix `src` -// in an iterator that will make them appear to have prefix `dst`. Every key -// passed as an argument to methods on this iterator must have prefix `dst`, and -// every key produced by the underlying iterator must have prefix `src`. 
+// newPrefixReplacingIterator wraps an iterator over keys that have +// `contentPrefix` in an iterator that will make them appear to have +// `syntheticPrefix`. Every key produced by the underlying iterator must have +// `contentPrefix`. +// +// keyInRange is a valid key that starts with syntheticPrefix. When a seek +// function is called with a key that does not start with syntheticPrefix, +// keyInRange is used to determine if the key is before or after the synthetic +// prefix range. // -// INVARIANT: len(dst) > 0. -func newPrefixReplacingIterator(i Iterator, src, dst []byte, cmp base.Compare) Iterator { - if invariants.Enabled && len(dst) == 0 { - panic("newPrefixReplacingIterator called without synthetic prefix") +// INVARIANT: len(syntheticPrefix) > 0 && keyInRange starts with syntheticPrefix. +func newPrefixReplacingIterator( + i Iterator, contentPrefix, syntheticPrefix []byte, keyInRange []byte, cmp base.Compare, +) Iterator { + if invariants.Enabled { + if len(syntheticPrefix) == 0 { + panic("newPrefixReplacingIterator called without synthetic prefix") + } + if !bytes.HasPrefix(keyInRange, syntheticPrefix) { + panic(fmt.Sprintf("keyInRange %q does not have synthetic prefix %q", keyInRange, syntheticPrefix)) + } } return &prefixReplacingIterator{ - i: i, - cmp: cmp, - src: src, dst: dst, - arg: append([]byte{}, src...), arg2: append([]byte{}, src...), - res: InternalKey{UserKey: append([]byte{}, dst...)}, + i: i, + cmp: cmp, + contentPrefix: contentPrefix, + syntheticPrefix: syntheticPrefix, + keyInRange: keyInRange, + arg: slices.Clone(contentPrefix), + arg2: slices.Clone(contentPrefix), + res: InternalKey{UserKey: slices.Clone(syntheticPrefix)}, } } @@ -54,20 +85,12 @@ func (p *prefixReplacingIterator) SetContext(ctx context.Context) { } func (p *prefixReplacingIterator) rewriteArg(key []byte) []byte { - if !bytes.HasPrefix(key, p.dst) { - p.err = errInputPrefixMismatch - return key - } - p.arg = append(p.arg[:len(p.src)], key[len(p.dst):]...)
+ p.arg = append(p.arg[:len(p.contentPrefix)], key[len(p.syntheticPrefix):]...) return p.arg } func (p *prefixReplacingIterator) rewriteArg2(key []byte) []byte { - if !bytes.HasPrefix(key, p.dst) { - p.err = errInputPrefixMismatch - return key - } - p.arg2 = append(p.arg2[:len(p.src)], key[len(p.dst):]...) + p.arg2 = append(p.arg2[:len(p.contentPrefix)], key[len(p.syntheticPrefix):]...) return p.arg2 } @@ -77,7 +100,7 @@ func (p *prefixReplacingIterator) rewriteResult( if k == nil { return k, v } - if !bytes.HasPrefix(k.UserKey, p.src) { + if !bytes.HasPrefix(k.UserKey, p.contentPrefix) { p.err = errOutputPrefixMismatch if invariants.Enabled { panic(p.err) @@ -85,7 +108,7 @@ func (p *prefixReplacingIterator) rewriteResult( return nil, base.LazyValue{} } p.res.Trailer = k.Trailer - p.res.UserKey = append(p.res.UserKey[:len(p.dst)], k.UserKey[len(p.src):]...) + p.res.UserKey = append(p.res.UserKey[:len(p.syntheticPrefix)], k.UserKey[len(p.contentPrefix):]...) return &p.res, v } @@ -93,6 +116,15 @@ func (p *prefixReplacingIterator) rewriteResult( func (p *prefixReplacingIterator) SeekGE( key []byte, flags base.SeekGEFlags, ) (*InternalKey, base.LazyValue) { + p.empty = false + if !bytes.HasPrefix(key, p.syntheticPrefix) { + if p.cmp(key, p.keyInRange) > 0 { + p.empty = true + return nil, base.LazyValue{} + } + // Key must be before the range; use First instead. + return p.rewriteResult(p.i.First()) + } return p.rewriteResult(p.i.SeekGE(p.rewriteArg(key), flags)) } @@ -100,6 +132,15 @@ func (p *prefixReplacingIterator) SeekGE( func (p *prefixReplacingIterator) SeekPrefixGE( prefix, key []byte, flags base.SeekGEFlags, ) (*InternalKey, base.LazyValue) { + p.empty = false + if invariants.Enabled && !bytes.HasPrefix(key, prefix) { + panic(fmt.Sprintf("key %q does not have prefix %q", key, prefix)) + } + if !bytes.HasPrefix(prefix, p.syntheticPrefix) { + // We never produce keys with this prefix; we can return nil. 
+ p.empty = true + return nil, base.LazyValue{} + } return p.rewriteResult(p.i.SeekPrefixGE(p.rewriteArg2(prefix), p.rewriteArg(key), flags)) } @@ -107,37 +148,51 @@ func (p *prefixReplacingIterator) SeekPrefixGE( func (p *prefixReplacingIterator) SeekLT( key []byte, flags base.SeekLTFlags, ) (*InternalKey, base.LazyValue) { - cmp := p.cmp(key, p.dst) - if cmp < 0 { - // Exhaust the iterator by Prev()ing before the First key. - p.i.First() - return p.rewriteResult(p.i.Prev()) + p.empty = false + if !bytes.HasPrefix(key, p.syntheticPrefix) { + if p.cmp(key, p.keyInRange) < 0 { + p.empty = true + return nil, base.LazyValue{} + } + // Key must be after the range. Use Last instead. + return p.rewriteResult(p.i.Last()) } return p.rewriteResult(p.i.SeekLT(p.rewriteArg(key), flags)) } // First implements the Iterator interface. func (p *prefixReplacingIterator) First() (*InternalKey, base.LazyValue) { + p.empty = false return p.rewriteResult(p.i.First()) } // Last implements the Iterator interface. func (p *prefixReplacingIterator) Last() (*InternalKey, base.LazyValue) { + p.empty = false return p.rewriteResult(p.i.Last()) } // Next implements the Iterator interface. func (p *prefixReplacingIterator) Next() (*InternalKey, base.LazyValue) { + if p.empty { + return nil, base.LazyValue{} + } return p.rewriteResult(p.i.Next()) } // NextPrefix implements the Iterator interface. func (p *prefixReplacingIterator) NextPrefix(succKey []byte) (*InternalKey, base.LazyValue) { + if p.empty { + return nil, base.LazyValue{} + } return p.rewriteResult(p.i.NextPrefix(p.rewriteArg(succKey))) } // Prev implements the Iterator interface. func (p *prefixReplacingIterator) Prev() (*InternalKey, base.LazyValue) { + if p.empty { + return nil, base.LazyValue{} + } return p.rewriteResult(p.i.Prev()) } @@ -178,7 +233,7 @@ func (p *prefixReplacingIterator) MaybeFilteredKeys() bool { // String implements the Iterator interface. 
func (p *prefixReplacingIterator) String() string { - return fmt.Sprintf("%s [%s->%s]", p.i.String(), hex.EncodeToString(p.src), hex.EncodeToString(p.dst)) + return fmt.Sprintf("%s [%s->%s]", p.i.String(), hex.EncodeToString(p.contentPrefix), hex.EncodeToString(p.syntheticPrefix)) } func (p *prefixReplacingIterator) SetCloseHook(fn func(i Iterator) error) { @@ -186,8 +241,18 @@ func (p *prefixReplacingIterator) SetCloseHook(fn func(i Iterator) error) { } type prefixReplacingFragmentIterator struct { - i keyspan.FragmentIterator - src, dst []byte + i keyspan.FragmentIterator + cmp base.Compare + + contentPrefix []byte + syntheticPrefix []byte + + // keyInRange is a valid key in the logical range that has the syntheticPrefix. + // When an argument to a seek function does not have the syntheticPrefix, + // keyInRange is used to determine if the argument key is before or after the + // range of keys produced by the iterator. + keyInRange []byte + arg []byte out1, out2 []byte } @@ -196,38 +261,54 @@ type prefixReplacingFragmentIterator struct { // that contains range keys in some key span to make those range keys appear to // be remapped into some other key-span. 
func newPrefixReplacingFragmentIterator( - i keyspan.FragmentIterator, src, dst []byte, + i keyspan.FragmentIterator, + contentPrefix, syntheticPrefix []byte, + keyInRange []byte, + cmp base.Compare, ) keyspan.FragmentIterator { return &prefixReplacingFragmentIterator{ - i: i, - src: src, dst: dst, - arg: append([]byte{}, src...), - out1: append([]byte(nil), dst...), - out2: append([]byte(nil), dst...), + i: i, + cmp: cmp, + contentPrefix: contentPrefix, + syntheticPrefix: syntheticPrefix, + keyInRange: keyInRange, + arg: slices.Clone(contentPrefix), + out1: slices.Clone(syntheticPrefix), + out2: slices.Clone(syntheticPrefix), } } func (p *prefixReplacingFragmentIterator) rewriteArg(key []byte) ([]byte, error) { - if !bytes.HasPrefix(key, p.dst) { + if !bytes.HasPrefix(key, p.syntheticPrefix) { return nil, errInputPrefixMismatch } - p.arg = append(p.arg[:len(p.src)], key[len(p.dst):]...) + p.arg = append(p.arg[:len(p.contentPrefix)], key[len(p.syntheticPrefix):]...) return p.arg, nil } func (p *prefixReplacingFragmentIterator) rewriteSpan( sp *keyspan.Span, err error, ) (*keyspan.Span, error) { - if !bytes.HasPrefix(sp.Start, p.src) || !bytes.HasPrefix(sp.End, p.src) { - return nil, errInputPrefixMismatch + if sp == nil { + return sp, err + } + if !bytes.HasPrefix(sp.Start, p.contentPrefix) || !bytes.HasPrefix(sp.End, p.contentPrefix) { + return nil, errOutputPrefixMismatch } - sp.Start = append(p.out1[:len(p.dst)], sp.Start[len(p.src):]...) - sp.End = append(p.out2[:len(p.dst)], sp.End[len(p.src):]...) + sp.Start = append(p.out1[:len(p.syntheticPrefix)], sp.Start[len(p.contentPrefix):]...) + sp.End = append(p.out2[:len(p.syntheticPrefix)], sp.End[len(p.contentPrefix):]...) return sp, nil } // SeekGE implements the FragmentIterator interface. 
func (p *prefixReplacingFragmentIterator) SeekGE(key []byte) (*keyspan.Span, error) { + if !bytes.HasPrefix(key, p.syntheticPrefix) { + if p.cmp(key, p.keyInRange) > 0 { + return nil, nil + } + // Key must be before the range; use First instead. + return p.First() + } rewrittenKey, err := p.rewriteArg(key) if err != nil { return nil, err @@ -237,6 +318,13 @@ func (p *prefixReplacingFragmentIterator) SeekGE(key []byte) (*keyspan.Span, err // SeekLT implements the FragmentIterator interface. func (p *prefixReplacingFragmentIterator) SeekLT(key []byte) (*keyspan.Span, error) { + if !bytes.HasPrefix(key, p.syntheticPrefix) { + if p.cmp(key, p.keyInRange) < 0 { + return nil, nil + } + // Key must be after the range; use Last instead. + return p.Last() + } rewrittenKey, err := p.rewriteArg(key) if err != nil { return nil, err diff --git a/sstable/prefix_replacing_iterator_test.go b/sstable/prefix_replacing_iterator_test.go index 1ff6e6b899..1008133989 100644 --- a/sstable/prefix_replacing_iterator_test.go +++ b/sstable/prefix_replacing_iterator_test.go @@ -32,9 +32,11 @@ func TestPrefixReplacingIterator(t *testing.T) { raw := rawIter.(*singleLevelIterator) - it := newPrefixReplacingIterator(raw, tc.from, tc.to, DefaultComparer.Compare) + it := newPrefixReplacingIterator(raw, tc.from, tc.to, tc.to, DefaultComparer.Compare) - kMin, kMax, k := []byte{0}, []byte("~"), func(i uint64) []byte { + kMin := []byte{0} + kMax := []byte("~") + k := func(i uint64) []byte { return binary.BigEndian.AppendUint64(tc.to[:len(tc.to):len(tc.to)], i) } @@ -99,9 +101,6 @@ func TestPrefixReplacingIterator(t *testing.T) { }) t.Run("SeekPrefixGE", func(t *testing.T) { - got, _ = it.SeekPrefixGE(tc.to, kMin, base.SeekGEFlagsNone) - require.Equal(t, k(0), got.UserKey) - got, _ = it.SeekPrefixGE(tc.to, k(0), base.SeekGEFlagsNone) require.Equal(t, k(0), got.UserKey) @@ -113,9 +112,6 @@ func TestPrefixReplacingIterator(t *testing.T) { got, _ = it.SeekPrefixGE(tc.to, k(100), base.SeekGEFlagsNone) 
require.Nil(t, got) - - got, _ = it.SeekPrefixGE(tc.to, kMax, base.SeekGEFlagsNone) - require.Nil(t, got) }) t.Run("SeekLT", func(t *testing.T) { diff --git a/sstable/reader_iter_single_lvl.go b/sstable/reader_iter_single_lvl.go index bf4abe8d99..2cec8be64f 100644 --- a/sstable/reader_iter_single_lvl.go +++ b/sstable/reader_iter_single_lvl.go @@ -257,19 +257,29 @@ func (i *singleLevelIterator) getSyntheticSuffx() SyntheticSuffix { return nil } -// Helper function to check if keys returned from iterator are within global and virtual bounds. +// Helper function to check if keys returned from iterator are within virtual bounds. func (i *singleLevelIterator) maybeVerifyKey( iKey *InternalKey, val base.LazyValue, ) (*InternalKey, base.LazyValue) { - // maybeVerify key is only used for virtual sstable iterators. - if invariants.Enabled && i.vState != nil && iKey != nil && i.upper != nil && i.lower != nil { + if invariants.Enabled && iKey != nil && i.vState != nil { key := iKey.UserKey - - uc, vuc := i.cmp(key, i.upper), i.cmp(key, i.vState.upper.UserKey) - lc, vlc := i.cmp(key, i.lower), i.cmp(key, i.vState.lower.UserKey) - - if (i.vState.upper.IsExclusiveSentinel() && vuc == 0) || (!i.endKeyInclusive && uc == 0) || uc > 0 || vuc > 0 || lc < 0 || vlc < 0 { - panic(fmt.Sprintf("key: %s out of bounds of singleLevelIterator: i.upper %s, i.lower %s, vstate upper %s, vstate lower %s", key, i.upper, i.lower, i.vState.upper.UserKey, i.vState.lower.UserKey)) + v := i.vState + var uc, lc int + if p := v.prefixChange; p != nil { + if !bytes.HasPrefix(key, p.ContentPrefix) { + panic(fmt.Sprintf("key %q does not have content prefix %q", key, v.prefixChange.ContentPrefix)) + } + // We are assuming that the key comparator works if we just skip the + // prefix portion that we are replacing. This is true for all known + // implementations.
+ lc = i.cmp(key[len(p.ContentPrefix):], v.lower.UserKey[len(p.SyntheticPrefix):]) + uc = i.cmp(key[len(p.ContentPrefix):], v.upper.UserKey[len(p.SyntheticPrefix):]) + } else { + lc = i.cmp(key, v.lower.UserKey) + uc = i.cmp(key, v.upper.UserKey) + } + if lc < 0 || uc > 0 || (uc == 0 && v.upper.IsExclusiveSentinel()) { + panic(fmt.Sprintf("key %q out of singleLeveliterator virtual bounds %s %s", key, v.lower, v.upper)) } } return iKey, val diff --git a/sstable/reader_test.go b/sstable/reader_test.go index 6d32a83a1c..41658f543c 100644 --- a/sstable/reader_test.go +++ b/sstable/reader_test.go @@ -1270,7 +1270,11 @@ func TestRandomizedSuffixRewriter(t *testing.T) { context.Background(), nil, nil, nil, false, true, nil, CategoryAndQoS{}, nil, - TrivialReaderProvider{Reader: eReader}, &virtualState{syntheticSuffix: syntheticSuffix}) + TrivialReaderProvider{Reader: eReader}, &virtualState{ + lower: base.MakeInternalKey([]byte("a"), base.InternalKeySeqNumMax, base.InternalKeyKindSet), + upper: base.MakeRangeDeleteSentinelKey([]byte("zzzzzzzzzzzzzzzzzzz")), + syntheticSuffix: syntheticSuffix, + }) require.NoError(t, err) return iter, func() { require.NoError(t, iter.Close()) diff --git a/sstable/reader_virtual.go b/sstable/reader_virtual.go index ee30c735a0..69c6242dd1 100644 --- a/sstable/reader_virtual.go +++ b/sstable/reader_virtual.go @@ -5,7 +5,6 @@ package sstable import ( - "bytes" "context" "github.com/cockroachdb/pebble/internal/base" @@ -93,7 +92,10 @@ func (v *VirtualReader) NewCompactionIter( i, err := v.reader.newCompactionIter( bytesIterated, categoryAndQoS, statsCollector, rp, &v.vState, bufferPool) if err == nil && v.vState.prefixChange != nil { - i = newPrefixReplacingIterator(i, v.vState.prefixChange.ContentPrefix, v.vState.prefixChange.SyntheticPrefix, v.reader.Compare) + i = newPrefixReplacingIterator( + i, v.vState.prefixChange.ContentPrefix, v.vState.prefixChange.SyntheticPrefix, + v.vState.lower.UserKey, v.reader.Compare, + ) } return i, err } @@
-116,7 +118,10 @@ func (v *VirtualReader) NewIterWithBlockPropertyFiltersAndContextEtc( ctx, lower, upper, filterer, hideObsoletePoints, useFilterBlock, stats, categoryAndQoS, statsCollector, rp, &v.vState) if err == nil && v.vState.prefixChange != nil { - i = newPrefixReplacingIterator(i, v.vState.prefixChange.ContentPrefix, v.vState.prefixChange.SyntheticPrefix, v.reader.Compare) + i = newPrefixReplacingIterator( + i, v.vState.prefixChange.ContentPrefix, v.vState.prefixChange.SyntheticPrefix, + v.vState.lower.UserKey, v.reader.Compare, + ) } return i, err } @@ -147,7 +152,10 @@ func (v *VirtualReader) NewRawRangeDelIter() (keyspan.FragmentIterator, error) { v.reader.Compare, iter, lower.UserKey, upper.UserKey, lower, upper, !v.vState.upper.IsExclusiveSentinel(), /* panicOnUpperTruncate */ ) - return newPrefixReplacingFragmentIterator(iter, v.vState.prefixChange.ContentPrefix, v.vState.prefixChange.SyntheticPrefix), nil + return newPrefixReplacingFragmentIterator( + iter, v.vState.prefixChange.ContentPrefix, v.vState.prefixChange.SyntheticPrefix, + v.vState.lower.UserKey, v.reader.Compare, + ), nil } // Truncation of spans isn't allowed at a user key that also contains points @@ -203,7 +211,10 @@ func (v *VirtualReader) NewRawRangeKeyIter() (keyspan.FragmentIterator, error) { v.reader.Compare, iter, lower.UserKey, upper.UserKey, lower, upper, !v.vState.upper.IsExclusiveSentinel(), /* panicOnUpperTruncate */ ) - return newPrefixReplacingFragmentIterator(iter, v.vState.prefixChange.ContentPrefix, v.vState.prefixChange.SyntheticPrefix), nil + return newPrefixReplacingFragmentIterator( + iter, v.vState.prefixChange.ContentPrefix, v.vState.prefixChange.SyntheticPrefix, + v.vState.lower.UserKey, v.reader.Compare, + ), nil } // Truncation of spans isn't allowed at a user key that also contains points @@ -264,16 +275,6 @@ func (v *virtualState) constrainBounds( // enforcing the virtual sstable bounds. 
func (v *VirtualReader) EstimateDiskUsage(start, end []byte) (uint64, error) { _, f, l := v.vState.constrainBounds(start, end, true /* endInclusive */) - if v.vState.prefixChange != nil { - if !bytes.HasPrefix(f, v.vState.prefixChange.SyntheticPrefix) || !bytes.HasPrefix(l, v.vState.prefixChange.SyntheticPrefix) { - return 0, errInputPrefixMismatch - } - // TODO(dt): we could add a scratch buf to VirtualReader to avoid allocs on - // repeated calls to this. - f = append(append([]byte{}, v.vState.prefixChange.ContentPrefix...), f[len(v.vState.prefixChange.SyntheticPrefix):]...) - l = append(append([]byte{}, v.vState.prefixChange.ContentPrefix...), l[len(v.vState.prefixChange.SyntheticPrefix):]...) - } - return v.reader.EstimateDiskUsage(f, l) } diff --git a/testdata/ingest_external b/testdata/ingest_external index 84b28c08d2..a4626b5ea7 100644 --- a/testdata/ingest_external +++ b/testdata/ingest_external @@ -364,3 +364,57 @@ ge: (something, .) gg: (foo, .) gh: (foo, .) gi: (foo, .) + +# Test seeks with keys outside of the synthetic prefix range. +reset +---- + +build-remote f9 +set i foo +---- + +ingest-external prefix-replace=(,c) +f9,10,cg,ck +---- + +iter +seek-ge bp +---- +ci: (foo, .) + +iter +seek-prefix-ge bp +---- +. + +iter +seek-lt de +---- +ci: (foo, .) + +reset +---- + +batch +set a1 foo +set ci1 bar +set z1 baz +---- + +build-remote f10 +del-range i j +---- + +ingest-external prefix-replace=(,c) +f10,10,cg,ck +---- + +iter +seek-ge bp +---- +z1: (baz, .) + +iter +seek-lt de +---- +a1: (foo, .)