From a27878fb93b2c903338097d7b4a824b42bacebdf Mon Sep 17 00:00:00 2001 From: David Taylor Date: Thu, 14 Dec 2023 04:56:04 +0000 Subject: [PATCH] ingest,manifest,sstable: connect prefix rules in ingest to prefix replacing iterators --- data_test.go | 18 ++++++ ingest.go | 12 ++++ internal/manifest/version.go | 20 +++++++ sstable/prefix_replacing_iterator.go | 3 - sstable/reader_test.go | 6 +- sstable/reader_virtual.go | 83 ++++++++++++++++++++++------ testdata/ingest_external | 37 +++++++++++++ 7 files changed, 159 insertions(+), 20 deletions(-) diff --git a/data_test.go b/data_test.go index f75637e45d..20973a5a31 100644 --- a/data_test.go +++ b/data_test.go @@ -1271,6 +1271,20 @@ func runIngestCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error { func runIngestExternalCmd(td *datadriven.TestData, d *DB, locator string) error { external := make([]ExternalFile, 0) + usePrefixChange := false + var fromPrefix, toPrefix []byte + for i := range td.CmdArgs { + switch td.CmdArgs[i].Key { + case "prefix-replace": + vals := td.CmdArgs[i].Vals + if len(vals) != 2 { + return errors.New("usage: prefix-replace=(from,to)") + } + fromPrefix = []byte(vals[0]) + toPrefix = []byte(vals[1]) + usePrefixChange = true + } + } for _, arg := range strings.Split(td.Input, "\n") { fields := strings.Split(arg, ",") if len(fields) != 4 { @@ -1287,6 +1301,10 @@ func runIngestExternalCmd(td *datadriven.TestData, d *DB, locator string) error ef.SmallestUserKey = []byte(fields[2]) ef.LargestUserKey = []byte(fields[3]) ef.HasPointKey = true + if usePrefixChange { + ef.ContentPrefix = fromPrefix + ef.SyntheticPrefix = toPrefix + } external = append(external, ef) } diff --git a/ingest.go b/ingest.go index f6726ab01d..d764c8ecca 100644 --- a/ingest.go +++ b/ingest.go @@ -214,6 +214,13 @@ func ingestLoad1External( // what parts of this sstable are referenced by other nodes. meta.FileBacking.Size = e.Size + if len(e.SyntheticPrefix) != 0 { + meta.PrefixReplacement = &manifest.PrefixReplacement{ + ContentPrefix: e.ContentPrefix, + SyntheticPrefix: e.SyntheticPrefix, + } + } + if err := meta.Validate(opts.Comparer.Compare, opts.Comparer.FormatKey); err != nil { return nil, err } @@ -1112,6 +1119,11 @@ type ExternalFile struct { // or range keys. If both structs are false, an error is returned during // ingestion. HasPointKey, HasRangeKey bool + // ContentPrefix and SyntheticPrefix denote a prefix replacement rule causing + // a file, in which all keys have prefix ContentPrefix, to appear whenever it + // is accessed as if those keys all instead have prefix SyntheticPrefix. + // SyntheticPrefix must be a prefix of both SmallestUserKey and LargestUserKey. + ContentPrefix, SyntheticPrefix []byte } // IngestWithStats does the same as Ingest, and additionally returns diff --git a/internal/manifest/version.go b/internal/manifest/version.go index b862e5fe28..ec67013cca 100644 --- a/internal/manifest/version.go +++ b/internal/manifest/version.go @@ -280,6 +280,26 @@ type PrefixReplacement struct { ContentPrefix, SyntheticPrefix []byte } +// ReplaceArg replaces the new prefix in the argument with the original prefix. +func (p *PrefixReplacement) ReplaceArg(src []byte) []byte { + return p.replace(src, p.SyntheticPrefix, p.ContentPrefix) +} + +// ReplaceResult replaces the original prefix in the result with the new prefix. +func (p *PrefixReplacement) ReplaceResult(key []byte) []byte { + return p.replace(key, p.ContentPrefix, p.SyntheticPrefix) +} + +func (p *PrefixReplacement) replace(key, from, to []byte) []byte { + if !bytes.HasPrefix(key, from) { + panic(fmt.Sprintf("unexpected prefix in replace: %s", key)) + } + result := make([]byte, 0, len(to)+(len(key)-len(from))) + result = append(result, to...) + result = append(result, key[len(from):]...) + return result +} + // PhysicalFileMeta is used by functions which want a guarantee that their input // belongs to a physical sst and not a virtual sst. // diff --git a/sstable/prefix_replacing_iterator.go b/sstable/prefix_replacing_iterator.go index f1c65388f1..895ccf5619 100644 --- a/sstable/prefix_replacing_iterator.go +++ b/sstable/prefix_replacing_iterator.go @@ -187,9 +187,6 @@ type prefixReplacingFragmentIterator struct { out1, out2 []byte } -// TODO(dt): wire this up. -var _ = newPrefixReplacingFragmentIterator - // newPrefixReplacingFragmentIterator wraps a FragmentIterator over some reader // that contains range keys in some key span to make those range keys appear to // be remapped into some other key-span. diff --git a/sstable/reader_test.go b/sstable/reader_test.go index 6998faacab..2018bffeca 100644 --- a/sstable/reader_test.go +++ b/sstable/reader_test.go @@ -1393,7 +1393,11 @@ func TestReader_TableFormat(t *testing.T) { } func buildTestTable( - t *testing.T, numEntries uint64, blockSize, indexBlockSize int, compression Compression, prefix []byte, + t *testing.T, + numEntries uint64, + blockSize, indexBlockSize int, + compression Compression, + prefix []byte, ) *Reader { provider, err := objstorageprovider.Open(objstorageprovider.DefaultSettings(vfs.NewMem(), "" /* dirName */)) require.NoError(t, err) diff --git a/sstable/reader_virtual.go b/sstable/reader_virtual.go index 4f05e39d7a..fb74cdf23e 100644 --- a/sstable/reader_virtual.go +++ b/sstable/reader_virtual.go @@ -5,6 +5,7 @@ package sstable import ( + "bytes" "context" "github.com/cockroachdb/pebble/internal/base" @@ -27,11 +28,12 @@ type VirtualReader struct { // Lightweight virtual sstable state which can be passed to sstable iterators. type virtualState struct { - lower InternalKey - upper InternalKey - fileNum base.FileNum - Compare Compare - isForeign bool + lower InternalKey + upper InternalKey + fileNum base.FileNum + Compare Compare + isForeign bool + prefixChange *manifest.PrefixReplacement } func ceilDiv(a, b uint64) uint64 { @@ -48,11 +50,12 @@ func MakeVirtualReader( } vState := virtualState{ - lower: meta.Smallest, - upper: meta.Largest, - fileNum: meta.FileNum, - Compare: reader.Compare, - isForeign: isForeign, + lower: meta.Smallest, + upper: meta.Largest, + fileNum: meta.FileNum, + Compare: reader.Compare, + isForeign: isForeign, + prefixChange: meta.PrefixReplacement, } v := VirtualReader{ vState: vState, @@ -74,6 +77,7 @@ func MakeVirtualReader( v.Properties.NumSizedDeletions = ceilDiv(reader.Properties.NumSizedDeletions*meta.Size, meta.FileBacking.Size) v.Properties.RawPointTombstoneKeySize = ceilDiv(reader.Properties.RawPointTombstoneKeySize*meta.Size, meta.FileBacking.Size) v.Properties.RawPointTombstoneValueSize = ceilDiv(reader.Properties.RawPointTombstoneValueSize*meta.Size, meta.FileBacking.Size) + return v } @@ -85,8 +89,12 @@ func (v *VirtualReader) NewCompactionIter( rp ReaderProvider, bufferPool *BufferPool, ) (Iterator, error) { - return v.reader.newCompactionIter( + i, err := v.reader.newCompactionIter( bytesIterated, categoryAndQoS, statsCollector, rp, &v.vState, bufferPool) + if err == nil && v.vState.prefixChange != nil { + i = newPrefixReplacingIterator(i, v.vState.prefixChange.ContentPrefix, v.vState.prefixChange.SyntheticPrefix, v.reader.Compare) + } + return i, err } // NewIterWithBlockPropertyFiltersAndContextEtc wraps @@ -103,9 +111,13 @@ func (v *VirtualReader) NewIterWithBlockPropertyFiltersAndContextEtc( statsCollector *CategoryStatsCollector, rp ReaderProvider, ) (Iterator, error) { - return v.reader.newIterWithBlockPropertyFiltersAndContext( + i, err := v.reader.newIterWithBlockPropertyFiltersAndContext( ctx, lower, upper, filterer, hideObsoletePoints, useFilterBlock, stats, categoryAndQoS, statsCollector, rp, &v.vState) + if err == nil && v.vState.prefixChange != nil { + i = newPrefixReplacingIterator(i, v.vState.prefixChange.ContentPrefix, v.vState.prefixChange.SyntheticPrefix, v.reader.Compare) + } + return i, err } // ValidateBlockChecksumsOnBacking will call ValidateBlockChecksumsOnBacking on the underlying reader. @@ -123,6 +135,19 @@ func (v *VirtualReader) NewRawRangeDelIter() (keyspan.FragmentIterator, error) { if iter == nil { return nil, nil } + lower := &v.vState.lower + upper := &v.vState.upper + + if v.vState.prefixChange != nil { + lower = &InternalKey{UserKey: v.vState.prefixChange.ReplaceArg(lower.UserKey), Trailer: lower.Trailer} + upper = &InternalKey{UserKey: v.vState.prefixChange.ReplaceArg(upper.UserKey), Trailer: upper.Trailer} + + iter = keyspan.Truncate( + v.reader.Compare, iter, lower.UserKey, upper.UserKey, + lower, upper, !v.vState.upper.IsExclusiveSentinel(), /* panicOnUpperTruncate */ + ) + return newPrefixReplacingFragmentIterator(iter, v.vState.prefixChange.ContentPrefix, v.vState.prefixChange.SyntheticPrefix), nil + } // Truncation of spans isn't allowed at a user key that also contains points // in the same virtual sstable, as it would lead to covered points getting @@ -135,8 +160,8 @@ func (v *VirtualReader) NewRawRangeDelIter() (keyspan.FragmentIterator, error) { // includes both point keys), but not [a#2,SET-b#3,SET] (as it would truncate // the rangedel at b and lead to the point being uncovered). return keyspan.Truncate( - v.reader.Compare, iter, v.vState.lower.UserKey, v.vState.upper.UserKey, - &v.vState.lower, &v.vState.upper, !v.vState.upper.IsExclusiveSentinel(), /* panicOnUpperTruncate */ + v.reader.Compare, iter, lower.UserKey, upper.UserKey, + lower, upper, !v.vState.upper.IsExclusiveSentinel(), /* panicOnUpperTruncate */ ), nil } @@ -149,6 +174,18 @@ func (v *VirtualReader) NewRawRangeKeyIter() (keyspan.FragmentIterator, error) { if iter == nil { return nil, nil } + lower := &v.vState.lower + upper := &v.vState.upper + + if v.vState.prefixChange != nil { + lower = &InternalKey{UserKey: v.vState.prefixChange.ReplaceArg(lower.UserKey), Trailer: lower.Trailer} + upper = &InternalKey{UserKey: v.vState.prefixChange.ReplaceArg(upper.UserKey), Trailer: upper.Trailer} + iter = keyspan.Truncate( + v.reader.Compare, iter, lower.UserKey, upper.UserKey, + lower, upper, !v.vState.upper.IsExclusiveSentinel(), /* panicOnUpperTruncate */ + ) + return newPrefixReplacingFragmentIterator(iter, v.vState.prefixChange.ContentPrefix, v.vState.prefixChange.SyntheticPrefix), nil + } // Truncation of spans isn't allowed at a user key that also contains points // in the same virtual sstable, as it would lead to covered points getting @@ -161,8 +198,8 @@ func (v *VirtualReader) NewRawRangeKeyIter() (keyspan.FragmentIterator, error) { // includes both point keys), but not [a#2,SET-b#3,SET] (as it would truncate // the range key at b and lead to the point being uncovered). return keyspan.Truncate( - v.reader.Compare, iter, v.vState.lower.UserKey, v.vState.upper.UserKey, - &v.vState.lower, &v.vState.upper, !v.vState.upper.IsExclusiveSentinel(), /* panicOnUpperTruncate */ + v.reader.Compare, iter, lower.UserKey, upper.UserKey, + lower, upper, !v.vState.upper.IsExclusiveSentinel(), /* panicOnUpperTruncate */ ), nil } @@ -195,6 +232,10 @@ func (v *virtualState) constrainBounds( last = end } } + if v.prefixChange != nil { + first = v.prefixChange.ReplaceArg(first) + last = v.prefixChange.ReplaceArg(last) + } // TODO(bananabrick): What if someone passes in bounds completely outside of // virtual sstable bounds? return lastKeyInclusive, first, last @@ -204,6 +245,16 @@ func (v *virtualState) constrainBounds( // enforcing the virtual sstable bounds. func (v *VirtualReader) EstimateDiskUsage(start, end []byte) (uint64, error) { _, f, l := v.vState.constrainBounds(start, end, true /* endInclusive */) + if v.vState.prefixChange != nil { + if !bytes.HasPrefix(f, v.vState.prefixChange.SyntheticPrefix) || !bytes.HasPrefix(l, v.vState.prefixChange.SyntheticPrefix) { + return 0, errInputPrefixMismatch + } + // TODO(dt): we could add a scratch buf to VirtualReader to avoid allocs on + // repeated calls to this. + f = append(append([]byte{}, v.vState.prefixChange.ContentPrefix...), f[len(v.vState.prefixChange.SyntheticPrefix):]...) + l = append(append([]byte{}, v.vState.prefixChange.ContentPrefix...), l[len(v.vState.prefixChange.SyntheticPrefix):]...) + } + return v.reader.EstimateDiskUsage(f, l) } diff --git a/testdata/ingest_external b/testdata/ingest_external index 1692eeaa2e..1f87fc40b4 100644 --- a/testdata/ingest_external +++ b/testdata/ingest_external @@ -152,3 +152,40 @@ f: (foo, .) g: (foo, .) h: (foo, .) . + +build-remote f5 +set ef foo +set eg foo +set eh foo +---- + +ingest-external prefix-replace=(e,f) +f5,10,ff,fi +---- + +iter +first +next +next +next +next +next +next +next +next +next +next +next +---- +a: (foo, .) +b: (bar, .) +c: (foobarbaz, .) +d: (haha, .) +e: (something, .) +f: (foo, .) +ff: (foo, .) +fg: (foo, .) +fh: (foo, .) +g: (foo, .) +h: (foo, .) +.