Skip to content

Commit

Permalink
db: schedule sstable validation on ingestion
Browse files Browse the repository at this point in the history
Currently, when Pebble ingests an sstable, it validates only the block
checksums necessary for it to retrieve the smallest and largest keys in
the table.

Wire up the block checksum validation codepath from #1240, scheduling
the validation of the ingested sstable on a background goroutine.

This ingestion validation is gated on a new experimental DB option,
`ValidateOnIngest`, which is initially off by default.

See #1203.
  • Loading branch information
nicktrav committed Oct 12, 2021
1 parent 543bc6f commit 0ee4290
Show file tree
Hide file tree
Showing 9 changed files with 372 additions and 3 deletions.
14 changes: 14 additions & 0 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,17 @@ type DB struct {
// them.
pending []manifest.NewFileEntry
}

tableValidation struct {
// cond is a condition variable used to signal the completion of a
// job to validate one or more sstables.
cond sync.Cond
// pending is a slice of metadata for sstables waiting to be
// validated.
pending []newFileEntry
// validating is set to true when validation is running.
validating bool
}
}

// Normally equal to time.Now() but may be overridden in tests.
Expand Down Expand Up @@ -955,6 +966,9 @@ func (d *DB) Close() error {
for d.mu.tableStats.loading {
d.mu.tableStats.cond.Wait()
}
for d.mu.tableValidation.validating {
d.mu.tableValidation.cond.Wait()
}

var err error
if n := len(d.mu.compact.inProgress); n > 0 {
Expand Down
29 changes: 29 additions & 0 deletions event.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,22 @@ func (i TableStatsInfo) SafeFormat(w redact.SafePrinter, _ rune) {
w.Printf("[JOB %d] all initial table stats loaded", redact.Safe(i.JobID))
}

// TableValidatedInfo contains information on the result of a validation run
// on an sstable. One value is passed to EventListener.TableValidated for each
// table that a validation job has finished checking.
type TableValidatedInfo struct {
// JobID is the ID of the validation job that processed the table.
JobID int
// Meta is the metadata of the sstable that was validated.
Meta *fileMetadata
}

// String implements fmt.Stringer. It formats the event by passing it to
// redact.StringWithoutMarkers.
func (i TableValidatedInfo) String() string {
return redact.StringWithoutMarkers(i)
}

// SafeFormat implements redact.SafeFormatter. The job ID is wrapped with
// redact.Safe before printing; the table metadata is handed to the printer
// unchanged.
func (i TableValidatedInfo) SafeFormat(w redact.SafePrinter, _ rune) {
	jobID := redact.Safe(i.JobID)
	w.Printf("[JOB %d] validated table: %s", jobID, i.Meta)
}

// WALCreateInfo contains info about a WAL creation event.
type WALCreateInfo struct {
// JobID is the ID of the job that caused the WAL to be created.
Expand Down Expand Up @@ -435,6 +451,9 @@ type EventListener struct {
// collector has loaded statistics for all tables that existed at Open.
TableStatsLoaded func(TableStatsInfo)

// TableValidated is invoked after validation runs on an sstable.
TableValidated func(TableValidatedInfo)

// WALCreated is invoked after a WAL has been created.
WALCreated func(WALCreateInfo)

Expand Down Expand Up @@ -498,6 +517,9 @@ func (l *EventListener) EnsureDefaults(logger Logger) {
if l.TableStatsLoaded == nil {
l.TableStatsLoaded = func(info TableStatsInfo) {}
}
if l.TableValidated == nil {
l.TableValidated = func(validated TableValidatedInfo) {}
}
if l.WALCreated == nil {
l.WALCreated = func(info WALCreateInfo) {}
}
Expand Down Expand Up @@ -559,6 +581,9 @@ func MakeLoggingEventListener(logger Logger) EventListener {
TableStatsLoaded: func(info TableStatsInfo) {
logger.Infof("%s", info)
},
TableValidated: func(info TableValidatedInfo) {
logger.Infof("%s", info)
},
WALCreated: func(info WALCreateInfo) {
logger.Infof("%s", info)
},
Expand Down Expand Up @@ -631,6 +656,10 @@ func TeeEventListener(a, b EventListener) EventListener {
a.TableStatsLoaded(info)
b.TableStatsLoaded(info)
},
TableValidated: func(info TableValidatedInfo) {
a.TableValidated(info)
b.TableValidated(info)
},
WALCreated: func(info WALCreateInfo) {
a.WALCreated(info)
b.WALCreated(info)
Expand Down
94 changes: 94 additions & 0 deletions ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -691,5 +691,99 @@ func (d *DB) ingestApply(jobID int, meta []*fileMetadata) (*versionEdit, error)
// The ingestion may have pushed a level over the threshold for compaction,
// so check to see if one is necessary and schedule it.
d.maybeScheduleCompaction()
d.maybeValidateSSTablesLocked(ve.NewFiles)
return ve, nil
}

// maybeValidateSSTablesLocked appends the given new file entries to the queue
// of sstables awaiting validation and, if shouldValidateSSTablesLocked reports
// that a run may start, launches validateSSTables on a new goroutine. It does
// nothing when the experimental ValidateOnIngest option is disabled.
//
// DB.mu must be held by the caller.
func (d *DB) maybeValidateSSTablesLocked(newFiles []newFileEntry) {
	if !d.opts.Experimental.ValidateOnIngest {
		// Feature is off: leave the queue untouched.
		return
	}

	tv := &d.mu.tableValidation
	tv.pending = append(tv.pending, newFiles...)
	if !d.shouldValidateSSTablesLocked() {
		return
	}
	go d.validateSSTables()
}

// shouldValidateSSTablesLocked reports whether a new round of sstable
// validation should be started. A round is warranted only when no validation
// is already running, the DB has not been closed, the experimental
// ValidateOnIngest option is enabled, and at least one table is queued.
//
// DB.mu must be held by the caller.
func (d *DB) shouldValidateSSTablesLocked() bool {
	switch {
	case d.mu.tableValidation.validating:
		// A validation round is already in flight.
		return false
	case d.closed.Load() != nil:
		// The DB has been closed.
		return false
	case !d.opts.Experimental.ValidateOnIngest:
		// The feature is disabled.
		return false
	default:
		return len(d.mu.tableValidation.pending) > 0
	}
}

// validateSSTables performs one round of validation over every table that is
// currently in the pending queue.
//
// The queue is claimed and the validating flag is set while holding DB.mu; the
// mutex is then released for the duration of the checksum IO. When the round
// finishes, the flag is cleared under DB.mu, waiters on the tableValidation
// condition variable are woken, and another round is started if more tables
// were queued in the meantime.
func (d *DB) validateSSTables() {
	d.mu.Lock()
	if !d.shouldValidateSSTablesLocked() {
		d.mu.Unlock()
		return
	}

	// Claim the whole queue and mark a round as running.
	tv := &d.mu.tableValidation
	queue := tv.pending
	tv.pending = nil
	tv.validating = true
	jobID := d.mu.nextJobID
	d.mu.nextJobID++
	rs := d.loadReadState()

	// Release DB.mu before doing any IO.
	d.mu.Unlock()

	// inCurrentVersion reports whether the file is still present in the
	// version captured above, either at the level it was ingested into or at
	// any level below it. A file that is absent from its original level is
	// assumed to have been moved down; that is rare enough between ingestion
	// and validation that the extra per-level scan is tolerably cheap when
	// amortized over all ingested tables.
	inCurrentVersion := func(f newFileEntry) bool {
		if rs.current.Contains(f.Level, d.cmp, f.Meta) {
			return true
		}
		for level := f.Level + 1; level < numLevels; level++ {
			if rs.current.Contains(level, d.cmp, f.Meta) {
				return true
			}
		}
		return false
	}

	// Every queued table is validated in this single pass. Paging through all
	// blocks of all queued sstables could starve other tasks of IO.
	// TODO(travers): Add some form of pacing to avoid IO starvation.
	for _, f := range queue {
		// Skip files that were deleted since they were ingested.
		if !inCurrentVersion(f) {
			continue
		}

		if err := d.tableCache.withReader(f.Meta, func(r *sstable.Reader) error {
			return r.ValidateBlockChecksums()
		}); err != nil {
			// TODO(travers): Hook into the corruption reporting pipeline, once
			// available. See pebble#1192.
			d.opts.Logger.Fatalf("pebble: encountered corruption during ingestion: %s", err)
		}

		d.opts.EventListener.TableValidated(TableValidatedInfo{JobID: jobID, Meta: f.Meta})
	}
	rs.unref()

	d.mu.Lock()
	defer d.mu.Unlock()
	tv.validating = false
	tv.cond.Broadcast()
	// Files may have been queued while this round was running.
	if d.shouldValidateSSTablesLocked() {
		go d.validateSSTables()
	}
}
Loading

0 comments on commit 0ee4290

Please sign in to comment.