From 0ee4290cea22520802a3528ba03768c941d09468 Mon Sep 17 00:00:00 2001 From: Nick Travers Date: Wed, 6 Oct 2021 13:55:08 -0700 Subject: [PATCH] db: schedule sstable validation on ingestion Currently, when Pebble ingests an sstable, it validates the block checksums necessary for it to retrieve the smallest and largest keys in the table. Wire up the block checksum validation codepath from #1240, scheduling the validation of the ingested sstable on a background goroutine. This ingestion validation is gated on a new experimental DB option, `ValidateOnIngest`, which is initially off by default. See #1203. --- db.go | 14 ++ event.go | 29 +++++ ingest.go | 94 ++++++++++++++ ingest_test.go | 220 ++++++++++++++++++++++++++++++++ internal/metamorphic/options.go | 5 +- open.go | 1 + options.go | 9 ++ options_test.go | 1 + testdata/metrics | 2 +- 9 files changed, 372 insertions(+), 3 deletions(-) diff --git a/db.go b/db.go index 70c4077fe5..20d491d986 100644 --- a/db.go +++ b/db.go @@ -376,6 +376,17 @@ type DB struct { // them. pending []manifest.NewFileEntry } + + tableValidation struct { + // cond is a condition variable used to signal the completion of a + // job to validate one or more sstables. + cond sync.Cond + // pending is a slice of metadata for sstables waiting to be + // validated. + pending []newFileEntry + // validating is set to true when validation is running. + validating bool + } } // Normally equal to time.Now() but may be overridden in tests. 
@@ -955,6 +966,9 @@ func (d *DB) Close() error { for d.mu.tableStats.loading { d.mu.tableStats.cond.Wait() } + for d.mu.tableValidation.validating { + d.mu.tableValidation.cond.Wait() + } var err error if n := len(d.mu.compact.inProgress); n > 0 { diff --git a/event.go b/event.go index 36d48eea8e..09cd780fc4 100644 --- a/event.go +++ b/event.go @@ -311,6 +311,22 @@ func (i TableStatsInfo) SafeFormat(w redact.SafePrinter, _ rune) { w.Printf("[JOB %d] all initial table stats loaded", redact.Safe(i.JobID)) } +// TableValidatedInfo contains information on the result of a validation run +// on an sstable. +type TableValidatedInfo struct { + JobID int + Meta *fileMetadata +} + +func (i TableValidatedInfo) String() string { + return redact.StringWithoutMarkers(i) +} + +// SafeFormat implements redact.SafeFormatter. +func (i TableValidatedInfo) SafeFormat(w redact.SafePrinter, _ rune) { + w.Printf("[JOB %d] validated table: %s", redact.Safe(i.JobID), i.Meta) +} + // WALCreateInfo contains info about a WAL creation event. type WALCreateInfo struct { // JobID is the ID of the job the caused the WAL to be created. @@ -435,6 +451,9 @@ type EventListener struct { // collector has loaded statistics for all tables that existed at Open. TableStatsLoaded func(TableStatsInfo) + // TableValidated is invoked after validation runs on an sstable. + TableValidated func(TableValidatedInfo) + // WALCreated is invoked after a WAL has been created. 
WALCreated func(WALCreateInfo) @@ -498,6 +517,9 @@ func (l *EventListener) EnsureDefaults(logger Logger) { if l.TableStatsLoaded == nil { l.TableStatsLoaded = func(info TableStatsInfo) {} } + if l.TableValidated == nil { + l.TableValidated = func(validated TableValidatedInfo) {} + } if l.WALCreated == nil { l.WALCreated = func(info WALCreateInfo) {} } @@ -559,6 +581,9 @@ func MakeLoggingEventListener(logger Logger) EventListener { TableStatsLoaded: func(info TableStatsInfo) { logger.Infof("%s", info) }, + TableValidated: func(info TableValidatedInfo) { + logger.Infof("%s", info) + }, WALCreated: func(info WALCreateInfo) { logger.Infof("%s", info) }, @@ -631,6 +656,10 @@ func TeeEventListener(a, b EventListener) EventListener { a.TableStatsLoaded(info) b.TableStatsLoaded(info) }, + TableValidated: func(info TableValidatedInfo) { + a.TableValidated(info) + b.TableValidated(info) + }, WALCreated: func(info WALCreateInfo) { a.WALCreated(info) b.WALCreated(info) diff --git a/ingest.go b/ingest.go index 4b897bcdd0..2ed60cb5b1 100644 --- a/ingest.go +++ b/ingest.go @@ -691,5 +691,99 @@ func (d *DB) ingestApply(jobID int, meta []*fileMetadata) (*versionEdit, error) // The ingestion may have pushed a level over the threshold for compaction, // so check to see if one is necessary and schedule it. d.maybeScheduleCompaction() + d.maybeValidateSSTablesLocked(ve.NewFiles) return ve, nil } + +// maybeValidateSSTablesLocked adds the slice of newFileEntrys to the pending +// queue of files to be validated, when the feature is enabled. +// DB.mu must be locked when calling. +func (d *DB) maybeValidateSSTablesLocked(newFiles []newFileEntry) { + // Only add to the validation queue when the feature is enabled. + if !d.opts.Experimental.ValidateOnIngest { + return + } + + d.mu.tableValidation.pending = append(d.mu.tableValidation.pending, newFiles...) 
+ if d.shouldValidateSSTablesLocked() { + go d.validateSSTables() + } +} + +// shouldValidateSSTablesLocked returns true if SSTable validation should run. +// DB.mu must be locked when calling. +func (d *DB) shouldValidateSSTablesLocked() bool { + return !d.mu.tableValidation.validating && + d.closed.Load() == nil && + d.opts.Experimental.ValidateOnIngest && + len(d.mu.tableValidation.pending) > 0 +} + +// validateSSTables runs a round of validation on the tables in the pending +// queue. +func (d *DB) validateSSTables() { + d.mu.Lock() + if !d.shouldValidateSSTablesLocked() { + d.mu.Unlock() + return + } + + pending := d.mu.tableValidation.pending + d.mu.tableValidation.pending = nil + d.mu.tableValidation.validating = true + jobID := d.mu.nextJobID + d.mu.nextJobID++ + rs := d.loadReadState() + + // Drop DB.mu before performing IO. + d.mu.Unlock() + + // Validate all tables in the pending queue. This could lead to a situation + // where we are starving IO from other tasks due to having to page through + // all the blocks in all the sstables in the queue. + // TODO(travers): Add some form of pacing to avoid IO starvation. + for _, f := range pending { + // The file may have been moved or deleted since it was ingested, in + // which case we skip. + if !rs.current.Contains(f.Level, d.cmp, f.Meta) { + // Assume the file was moved to a lower level. It is rare enough + // that a table is moved or deleted between the time it was ingested + // and the time the validation routine runs that the overall cost of + // this inner loop is tolerably low, when amortized over all + // ingested tables. 
+ found := false + for i := f.Level + 1; i < numLevels; i++ { + if rs.current.Contains(i, d.cmp, f.Meta) { + found = true + break + } + } + if !found { + continue + } + } + + err := d.tableCache.withReader(f.Meta, func(r *sstable.Reader) error { + return r.ValidateBlockChecksums() + }) + if err != nil { + // TODO(travers): Hook into the corruption reporting pipeline, once + // available. See pebble#1192. + d.opts.Logger.Fatalf("pebble: encountered corruption during ingestion: %s", err) + } + + d.opts.EventListener.TableValidated(TableValidatedInfo{ + JobID: jobID, + Meta: f.Meta, + }) + } + rs.unref() + + d.mu.Lock() + defer d.mu.Unlock() + d.mu.tableValidation.validating = false + d.mu.tableValidation.cond.Broadcast() + if d.shouldValidateSSTablesLocked() { + go d.validateSSTables() + } +} diff --git a/ingest_test.go b/ingest_test.go index 383a3e4910..34f2e54e1b 100644 --- a/ingest_test.go +++ b/ingest_test.go @@ -1207,6 +1207,226 @@ func TestIngestCleanup(t *testing.T) { } } +// fatalCapturingLogger captures a fatal error instead of panicking. +type fatalCapturingLogger struct { + defaultLogger + err error +} + +// Fatalf implements the Logger.Fatalf interface. 
+func (l *fatalCapturingLogger) Fatalf(_ string, args ...interface{}) {
+	l.err = args[0].(error) // validateSSTables passes the error as the sole format argument.
+}
+
+func TestIngestValidation(t *testing.T) {
+	type keyVal struct {
+		key, val []byte
+	}
+	type corruptionLocation int
+	const (
+		corruptionLocationNone corruptionLocation = iota
+		corruptionLocationStart
+		corruptionLocationEnd
+		corruptionLocationInternal
+	)
+	type errLocation int
+	const (
+		errLocationNone errLocation = iota
+		errLocationIngest
+		errLocationValidation
+	)
+	const (
+		nKeys     = 1_000
+		keySize   = 10
+		valSize   = 100
+		blockSize = 100
+
+		ingestTableName = "ext"
+	)
+
+	seed := uint64(time.Now().UnixNano())
+	rng := rand.New(rand.NewSource(seed))
+	t.Logf("rng seed = %d", seed)
+
+	testCases := []struct {
+		description string
+		cLoc        corruptionLocation
+		wantErrType errLocation
+	}{
+		{
+			description: "no corruption",
+			cLoc:        corruptionLocationNone,
+			wantErrType: errLocationNone,
+		},
+		{
+			description: "start block",
+			cLoc:        corruptionLocationStart,
+			wantErrType: errLocationIngest,
+		},
+		{
+			description: "end block",
+			cLoc:        corruptionLocationEnd,
+			wantErrType: errLocationIngest,
+		},
+		{
+			description: "non-end block",
+			cLoc:        corruptionLocationInternal,
+			wantErrType: errLocationValidation,
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.description, func(t *testing.T) {
+			var wg sync.WaitGroup
+			wg.Add(1)
+
+			fs := vfs.NewMem()
+			logger := &fatalCapturingLogger{}
+			opts := &Options{
+				FS:     fs,
+				Logger: logger,
+				EventListener: EventListener{
+					TableValidated: func(i TableValidatedInfo) {
+						wg.Done()
+					},
+				},
+			}
+			opts.Experimental.ValidateOnIngest = true
+			d, err := Open("", opts)
+			require.NoError(t, err)
+			defer func() { require.NoError(t, d.Close()) }()
+
+			corrupt := func(f vfs.File) {
+				// Compute the layout of the sstable in order to find the
+				// appropriate block locations to corrupt.
+				r, err := sstable.NewReader(f, sstable.ReaderOptions{})
+				require.NoError(t, err)
+				l, err := r.Layout()
+				require.NoError(t, err)
+				require.NoError(t, r.Close())
+
+				// Select an appropriate data block to corrupt.
+				var blockIdx int
+				switch tc.cLoc {
+				case corruptionLocationStart:
+					blockIdx = 0
+				case corruptionLocationEnd:
+					blockIdx = len(l.Data) - 1
+				case corruptionLocationInternal:
+					blockIdx = 1 + rng.Intn(len(l.Data)-2)
+				default:
+					t.Fatalf("unknown corruptionLocation: %d", tc.cLoc)
+				}
+				bh := l.Data[blockIdx]
+
+				osF, err := os.OpenFile(ingestTableName, os.O_RDWR, 0600)
+				require.NoError(t, err)
+				defer func() { require.NoError(t, osF.Close()) }()
+
+				// Corrupting a key will cause the ingestion to fail due to a
+				// malformed key, rather than a block checksum mismatch.
+				// Instead, we corrupt the last byte in the selected block,
+				// before the trailer, which corresponds to a value.
+				offset := bh.Offset + bh.Length - 1
+				_, err = osF.WriteAt([]byte("\xff"), int64(offset))
+				require.NoError(t, err)
+			}
+
+			type errT struct {
+				errLoc errLocation
+				err    error
+			}
+			runIngest := func(keyVals []keyVal) (et errT) {
+				// The vfs.File does not allow for random reads and writes.
+				// Create a disk-backed file outside of the DB FS that we can
+				// open as a regular os.File, if required.
+				tmpFS := vfs.Default
+				f, err := tmpFS.Create(ingestTableName)
+				require.NoError(t, err)
+				defer func() { _ = tmpFS.Remove(ingestTableName) }()
+
+				w := sstable.NewWriter(f, sstable.WriterOptions{
+					BlockSize:   blockSize,     // Create many smaller blocks.
+					Compression: NoCompression, // For simpler debugging.
+				})
+				for _, kv := range keyVals {
+					require.NoError(t, w.Set(kv.key, kv.val))
+				}
+				require.NoError(t, w.Close())
+
+				// Possibly corrupt the file.
+				if tc.cLoc != corruptionLocationNone {
+					f, err = tmpFS.Open(ingestTableName)
+					require.NoError(t, err)
+					corrupt(f)
+				}
+
+				// Copy the file into the DB's FS.
+				_, err = vfs.Clone(tmpFS, fs, ingestTableName, ingestTableName)
+				require.NoError(t, err)
+
+				// Ingest the external table.
+				err = d.Ingest([]string{ingestTableName})
+				if err != nil {
+					et.errLoc = errLocationIngest
+					et.err = err
+					return
+				}
+
+				// Wait for the validation on the sstable to complete.
+				wg.Wait()
+
+				// Return any error encountered during validation.
+				if logger.err != nil {
+					et.errLoc = errLocationValidation
+					et.err = logger.err
+				}
+
+				return
+			}
+
+			// Construct a set of keys to ingest.
+			var keyVals []keyVal
+			for i := 0; i < nKeys; i++ {
+				key := make([]byte, keySize) // Sized by length: rng.Read fills len(key) bytes.
+				_, err = rng.Read(key)
+				require.NoError(t, err)
+
+				val := make([]byte, valSize) // Sized by length: rng.Read fills len(val) bytes.
+				_, err = rng.Read(val)
+				require.NoError(t, err)
+
+				keyVals = append(keyVals, keyVal{key, val})
+			}
+
+			// Keys must be sorted. The comparator must be a strict "less".
+			sort.Slice(keyVals, func(i, j int) bool {
+				return d.cmp(keyVals[i].key, keyVals[j].key) < 0
+			})
+
+			// Run the ingestion.
+			et := runIngest(keyVals)
+
+			// Assert we saw the errors we expect.
+			switch tc.wantErrType {
+			case errLocationNone:
+				require.Equal(t, errLocationNone, et.errLoc)
+				require.NoError(t, et.err)
+			case errLocationIngest:
+				require.Equal(t, errLocationIngest, et.errLoc)
+				require.Error(t, et.err)
+				require.True(t, errors.Is(et.err, base.ErrCorruption))
+			case errLocationValidation:
+				require.Equal(t, errLocationValidation, et.errLoc)
+				require.Error(t, et.err)
+				require.True(t, errors.Is(et.err, base.ErrCorruption))
+			default:
+				t.Fatalf("unknown wantErrType %d", tc.wantErrType)
+			}
+		})
+	}
+}
+
 // BenchmarkManySSTables measures the cost of various operations with various
 // counts of SSTables within the database.
func BenchmarkManySSTables(b *testing.B) { diff --git a/internal/metamorphic/options.go b/internal/metamorphic/options.go index 523d213250..a25af7005d 100644 --- a/internal/metamorphic/options.go +++ b/internal/metamorphic/options.go @@ -219,8 +219,9 @@ func randomOptions(rng *rand.Rand) *testOptions { } opts.Experimental.L0CompactionConcurrency = 1 + rng.Intn(4) // 1-4 opts.Experimental.MinDeletionRate = 1 << uint(20+rng.Intn(10)) // 1MB - 1GB - opts.L0CompactionThreshold = 1 + rng.Intn(100) // 1 - 100 - opts.L0StopWritesThreshold = 1 + rng.Intn(100) // 1 - 100 + opts.Experimental.ValidateOnIngest = rng.Intn(2) != 0 + opts.L0CompactionThreshold = 1 + rng.Intn(100) // 1 - 100 + opts.L0StopWritesThreshold = 1 + rng.Intn(100) // 1 - 100 if opts.L0StopWritesThreshold < opts.L0CompactionThreshold { opts.L0StopWritesThreshold = opts.L0CompactionThreshold } diff --git a/open.go b/open.go index 484f98cef7..ea39bbc74d 100644 --- a/open.go +++ b/open.go @@ -440,6 +440,7 @@ func Open(dirname string, opts *Options) (db *DB, _ error) { d.mu.versions.metrics.WAL.Files = int64(len(logFiles)) } d.mu.tableStats.cond.L = &d.mu.Mutex + d.mu.tableValidation.cond.L = &d.mu.Mutex if !d.opts.ReadOnly && !d.opts.private.disableTableStats { d.maybeCollectTableStatsLocked() } diff --git a/options.go b/options.go index 1c66f61e2d..383d4d510f 100644 --- a/options.go +++ b/options.go @@ -363,6 +363,12 @@ type Options struct { // // NOTE: callers should take care to not mutate the key being validated. KeyValidationFunc func(userKey []byte) error + + // ValidateOnIngest schedules validation of sstables after they have + // been ingested. + // + // By default, this value is false. + ValidateOnIngest bool } // Filters is a map from filter policy name to filter policy. 
It is used for @@ -767,6 +773,7 @@ func (o *Options) String() string { fmt.Fprintf(&buf, "%s", o.TablePropertyCollectors[i]().Name()) } fmt.Fprintf(&buf, "]\n") + fmt.Fprintf(&buf, " validate_on_ingest=%t\n", o.Experimental.ValidateOnIngest) fmt.Fprintf(&buf, " wal_dir=%s\n", o.WALDir) fmt.Fprintf(&buf, " wal_bytes_per_sync=%d\n", o.WALBytesPerSync) @@ -979,6 +986,8 @@ func (o *Options) Parse(s string, hooks *ParseHooks) error { } case "table_property_collectors": // TODO(peter): set o.TablePropertyCollectors + case "validate_on_ingest": + o.Experimental.ValidateOnIngest, err = strconv.ParseBool(value) case "wal_dir": o.WALDir = value case "wal_bytes_per_sync": diff --git a/options_test.go b/options_test.go index 8ee544992b..c2baa5134b 100644 --- a/options_test.go +++ b/options_test.go @@ -96,6 +96,7 @@ func TestOptionsString(t *testing.T) { strict_wal_tail=true table_cache_shards=8 table_property_collectors=[] + validate_on_ingest=false wal_dir= wal_bytes_per_sync=0 diff --git a/testdata/metrics b/testdata/metrics index bed3675cdb..46db796e95 100644 --- a/testdata/metrics +++ b/testdata/metrics @@ -148,7 +148,7 @@ zmemtbl 1 256 K disk-usage ---- -2.7 K +2.8 K # Closing iter b will release the last zombie sstable and the last zombie memtable.