Skip to content

Commit

Permalink
manifest: Added Virtual SSTable Metadata Bounds Checking
Browse files Browse the repository at this point in the history
Added a check inside of `FileMetadata.ValidateVirtual()` to ensure that
the keys returned from a virtual sstable are within the bounds of the
Smallest and Largest metadata keys.

Informs: cockroachdb#2593
  • Loading branch information
raggar committed Jul 27, 2023
1 parent 628c410 commit 657436e
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 0 deletions.
88 changes: 88 additions & 0 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -2503,3 +2503,91 @@ func (d *DB) SetCreatorID(creatorID uint64) error {
func (d *DB) ObjProvider() objstorage.Provider {
return d.objProvider
}

func (d *DB) checkVirtualBounds(m *fileMetadata, iterOpts *IterOptions) {
if !invariants.Enabled {
return
}

if m.HasPointKeys {
pointIter, rangeDelIter, err := d.newIters(context.TODO(), m, iterOpts, internalIterOpts{})
if err != nil {
panic(errors.Wrap(err, "pebble: error creating point iterator"))
}

defer pointIter.Close()
if rangeDelIter != nil {
defer rangeDelIter.Close()
}

pointKey, _ := pointIter.First()
var rangeDel *rangekey.Span
if rangeDelIter != nil {
rangeDel = rangeDelIter.First()
}

// Check that the lower bound is tight.
if (rangeDel == nil || d.cmp(rangeDel.SmallestKey().UserKey, m.SmallestPointKey.UserKey) != 0) &&
(pointKey == nil || d.cmp(pointKey.UserKey, m.SmallestPointKey.UserKey) != 0) {
panic(errors.Wrap(err, fmt.Sprintf("pebble: virtual sstable %s lower point key bound is not tight", m.FileNum)))
}

pointKey, _ = pointIter.Last()
rangeDel = nil
if rangeDelIter != nil {
rangeDel = rangeDelIter.Last()
}

// Check that the upper bound is tight.
if (rangeDel == nil || d.cmp(rangeDel.LargestKey().UserKey, m.LargestPointKey.UserKey) != 0) &&
(pointKey == nil || d.cmp(pointKey.UserKey, m.LargestPointKey.UserKey) != 0) {
panic(errors.Wrap(err, fmt.Sprintf("pebble: virtual sstable %s upper point key bound is not tight", m.FileNum)))
}

// Check that iterator keys are within bounds.
for key, _ := pointIter.First(); key != nil; key, _ = pointIter.Next() {
if d.cmp(key.UserKey, m.SmallestPointKey.UserKey) < 0 || d.cmp(key.UserKey, m.LargestPointKey.UserKey) > 0 {
panic(errors.Wrap(err, fmt.Sprintf("pebble: virtual sstable %s point key %s is not within bounds", m.FileNum, key.UserKey)))
}
}

if rangeDelIter != nil {
for key := rangeDelIter.First(); key != nil; key = rangeDelIter.Next() {
if d.cmp(key.SmallestKey().UserKey, m.SmallestPointKey.UserKey) < 0 {
panic(errors.Wrap(err, fmt.Sprintf("pebble: virtual sstable %s point key %s is not within bounds", m.FileNum, key.SmallestKey().UserKey)))
}

if d.cmp(key.LargestKey().UserKey, m.LargestPointKey.UserKey) > 0 {
panic(errors.Wrap(err, fmt.Sprintf("pebble: virtual sstable %s point key %s is not within bounds", m.FileNum, key.LargestKey().UserKey)))
}
}
}
}

if m.HasRangeKeys {
rangeKeyIter, err := d.tableNewRangeKeyIter(m, keyspan.SpanIterOptions{Level: iterOpts.level})

if err != nil {
panic(errors.Wrap(err, "pebble: error creating range key iterator"))
}

// Check that the lower bound is tight.
if d.cmp(rangeKeyIter.First().SmallestKey().UserKey, m.SmallestRangeKey.UserKey) != 0 {
panic(errors.Wrap(err, fmt.Sprintf("pebble: virtual sstable %s lower range key bound is not tight", m.FileNum)))
}

// Check that upper bound is tight.
if d.cmp(rangeKeyIter.Last().LargestKey().UserKey, m.LargestRangeKey.UserKey) != 0 {
panic(errors.Wrap(err, fmt.Sprintf("pebble: virtual sstable %s upper range key bound is not tight", m.FileNum)))
}

for key := rangeKeyIter.First(); key != nil; key = rangeKeyIter.Next() {
if d.cmp(key.SmallestKey().UserKey, m.SmallestRangeKey.UserKey) < 0 {
panic(errors.Wrap(err, fmt.Sprintf("pebble: virtual sstable %s point key %s is not within bounds", m.FileNum, key.SmallestKey().UserKey)))
}
if d.cmp(key.LargestKey().UserKey, m.LargestRangeKey.UserKey) > 0 {
panic(errors.Wrap(err, fmt.Sprintf("pebble: virtual sstable %s point key %s is not within bounds", m.FileNum, key.LargestKey().UserKey)))
}
}
}
}
7 changes: 7 additions & 0 deletions ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -1097,6 +1097,9 @@ func (d *DB) ingest(
if err := ingestLink(jobID, d.opts, d.objProvider, loadResult, shared); err != nil {
return IngestOperationStats{}, err
}
for i, sharedMeta := range loadResult.sharedMeta {
d.checkVirtualBounds(sharedMeta, &IterOptions{level: manifest.Level(loadResult.sharedLevels[i])})
}
// Make the new tables durable. We need to do this at some point before we
// update the MANIFEST (via logAndApply), otherwise a crash can have the
// tables referenced in the MANIFEST, but not present in the provider.
Expand Down Expand Up @@ -1490,6 +1493,8 @@ func (d *DB) excise(
if err := leftFile.Validate(d.cmp, d.opts.Comparer.FormatKey); err != nil {
return nil, err
}
leftFile.ValidateVirtual(m)
d.checkVirtualBounds(leftFile, nil /* iterOptions */)
ve.NewFiles = append(ve.NewFiles, newFileEntry{Level: level, Meta: leftFile})
ve.CreatedBackingTables = append(ve.CreatedBackingTables, leftFile.FileBacking)
backingTableCreated = true
Expand Down Expand Up @@ -1597,6 +1602,8 @@ func (d *DB) excise(
// for it here.
rightFile.Size = 1
}
rightFile.ValidateVirtual(m)
d.checkVirtualBounds(rightFile, nil /* iterOptions */)
ve.NewFiles = append(ve.NewFiles, newFileEntry{Level: level, Meta: rightFile})
if !backingTableCreated {
ve.CreatedBackingTables = append(ve.CreatedBackingTables, rightFile.FileBacking)
Expand Down
2 changes: 2 additions & 0 deletions table_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,9 @@ func TestVirtualReadsWiring(t *testing.T) {
v2.SmallestPointKey = v2.Smallest

v1.ValidateVirtual(parentFile)
d.checkVirtualBounds(v1, nil /* iterOptions */)
v2.ValidateVirtual(parentFile)
d.checkVirtualBounds(v2, nil /* iterOptions */)

// Write the version edit.
fileMetrics := func(ve *versionEdit) map[int]*LevelMetrics {
Expand Down
4 changes: 4 additions & 0 deletions version_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,9 @@ func TestLatestRefCounting(t *testing.T) {
m2.SmallestPointKey = m2.Smallest

m1.ValidateVirtual(f)
d.checkVirtualBounds(m1, nil /* iterOptions */)
m2.ValidateVirtual(f)
d.checkVirtualBounds(m2, nil /* iterOptions */)

fileMetrics := func(ve *versionEdit) map[int]*LevelMetrics {
metrics := newFileMetrics(ve.NewFiles)
Expand Down Expand Up @@ -282,7 +284,9 @@ func TestVirtualSSTableManifestReplay(t *testing.T) {
m2.Stats.NumEntries = 1

m1.ValidateVirtual(f)
d.checkVirtualBounds(m1, nil /* iterOptions */)
m2.ValidateVirtual(f)
d.checkVirtualBounds(m2, nil /* iterOptions */)

fileMetrics := func(ve *versionEdit) map[int]*LevelMetrics {
metrics := newFileMetrics(ve.NewFiles)
Expand Down

0 comments on commit 657436e

Please sign in to comment.