Skip to content

Commit

Permalink
ingest,manifest,sstable: connect prefix rules in ingest to prefix rep…
Browse files Browse the repository at this point in the history
…lacing iterators
  • Loading branch information
dt committed Dec 20, 2023
1 parent a452c0e commit a27878f
Show file tree
Hide file tree
Showing 7 changed files with 159 additions and 20 deletions.
18 changes: 18 additions & 0 deletions data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1271,6 +1271,20 @@ func runIngestCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error {

func runIngestExternalCmd(td *datadriven.TestData, d *DB, locator string) error {
external := make([]ExternalFile, 0)
usePrefixChange := false
var fromPrefix, toPrefix []byte
for i := range td.CmdArgs {
switch td.CmdArgs[i].Key {
case "prefix-replace":
vals := td.CmdArgs[i].Vals
if len(vals) != 2 {
return errors.New("usage: prefix-replace=(from,to)")
}
fromPrefix = []byte(vals[0])
toPrefix = []byte(vals[1])
usePrefixChange = true
}
}
for _, arg := range strings.Split(td.Input, "\n") {
fields := strings.Split(arg, ",")
if len(fields) != 4 {
Expand All @@ -1287,6 +1301,10 @@ func runIngestExternalCmd(td *datadriven.TestData, d *DB, locator string) error
ef.SmallestUserKey = []byte(fields[2])
ef.LargestUserKey = []byte(fields[3])
ef.HasPointKey = true
if usePrefixChange {
ef.ContentPrefix = fromPrefix
ef.SyntheticPrefix = toPrefix
}
external = append(external, ef)
}

Expand Down
12 changes: 12 additions & 0 deletions ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,13 @@ func ingestLoad1External(
// what parts of this sstable are referenced by other nodes.
meta.FileBacking.Size = e.Size

if len(e.SyntheticPrefix) != 0 {
meta.PrefixReplacement = &manifest.PrefixReplacement{
ContentPrefix: e.ContentPrefix,
SyntheticPrefix: e.SyntheticPrefix,
}
}

if err := meta.Validate(opts.Comparer.Compare, opts.Comparer.FormatKey); err != nil {
return nil, err
}
Expand Down Expand Up @@ -1112,6 +1119,11 @@ type ExternalFile struct {
// or range keys. If both fields are false, an error is returned during
// ingestion.
HasPointKey, HasRangeKey bool
// ContentPrefix and SyntheticPrefix denote a prefix replacement rule causing
// a file, in which all keys have prefix ContentPrefix, to appear whenever it
// is accessed as if those keys all instead have prefix SyntheticPrefix.
// SyntheticPrefix must be a prefix of both SmallestUserKey and LargestUserKey.
ContentPrefix, SyntheticPrefix []byte
}

// IngestWithStats does the same as Ingest, and additionally returns
Expand Down
20 changes: 20 additions & 0 deletions internal/manifest/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,26 @@ type PrefixReplacement struct {
ContentPrefix, SyntheticPrefix []byte
}

// ReplaceArg translates a key from the synthetic keyspace into the content
// keyspace: it returns a freshly allocated copy of src in which the leading
// SyntheticPrefix has been swapped for ContentPrefix. It panics if src does
// not start with SyntheticPrefix.
func (p *PrefixReplacement) ReplaceArg(src []byte) []byte {
	from, to := p.SyntheticPrefix, p.ContentPrefix
	return p.replace(src, from, to)
}

// ReplaceResult is the inverse of ReplaceArg: it returns a freshly allocated
// copy of key in which the leading ContentPrefix has been swapped for
// SyntheticPrefix. It panics if key does not start with ContentPrefix.
func (p *PrefixReplacement) ReplaceResult(key []byte) []byte {
	from, to := p.ContentPrefix, p.SyntheticPrefix
	return p.replace(key, from, to)
}

// replace builds a new key consisting of `to` followed by whatever remains of
// key once its `from` prefix is stripped. The returned slice never aliases
// key, from or to. A key that does not begin with `from` is treated as a
// broken invariant and causes a panic.
func (p *PrefixReplacement) replace(key, from, to []byte) []byte {
	if !bytes.HasPrefix(key, from) {
		panic(fmt.Sprintf("unexpected prefix in replace: %s", key))
	}
	suffix := key[len(from):]
	// Allocate exactly the space needed, then copy the two pieces in order.
	out := make([]byte, len(to)+len(suffix))
	n := copy(out, to)
	copy(out[n:], suffix)
	return out
}

// PhysicalFileMeta is used by functions which want a guarantee that their input
// belongs to a physical sst and not a virtual sst.
//
Expand Down
3 changes: 0 additions & 3 deletions sstable/prefix_replacing_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,6 @@ type prefixReplacingFragmentIterator struct {
out1, out2 []byte
}

// TODO(dt): wire this up.
var _ = newPrefixReplacingFragmentIterator

// newPrefixReplacingFragmentIterator wraps a FragmentIterator over some reader
// that contains range keys in some key span to make those range keys appear to
// be remapped into some other key-span.
Expand Down
6 changes: 5 additions & 1 deletion sstable/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1393,7 +1393,11 @@ func TestReader_TableFormat(t *testing.T) {
}

func buildTestTable(
t *testing.T, numEntries uint64, blockSize, indexBlockSize int, compression Compression, prefix []byte,
t *testing.T,
numEntries uint64,
blockSize, indexBlockSize int,
compression Compression,
prefix []byte,
) *Reader {
provider, err := objstorageprovider.Open(objstorageprovider.DefaultSettings(vfs.NewMem(), "" /* dirName */))
require.NoError(t, err)
Expand Down
83 changes: 67 additions & 16 deletions sstable/reader_virtual.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package sstable

import (
"bytes"
"context"

"github.com/cockroachdb/pebble/internal/base"
Expand All @@ -27,11 +28,12 @@ type VirtualReader struct {

// Lightweight virtual sstable state which can be passed to sstable iterators.
//
// MakeVirtualReader fills this in from the virtual file's metadata: lower and
// upper come from meta.Smallest and meta.Largest, fileNum from meta.FileNum,
// Compare from the backing reader, and prefixChange from
// meta.PrefixReplacement.
type virtualState struct {
lower InternalKey
upper InternalKey
fileNum base.FileNum
Compare Compare
isForeign bool
// lower and upper are the bounds of the virtual sstable.
lower InternalKey
upper InternalKey
fileNum base.FileNum
Compare Compare
isForeign bool
// prefixChange is the prefix replacement rule attached to the file, or nil
// when there is none. When non-nil, the VirtualReader wraps the iterators
// it returns in prefix-replacing iterators and constrainBounds rewrites
// bounds with ReplaceArg before they reach the backing reader.
prefixChange *manifest.PrefixReplacement
}

func ceilDiv(a, b uint64) uint64 {
Expand All @@ -48,11 +50,12 @@ func MakeVirtualReader(
}

vState := virtualState{
lower: meta.Smallest,
upper: meta.Largest,
fileNum: meta.FileNum,
Compare: reader.Compare,
isForeign: isForeign,
lower: meta.Smallest,
upper: meta.Largest,
fileNum: meta.FileNum,
Compare: reader.Compare,
isForeign: isForeign,
prefixChange: meta.PrefixReplacement,
}
v := VirtualReader{
vState: vState,
Expand All @@ -74,6 +77,7 @@ func MakeVirtualReader(
v.Properties.NumSizedDeletions = ceilDiv(reader.Properties.NumSizedDeletions*meta.Size, meta.FileBacking.Size)
v.Properties.RawPointTombstoneKeySize = ceilDiv(reader.Properties.RawPointTombstoneKeySize*meta.Size, meta.FileBacking.Size)
v.Properties.RawPointTombstoneValueSize = ceilDiv(reader.Properties.RawPointTombstoneValueSize*meta.Size, meta.FileBacking.Size)

return v
}

Expand All @@ -85,8 +89,12 @@ func (v *VirtualReader) NewCompactionIter(
rp ReaderProvider,
bufferPool *BufferPool,
) (Iterator, error) {
return v.reader.newCompactionIter(
i, err := v.reader.newCompactionIter(
bytesIterated, categoryAndQoS, statsCollector, rp, &v.vState, bufferPool)
if err == nil && v.vState.prefixChange != nil {
i = newPrefixReplacingIterator(i, v.vState.prefixChange.ContentPrefix, v.vState.prefixChange.SyntheticPrefix, v.reader.Compare)
}
return i, err
}

// NewIterWithBlockPropertyFiltersAndContextEtc wraps
Expand All @@ -103,9 +111,13 @@ func (v *VirtualReader) NewIterWithBlockPropertyFiltersAndContextEtc(
statsCollector *CategoryStatsCollector,
rp ReaderProvider,
) (Iterator, error) {
return v.reader.newIterWithBlockPropertyFiltersAndContext(
i, err := v.reader.newIterWithBlockPropertyFiltersAndContext(
ctx, lower, upper, filterer, hideObsoletePoints, useFilterBlock, stats,
categoryAndQoS, statsCollector, rp, &v.vState)
if err == nil && v.vState.prefixChange != nil {
i = newPrefixReplacingIterator(i, v.vState.prefixChange.ContentPrefix, v.vState.prefixChange.SyntheticPrefix, v.reader.Compare)
}
return i, err
}

// ValidateBlockChecksumsOnBacking will call ValidateBlockChecksumsOnBacking on the underlying reader.
Expand All @@ -123,6 +135,19 @@ func (v *VirtualReader) NewRawRangeDelIter() (keyspan.FragmentIterator, error) {
if iter == nil {
return nil, nil
}
lower := &v.vState.lower
upper := &v.vState.upper

if v.vState.prefixChange != nil {
lower = &InternalKey{UserKey: v.vState.prefixChange.ReplaceArg(lower.UserKey), Trailer: lower.Trailer}
upper = &InternalKey{UserKey: v.vState.prefixChange.ReplaceArg(upper.UserKey), Trailer: upper.Trailer}

iter = keyspan.Truncate(
v.reader.Compare, iter, lower.UserKey, upper.UserKey,
lower, upper, !v.vState.upper.IsExclusiveSentinel(), /* panicOnUpperTruncate */
)
return newPrefixReplacingFragmentIterator(iter, v.vState.prefixChange.ContentPrefix, v.vState.prefixChange.SyntheticPrefix), nil
}

// Truncation of spans isn't allowed at a user key that also contains points
// in the same virtual sstable, as it would lead to covered points getting
Expand All @@ -135,8 +160,8 @@ func (v *VirtualReader) NewRawRangeDelIter() (keyspan.FragmentIterator, error) {
// includes both point keys), but not [a#2,SET-b#3,SET] (as it would truncate
// the rangedel at b and lead to the point being uncovered).
return keyspan.Truncate(
v.reader.Compare, iter, v.vState.lower.UserKey, v.vState.upper.UserKey,
&v.vState.lower, &v.vState.upper, !v.vState.upper.IsExclusiveSentinel(), /* panicOnUpperTruncate */
v.reader.Compare, iter, lower.UserKey, upper.UserKey,
lower, upper, !v.vState.upper.IsExclusiveSentinel(), /* panicOnUpperTruncate */
), nil
}

Expand All @@ -149,6 +174,18 @@ func (v *VirtualReader) NewRawRangeKeyIter() (keyspan.FragmentIterator, error) {
if iter == nil {
return nil, nil
}
lower := &v.vState.lower
upper := &v.vState.upper

if v.vState.prefixChange != nil {
lower = &InternalKey{UserKey: v.vState.prefixChange.ReplaceArg(lower.UserKey), Trailer: lower.Trailer}
upper = &InternalKey{UserKey: v.vState.prefixChange.ReplaceArg(upper.UserKey), Trailer: upper.Trailer}
iter = keyspan.Truncate(
v.reader.Compare, iter, lower.UserKey, upper.UserKey,
lower, upper, !v.vState.upper.IsExclusiveSentinel(), /* panicOnUpperTruncate */
)
return newPrefixReplacingFragmentIterator(iter, v.vState.prefixChange.ContentPrefix, v.vState.prefixChange.SyntheticPrefix), nil
}

// Truncation of spans isn't allowed at a user key that also contains points
// in the same virtual sstable, as it would lead to covered points getting
Expand All @@ -161,8 +198,8 @@ func (v *VirtualReader) NewRawRangeKeyIter() (keyspan.FragmentIterator, error) {
// includes both point keys), but not [a#2,SET-b#3,SET] (as it would truncate
// the range key at b and lead to the point being uncovered).
return keyspan.Truncate(
v.reader.Compare, iter, v.vState.lower.UserKey, v.vState.upper.UserKey,
&v.vState.lower, &v.vState.upper, !v.vState.upper.IsExclusiveSentinel(), /* panicOnUpperTruncate */
v.reader.Compare, iter, lower.UserKey, upper.UserKey,
lower, upper, !v.vState.upper.IsExclusiveSentinel(), /* panicOnUpperTruncate */
), nil
}

Expand Down Expand Up @@ -195,6 +232,10 @@ func (v *virtualState) constrainBounds(
last = end
}
}
if v.prefixChange != nil {
first = v.prefixChange.ReplaceArg(first)
last = v.prefixChange.ReplaceArg(last)
}
// TODO(bananabrick): What if someone passes in bounds completely outside of
// virtual sstable bounds?
return lastKeyInclusive, first, last
Expand All @@ -204,6 +245,16 @@ func (v *virtualState) constrainBounds(
// enforcing the virtual sstable bounds.
//
// start and end are given in the keyspace of the virtual sstable. When the
// file carries a prefix replacement rule, constrainBounds rewrites the
// constrained bounds with ReplaceArg, so the f and l it returns are already
// expressed in the backing file's ContentPrefix keyspace and can be handed to
// the underlying reader as is.
func (v *VirtualReader) EstimateDiskUsage(start, end []byte) (uint64, error) {
	_, f, l := v.vState.constrainBounds(start, end, true /* endInclusive */)
	if pr := v.vState.prefixChange; pr != nil {
		// Do not translate f and l a second time: requiring them to carry
		// SyntheticPrefix here would reject every request whose ContentPrefix
		// does not itself begin with SyntheticPrefix, because constrainBounds
		// has already swapped the prefixes. Only sanity check that the bounds
		// really are in the content keyspace before querying the backing file.
		if !bytes.HasPrefix(f, pr.ContentPrefix) || !bytes.HasPrefix(l, pr.ContentPrefix) {
			return 0, errInputPrefixMismatch
		}
	}

	return v.reader.EstimateDiskUsage(f, l)
}

Expand Down
37 changes: 37 additions & 0 deletions testdata/ingest_external
Original file line number Diff line number Diff line change
Expand Up @@ -152,3 +152,40 @@ f: (foo, .)
g: (foo, .)
h: (foo, .)
.

build-remote f5
set ef foo
set eg foo
set eh foo
----

ingest-external prefix-replace=(e,f)
f5,10,ff,fi
----

iter
first
next
next
next
next
next
next
next
next
next
next
next
----
a: (foo, .)
b: (bar, .)
c: (foobarbaz, .)
d: (haha, .)
e: (something, .)
f: (foo, .)
ff: (foo, .)
fg: (foo, .)
fh: (foo, .)
g: (foo, .)
h: (foo, .)
.

0 comments on commit a27878f

Please sign in to comment.