Skip to content

Commit

Permalink
db: schedule sstable validation on ingestion
Browse files Browse the repository at this point in the history
Currently, when Pebble ingests an sstable, it only validates the checksums
of the blocks it must read to retrieve the smallest and largest keys in
the table; the remaining blocks in the table are not checked.

Wire up the block checksum validation codepath from cockroachdb#1240, scheduling
the validation of the ingested sstable on a background goroutine.

This ingestion validation is gated on a new experimental DB option,
`ValidateOnIngest`, which is initially off by default.

See cockroachdb#1203.
  • Loading branch information
nicktrav committed Oct 6, 2021
1 parent ecc685b commit 1b44d39
Show file tree
Hide file tree
Showing 9 changed files with 344 additions and 3 deletions.
14 changes: 14 additions & 0 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,17 @@ type DB struct {
// them.
pending []manifest.NewFileEntry
}

// tableValidation holds the state used to validate sstables in the
// background. Files are queued by maybeValidateSSTables and consumed by
// validateSSTables.
tableValidation struct {
// cond is a condition variable used to signal the completion of a
// job to validate one or more sstables. DB.Close waits on it until
// validating is false.
cond sync.Cond
// pending is a slice of metadata for sstables waiting to be
// validated. A validation job takes the whole slice and resets it
// to nil.
pending []*fileMetadata
// validating is set to true when validation is running. A new
// validation job is not started while it is true.
validating bool
}
}

// Normally equal to time.Now() but may be overridden in tests.
Expand Down Expand Up @@ -955,6 +966,9 @@ func (d *DB) Close() error {
for d.mu.tableStats.loading {
d.mu.tableStats.cond.Wait()
}
for d.mu.tableValidation.validating {
d.mu.tableValidation.cond.Wait()
}

var err error
if n := len(d.mu.compact.inProgress); n > 0 {
Expand Down
34 changes: 34 additions & 0 deletions event.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,27 @@ func (i TableStatsInfo) SafeFormat(w redact.SafePrinter, _ rune) {
w.Printf("[JOB %d] all initial table stats loaded", redact.Safe(i.JobID))
}

// TableValidatedInfo contains information on the result of a validation run
// on an sstable.
type TableValidatedInfo struct {
// JobID is the ID of the validation job that processed the table.
JobID int
// Meta is the metadata of the sstable that was validated.
Meta *fileMetadata
// Err is the error returned by the validation, or nil if the table
// validated successfully.
Err error
}

// String implements fmt.Stringer by rendering the value through
// redact.StringWithoutMarkers.
func (i TableValidatedInfo) String() string {
return redact.StringWithoutMarkers(i)
}

// SafeFormat implements redact.SafeFormatter. It prints a single line tagged
// with the job ID: a success message naming the table when Err is nil, and a
// failure message naming the table and the error otherwise.
func (i TableValidatedInfo) SafeFormat(w redact.SafePrinter, _ rune) {
	jobID := redact.Safe(i.JobID)
	if i.Err == nil {
		w.Printf("[JOB %d] validated table: %s", jobID, i.Meta)
		return
	}
	w.Printf("[JOB %d] validation for table %s failed: %s", jobID, i.Meta, i.Err)
}

// WALCreateInfo contains info about a WAL creation event.
type WALCreateInfo struct {
// JobID is the ID of the job that caused the WAL to be created.
Expand Down Expand Up @@ -435,6 +456,9 @@ type EventListener struct {
// collector has loaded statistics for all tables that existed at Open.
TableStatsLoaded func(TableStatsInfo)

// TableValidated is invoked after validation runs on an sstable.
TableValidated func(TableValidatedInfo)

// WALCreated is invoked after a WAL has been created.
WALCreated func(WALCreateInfo)

Expand Down Expand Up @@ -498,6 +522,9 @@ func (l *EventListener) EnsureDefaults(logger Logger) {
if l.TableStatsLoaded == nil {
l.TableStatsLoaded = func(info TableStatsInfo) {}
}
if l.TableValidated == nil {
l.TableValidated = func(validated TableValidatedInfo) {}
}
if l.WALCreated == nil {
l.WALCreated = func(info WALCreateInfo) {}
}
Expand Down Expand Up @@ -559,6 +586,9 @@ func MakeLoggingEventListener(logger Logger) EventListener {
TableStatsLoaded: func(info TableStatsInfo) {
logger.Infof("%s", info)
},
TableValidated: func(info TableValidatedInfo) {
logger.Infof("%s", info)
},
WALCreated: func(info WALCreateInfo) {
logger.Infof("%s", info)
},
Expand Down Expand Up @@ -631,6 +661,10 @@ func TeeEventListener(a, b EventListener) EventListener {
a.TableStatsLoaded(info)
b.TableStatsLoaded(info)
},
TableValidated: func(info TableValidatedInfo) {
a.TableValidated(info)
b.TableValidated(info)
},
WALCreated: func(info WALCreateInfo) {
a.WALCreated(info)
b.WALCreated(info)
Expand Down
70 changes: 70 additions & 0 deletions ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -691,5 +691,75 @@ func (d *DB) ingestApply(jobID int, meta []*fileMetadata) (*versionEdit, error)
// The ingestion may have pushed a level over the threshold for compaction,
// so check to see if one is necessary and schedule it.
d.maybeScheduleCompaction()
d.maybeValidateSSTables(meta)
return ve, nil
}

// maybeValidateSSTables enqueues the given sstable metadata for validation
// and, when shouldValidateSSTables allows it, starts a validation job on a
// new goroutine. It does nothing unless the experimental ValidateOnIngest
// option is enabled.
//
// DB.mu must be locked when calling.
func (d *DB) maybeValidateSSTables(meta []*fileMetadata) {
	if d.opts.Experimental.ValidateOnIngest {
		queue := &d.mu.tableValidation.pending
		*queue = append(*queue, meta...)
		if d.shouldValidateSSTables() {
			go d.validateSSTables()
		}
	}
}

// shouldValidateSSTables reports whether a new round of sstable validation
// should be started. That is the case only when no validation is currently
// running, the DB has not been closed, the ValidateOnIngest option is
// enabled, and at least one table is waiting in the pending queue.
//
// DB.mu must be locked when calling.
func (d *DB) shouldValidateSSTables() bool {
	switch {
	case d.mu.tableValidation.validating:
		// A validation job is already in flight.
		return false
	case d.closed.Load() != nil:
		// The DB has been closed.
		return false
	case !d.opts.Experimental.ValidateOnIngest:
		// The feature is disabled.
		return false
	default:
		return len(d.mu.tableValidation.pending) > 0
	}
}

// validateSSTables runs one round of validation over every table currently in
// the pending queue. It takes DB.mu itself, so it must be called without the
// mutex held.
//
// The round claims the whole queue and a fresh job ID under DB.mu, validates
// the block checksums of each table with the mutex released, and reports each
// result through EventListener.TableValidated. When it finishes it clears
// the validating flag, wakes waiters on tableValidation.cond, and schedules
// another round if more tables were queued in the meantime.
func (d *DB) validateSSTables() {
	d.mu.Lock()
	if !d.shouldValidateSSTables() {
		d.mu.Unlock()
		return
	}

	// Claim the queue and mark validation as running so that no other round
	// starts concurrently.
	state := &d.mu.tableValidation
	queue := state.pending
	state.pending = nil
	state.validating = true
	jobID := d.mu.nextJobID
	d.mu.nextJobID++

	// Drop DB.mu before performing IO.
	d.mu.Unlock()

	// Validate all tables in the claimed queue. This could lead to a
	// situation where we are starving IO from other tasks due to having to
	// page through all the blocks in all the sstables in the queue.
	// TODO(travers): Add some form of pacing to avoid IO starvation.
	for idx := range queue {
		result := TableValidatedInfo{JobID: jobID, Meta: queue[idx]}
		result.Err = d.tableCache.withReader(
			queue[idx], (*sstable.Reader).ValidateBlockChecksums,
		)
		// TODO(travers): Hook into the corruption reporting pipeline, once
		// available. See pebble#1192.
		d.opts.EventListener.TableValidated(result)
	}

	// Publish completion and, if more work arrived while this round was
	// running, kick off the next round.
	d.mu.Lock()
	defer d.mu.Unlock()
	state.validating = false
	state.cond.Broadcast()
	if d.shouldValidateSSTables() {
		go d.validateSSTables()
	}
}
211 changes: 211 additions & 0 deletions ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1207,6 +1207,217 @@ func TestIngestCleanup(t *testing.T) {
}
}

// TestIngestValidation verifies that, with ValidateOnIngest enabled, block
// corruption in an ingested sstable is surfaced at the expected point:
//
//   - corruption in the first or last data block is expected to fail the
//     ingestion itself;
//   - corruption in any other data block is expected to be reported
//     asynchronously through the TableValidated event;
//   - an uncorrupted table is expected to ingest and validate without error.
//
// Each case builds an sstable of random key/value pairs on the real
// filesystem, optionally corrupts one data block, copies the file into the
// DB's in-memory filesystem and ingests it.
func TestIngestValidation(t *testing.T) {
	type keyVal struct {
		key, val []byte
	}
	type corruptionLocation int
	const (
		corruptionLocationNone corruptionLocation = iota
		corruptionLocationStart
		corruptionLocationEnd
		corruptionLocationInternal
	)
	type errLocation int
	const (
		errLocationNone errLocation = iota
		errLocationIngest
		errLocationValidation
	)
	const (
		nKeys     = 1_000
		keySize   = 10
		valSize   = 100
		blockSize = 100

		ingestTableName = "ext"
	)

	// Log the seed so that a failing run can be reproduced.
	seed := uint64(time.Now().UnixNano())
	rng := rand.New(rand.NewSource(seed))
	t.Logf("rng seed = %d", seed)

	testCases := []struct {
		description string
		cLoc        corruptionLocation
		wantErrType errLocation
	}{
		{
			description: "no corruption",
			cLoc:        corruptionLocationNone,
			wantErrType: errLocationNone,
		},
		{
			description: "start block",
			cLoc:        corruptionLocationStart,
			wantErrType: errLocationIngest,
		},
		{
			description: "end block",
			cLoc:        corruptionLocationEnd,
			wantErrType: errLocationIngest,
		},
		{
			description: "non-end block",
			cLoc:        corruptionLocationInternal,
			wantErrType: errLocationValidation,
		},
	}

	for _, tc := range testCases {
		t.Run(tc.description, func(t *testing.T) {
			// info is written by the TableValidated callback and read only
			// after wg.Wait() returns, which orders the write before the
			// read.
			var (
				info *TableValidatedInfo
				wg   sync.WaitGroup
			)
			wg.Add(1)

			fs := vfs.NewMem()
			opts := &Options{
				FS: fs,
				EventListener: EventListener{
					TableValidated: func(i TableValidatedInfo) {
						info = &i
						wg.Done()
					},
				},
			}
			opts.Experimental.ValidateOnIngest = true
			d, err := Open("", opts)
			require.NoError(t, err)
			defer func() { require.NoError(t, d.Close()) }()

			// corrupt overwrites one byte of a data block, chosen according
			// to tc.cLoc, in the on-disk copy of the table. f is used only
			// to compute the table layout and is closed before returning.
			corrupt := func(f vfs.File) {
				// Compute the layout of the sstable in order to find the
				// appropriate block locations to corrupt.
				r, err := sstable.NewReader(f, sstable.ReaderOptions{})
				require.NoError(t, err)
				l, err := r.Layout()
				require.NoError(t, err)
				require.NoError(t, r.Close())

				// Select an appropriate data block to corrupt.
				var blockIdx int
				switch tc.cLoc {
				case corruptionLocationStart:
					blockIdx = 0
				case corruptionLocationEnd:
					blockIdx = len(l.Data) - 1
				case corruptionLocationInternal:
					// Pick a block that is neither the first nor the last.
					blockIdx = 1 + rng.Intn(len(l.Data)-2)
				default:
					t.Fatalf("unknown corruptionLocation: %d", tc.cLoc)
				}
				bh := l.Data[blockIdx]

				osF, err := os.OpenFile(ingestTableName, os.O_RDWR, 0600)
				require.NoError(t, err)
				defer func() { require.NoError(t, osF.Close()) }()

				// Corrupting a key will cause the ingestion to fail due to a
				// malformed key, rather than a block checksum mismatch.
				// Instead, we corrupt the last byte in the selected block,
				// before the trailer.
				offset := bh.Offset + bh.Length - 1
				_, err = osF.WriteAt([]byte("\xff"), int64(offset))
				require.NoError(t, err)
			}

			type errT struct {
				errLoc errLocation
				err    error
			}
			// runIngest writes keyVals to an sstable, optionally corrupts
			// it, ingests it and reports where (if anywhere) an error was
			// observed.
			runIngest := func(keyVals []keyVal) (et errT) {
				// The vfs.File does not allow for random reads and writes.
				// Create a disk-backed file outside of the DB FS that we can
				// open as a regular os.File, if required.
				tmpFS := vfs.Default
				f, err := tmpFS.Create(ingestTableName)
				require.NoError(t, err)
				// Best-effort cleanup of the temporary on-disk file.
				defer func() { _ = tmpFS.Remove(ingestTableName) }()

				w := sstable.NewWriter(f, sstable.WriterOptions{
					BlockSize:   blockSize,     // Create many smaller blocks.
					Compression: NoCompression, // For simpler debugging.
				})
				for _, kv := range keyVals {
					require.NoError(t, w.Set(kv.key, kv.val))
				}
				require.NoError(t, w.Close())

				// Possibly corrupt the file.
				if tc.cLoc != corruptionLocationNone {
					f, err = tmpFS.Open(ingestTableName)
					require.NoError(t, err)
					corrupt(f)
				}

				// Copy the file into the DB's FS.
				_, err = vfs.Clone(tmpFS, fs, ingestTableName, ingestTableName)
				require.NoError(t, err)

				// Ingest the external table.
				err = d.Ingest([]string{ingestTableName})
				if err != nil {
					et.errLoc = errLocationIngest
					et.err = err
					return
				}

				// Wait for the validation on the sstable to complete.
				wg.Wait()

				// Return any error encountered during validation.
				et.err = info.Err
				if et.err != nil {
					et.errLoc = errLocationValidation
				}

				return
			}

			// Construct a set of keys to ingest. The slices must be
			// allocated with a non-zero length: Read fills at most len(p)
			// bytes, so a zero-length slice would leave every key and value
			// empty.
			keyVals := make([]keyVal, 0, nKeys)
			for i := 0; i < nKeys; i++ {
				key := make([]byte, keySize)
				_, err = rng.Read(key)
				require.NoError(t, err)

				val := make([]byte, valSize)
				_, err = rng.Read(val)
				require.NoError(t, err)

				keyVals = append(keyVals, keyVal{key, val})
			}

			// Keys must be sorted. The less function must be a strict
			// ordering, so compare with < rather than <=.
			sort.Slice(keyVals, func(i, j int) bool {
				return d.cmp(keyVals[i].key, keyVals[j].key) < 0
			})

			// Run the ingestion.
			et := runIngest(keyVals)

			// Assert we saw the errors we expect.
			switch tc.wantErrType {
			case errLocationNone:
				require.Equal(t, errLocationNone, et.errLoc)
				require.NoError(t, et.err)
			case errLocationIngest:
				require.Equal(t, errLocationIngest, et.errLoc)
				require.Error(t, et.err)
				require.True(t, errors.Is(et.err, base.ErrCorruption))
			case errLocationValidation:
				require.Equal(t, errLocationValidation, et.errLoc)
				require.Error(t, et.err)
				require.True(t, errors.Is(et.err, base.ErrCorruption))
			default:
				t.Fatalf("unknown wantErrType %d", tc.wantErrType)
			}
		})
	}
}

// BenchmarkManySSTables measures the cost of various operations with various
// counts of SSTables within the database.
func BenchmarkManySSTables(b *testing.B) {
Expand Down
Loading

0 comments on commit 1b44d39

Please sign in to comment.