From 1b44d39fa5db816682e40c45c330346080374904 Mon Sep 17 00:00:00 2001 From: Nick Travers Date: Wed, 6 Oct 2021 13:55:08 -0700 Subject: [PATCH] db: schedule sstable validation on ingestion Currently, when Pebble ingests an sstable, it validates the block checksums necessary for it to retrieve the smallest and largest keys in the table. Wire up the block checksum validation codepath from #1240, scheduling the validation of the ingested sstable on a background goroutine. This ingestion validation is gated on a new experimental DB option, `ValidateOnIngest`, which is initially off by default. See #1203. --- db.go | 14 +++ event.go | 34 +++++ ingest.go | 70 +++++++++++ ingest_test.go | 211 ++++++++++++++++++++++++++++++++ internal/metamorphic/options.go | 5 +- open.go | 1 + options.go | 9 ++ options_test.go | 1 + testdata/metrics | 2 +- 9 files changed, 344 insertions(+), 3 deletions(-) diff --git a/db.go b/db.go index 46c0cbfba52..97b9cfef93b 100644 --- a/db.go +++ b/db.go @@ -376,6 +376,17 @@ type DB struct { // them. pending []manifest.NewFileEntry } + + tableValidation struct { + // cond is a condition variable used to signal the completion of a + // job to validate one or more sstables. + cond sync.Cond + // pending is a slice of metadata for sstables waiting to be + // validated. + pending []*fileMetadata + // validating is set to true when validation is running. + validating bool + } } // Normally equal to time.Now() but may be overridden in tests. 
@@ -955,6 +966,9 @@ func (d *DB) Close() error {
 	for d.mu.tableStats.loading {
 		d.mu.tableStats.cond.Wait()
 	}
+	for d.mu.tableValidation.validating {
+		d.mu.tableValidation.cond.Wait()
+	}
 
 	var err error
 	if n := len(d.mu.compact.inProgress); n > 0 {
diff --git a/event.go b/event.go
index 36d48eea8e9..7c0d1c6fc94 100644
--- a/event.go
+++ b/event.go
@@ -311,6 +311,27 @@ func (i TableStatsInfo) SafeFormat(w redact.SafePrinter, _ rune) {
 	w.Printf("[JOB %d] all initial table stats loaded", redact.Safe(i.JobID))
 }
 
+// TableValidatedInfo contains information on the result of a validation run
+// on a single sstable. One value is passed to EventListener.TableValidated.
+type TableValidatedInfo struct {
+	JobID int           // JobID is the ID of the validation job that checked the table.
+	Meta  *fileMetadata // Meta is the metadata of the sstable that was validated.
+	Err   error         // Err is the validation error; nil means the table validated cleanly.
+}
+
+func (i TableValidatedInfo) String() string {
+	return redact.StringWithoutMarkers(i)
+}
+
+// SafeFormat implements redact.SafeFormatter. Only the job ID is marked safe.
+func (i TableValidatedInfo) SafeFormat(w redact.SafePrinter, _ rune) {
+	if i.Err != nil {
+		w.Printf("[JOB %d] validation for table %s failed: %s", redact.Safe(i.JobID), i.Meta, i.Err)
+		return
+	}
+	w.Printf("[JOB %d] validated table: %s", redact.Safe(i.JobID), i.Meta)
+}
+
 // WALCreateInfo contains info about a WAL creation event.
 type WALCreateInfo struct {
 	// JobID is the ID of the job the caused the WAL to be created.
@@ -435,6 +456,9 @@ type EventListener struct {
 	// collector has loaded statistics for all tables that existed at Open.
 	TableStatsLoaded func(TableStatsInfo)
 
+	// TableValidated is invoked after validation runs on an sstable.
+	TableValidated func(TableValidatedInfo)
+
 	// WALCreated is invoked after a WAL has been created.
 	WALCreated func(WALCreateInfo)
 
@@ -498,6 +522,9 @@ func (l *EventListener) EnsureDefaults(logger Logger) {
 	if l.TableStatsLoaded == nil {
 		l.TableStatsLoaded = func(info TableStatsInfo) {}
 	}
+	if l.TableValidated == nil {
+		l.TableValidated = func(validated TableValidatedInfo) {}
+	}
 	if l.WALCreated == nil {
 		l.WALCreated = func(info WALCreateInfo) {}
 	}
@@ -559,6 +586,9 @@ func MakeLoggingEventListener(logger Logger) EventListener {
 		TableStatsLoaded: func(info TableStatsInfo) {
 			logger.Infof("%s", info)
 		},
+		TableValidated: func(info TableValidatedInfo) {
+			logger.Infof("%s", info)
+		},
 		WALCreated: func(info WALCreateInfo) {
 			logger.Infof("%s", info)
 		},
@@ -631,6 +661,10 @@ func TeeEventListener(a, b EventListener) EventListener {
 			a.TableStatsLoaded(info)
 			b.TableStatsLoaded(info)
 		},
+		TableValidated: func(info TableValidatedInfo) {
+			a.TableValidated(info)
+			b.TableValidated(info)
+		},
 		WALCreated: func(info WALCreateInfo) {
 			a.WALCreated(info)
 			b.WALCreated(info)
diff --git a/ingest.go b/ingest.go
index 4b897bcdd03..06bd398a730 100644
--- a/ingest.go
+++ b/ingest.go
@@ -691,5 +691,75 @@ func (d *DB) ingestApply(jobID int, meta []*fileMetadata) (*versionEdit, error)
 	// The ingestion may have pushed a level over the threshold for compaction,
 	// so check to see if one is necessary and schedule it.
 	d.maybeScheduleCompaction()
+	d.maybeValidateSSTables(meta)
 	return ve, nil
 }
+
+// maybeValidateSSTables appends meta to the queue of sstables awaiting
+// validation and starts a background validation job if shouldValidateSSTables
+// permits. It is a no-op unless ValidateOnIngest is set. DB.mu must be held.
+func (d *DB) maybeValidateSSTables(meta []*fileMetadata) {
+	// Only add to the validation queue when the feature is enabled.
+	if !d.opts.Experimental.ValidateOnIngest {
+		return
+	}
+
+	d.mu.tableValidation.pending = append(d.mu.tableValidation.pending, meta...)
+	if d.shouldValidateSSTables() {
+		go d.validateSSTables()
+	}
+}
+
+// shouldValidateSSTables reports whether a new validation job should start.
+// DB.mu must be locked when calling.
+func (d *DB) shouldValidateSSTables() bool {
+	ok := !d.mu.tableValidation.validating           // No validation job is already running.
+	ok = ok && d.closed.Load() == nil                // d.closed holds nil; presumably the DB is still open.
+	ok = ok && d.opts.Experimental.ValidateOnIngest  // The feature is enabled.
+	ok = ok && len(d.mu.tableValidation.pending) > 0 // At least one table is queued.
+	return ok
+}
+
+// validateSSTables takes ownership of the pending queue and validates the block
+// checksums of every table in it, performing the IO without holding DB.mu.
+func (d *DB) validateSSTables() {
+	d.mu.Lock()
+	if !d.shouldValidateSSTables() {
+		d.mu.Unlock()
+		return
+	}
+
+	pending := d.mu.tableValidation.pending
+	d.mu.tableValidation.pending = nil
+	d.mu.tableValidation.validating = true // DB.Close waits on cond until this is cleared.
+	jobID := d.mu.nextJobID
+	d.mu.nextJobID++
+
+	// Drop DB.mu before performing IO.
+	d.mu.Unlock()
+
+	// Validate all tables in the pending queue. This could lead to a situation
+	// where we are starving IO from other tasks due to having to page through
+	// all the blocks in all the sstables in the queue.
+	// TODO(travers): Add some form of pacing to avoid IO starvation.
+	for _, meta := range pending {
+		err := d.tableCache.withReader(meta, func(r *sstable.Reader) error {
+			return r.ValidateBlockChecksums()
+		})
+		// TODO(travers): Hook into the corruption reporting pipeline, once
+		// available. See pebble#1192.
+		d.opts.EventListener.TableValidated(TableValidatedInfo{
+			JobID: jobID,
+			Meta:  meta,
+			Err:   err,
+		})
+	}
+
+	d.mu.Lock()
+	defer d.mu.Unlock()
+	d.mu.tableValidation.validating = false
+	d.mu.tableValidation.cond.Broadcast()
+	if d.shouldValidateSSTables() {
+		go d.validateSSTables()
+	}
+}
diff --git a/ingest_test.go b/ingest_test.go
index 383a3e49104..e27fa0170a8 100644
--- a/ingest_test.go
+++ b/ingest_test.go
@@ -1207,6 +1207,217 @@ func TestIngestCleanup(t *testing.T) {
 	}
 }
 
+func TestIngestValidation(t *testing.T) {
+	type keyVal struct {
+		key, val []byte
+	}
+	type corruptionLocation int
+	const (
+		corruptionLocationNone corruptionLocation = iota
+		corruptionLocationStart
+		corruptionLocationEnd
+		corruptionLocationInternal
+	)
+	type errLocation int
+	const (
+		errLocationNone errLocation = iota
+		errLocationIngest
+		errLocationValidation
+	)
+	const (
+		nKeys     = 1_000
+		keySize   = 10
+		valSize   = 100
+		blockSize = 100
+
+		ingestTableName = "ext"
+	)
+
+	seed := uint64(time.Now().UnixNano())
+	rng := rand.New(rand.NewSource(seed))
+	t.Logf("rng seed = %d", seed)
+
+	testCases := []struct {
+		description string
+		cLoc        corruptionLocation
+		wantErrType errLocation
+	}{
+		{
+			description: "no corruption",
+			cLoc:        corruptionLocationNone,
+			wantErrType: errLocationNone,
+		},
+		{
+			description: "start block",
+			cLoc:        corruptionLocationStart,
+			wantErrType: errLocationIngest,
+		},
+		{
+			description: "end block",
+			cLoc:        corruptionLocationEnd,
+			wantErrType: errLocationIngest,
+		},
+		{
+			description: "non-end block",
+			cLoc:        corruptionLocationInternal,
+			wantErrType: errLocationValidation,
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.description, func(t *testing.T) {
+			var (
+				info *TableValidatedInfo
+				wg   sync.WaitGroup
+			)
+			wg.Add(1)
+
+			fs := vfs.NewMem()
+			opts := &Options{
+				FS: fs,
+				EventListener: EventListener{
+					TableValidated: func(i TableValidatedInfo) {
+						info = &i
+						wg.Done()
+					},
+				},
+			}
+			opts.Experimental.ValidateOnIngest = true
+			d, err := Open("", opts)
+			require.NoError(t, err)
+			defer func() { require.NoError(t, d.Close()) }()
+
+			corrupt := func(f vfs.File) {
+				// Compute the layout of the sstable in order to find the
+				// appropriate block locations to corrupt.
+				r, err := sstable.NewReader(f, sstable.ReaderOptions{})
+				require.NoError(t, err)
+				l, err := r.Layout()
+				require.NoError(t, err)
+				require.NoError(t, r.Close())
+
+				// Select an appropriate data block to corrupt.
+				var blockIdx int
+				switch tc.cLoc {
+				case corruptionLocationStart:
+					blockIdx = 0
+				case corruptionLocationEnd:
+					blockIdx = len(l.Data) - 1
+				case corruptionLocationInternal:
+					blockIdx = 1 + rng.Intn(len(l.Data)-2)
+				default:
+					t.Fatalf("unknown corruptionLocation: %d", tc.cLoc)
+				}
+				bh := l.Data[blockIdx]
+
+				osF, err := os.OpenFile(ingestTableName, os.O_RDWR, 0600)
+				require.NoError(t, err)
+				defer func() { require.NoError(t, osF.Close()) }()
+
+				// Corrupting a key will cause the ingestion to fail due to a
+				// malformed key, rather than a block checksum mismatch.
+				// Instead, we corrupt the last byte in the selected block,
+				// before the trailer, which corresponds to a value.
+				offset := bh.Offset + bh.Length - 1
+				_, err = osF.WriteAt([]byte("\xff"), int64(offset))
+				require.NoError(t, err)
+			}
+
+			type errT struct {
+				errLoc errLocation
+				err    error
+			}
+			runIngest := func(keyVals []keyVal) (et errT) {
+				// The vfs.File does not allow for random reads and writes.
+				// Create a disk-backed file outside of the DB FS that we can
+				// open as a regular os.File, if required.
+				tmpFS := vfs.Default
+				f, err := tmpFS.Create(ingestTableName)
+				require.NoError(t, err)
+				defer func() { _ = tmpFS.Remove(ingestTableName) }()
+
+				w := sstable.NewWriter(f, sstable.WriterOptions{
+					BlockSize:   blockSize,     // Create many smaller blocks.
+					Compression: NoCompression, // For simpler debugging.
+				})
+				for _, kv := range keyVals {
+					require.NoError(t, w.Set(kv.key, kv.val))
+				}
+				require.NoError(t, w.Close())
+
+				// Possibly corrupt the file.
+				if tc.cLoc != corruptionLocationNone {
+					f, err = tmpFS.Open(ingestTableName)
+					require.NoError(t, err)
+					corrupt(f)
+				}
+
+				// Copy the file into the DB's FS.
+				_, err = vfs.Clone(tmpFS, fs, ingestTableName, ingestTableName)
+				require.NoError(t, err)
+
+				// Ingest the external table.
+				err = d.Ingest([]string{ingestTableName})
+				if err != nil {
+					et.errLoc = errLocationIngest
+					et.err = err
+					return
+				}
+
+				// Wait for the validation on the sstable to complete.
+				wg.Wait()
+
+				// Return any error encountered during validation.
+				et.err = info.Err
+				if et.err != nil {
+					et.errLoc = errLocationValidation
+				}
+
+				return
+			}
+
+			// Construct a set of keys to ingest.
+			var keyVals []keyVal
+			for i := 0; i < nKeys; i++ {
+				key := make([]byte, keySize) // Sized by len: rng.Read fills only len(key) bytes.
+				_, err = rng.Read(key)
+				require.NoError(t, err)
+
+				val := make([]byte, valSize) // Sized by len: rng.Read fills only len(val) bytes.
+				_, err = rng.Read(val)
+				require.NoError(t, err)
+
+				keyVals = append(keyVals, keyVal{key, val})
+			}
+
+			// Keys must be sorted.
+			sort.Slice(keyVals, func(i, j int) bool {
+				return d.cmp(keyVals[i].key, keyVals[j].key) < 0 // Strict ordering, as sort.Slice requires.
+			})
+
+			// Run the ingestion.
+			et := runIngest(keyVals)
+
+			// Assert we saw the errors we expect.
+			switch tc.wantErrType {
+			case errLocationNone:
+				require.Equal(t, errLocationNone, et.errLoc)
+				require.NoError(t, et.err)
+			case errLocationIngest:
+				require.Equal(t, errLocationIngest, et.errLoc)
+				require.Error(t, et.err)
+				require.True(t, errors.Is(et.err, base.ErrCorruption))
+			case errLocationValidation:
+				require.Equal(t, errLocationValidation, et.errLoc)
+				require.Error(t, et.err)
+				require.True(t, errors.Is(et.err, base.ErrCorruption))
+			default:
+				t.Fatalf("unknown wantErrType %d", tc.wantErrType)
+			}
+		})
+	}
+}
+
 // BenchmarkManySSTables measures the cost of various operations with various
 // counts of SSTables within the database.
func BenchmarkManySSTables(b *testing.B) { diff --git a/internal/metamorphic/options.go b/internal/metamorphic/options.go index 523d2132502..a25af7005d5 100644 --- a/internal/metamorphic/options.go +++ b/internal/metamorphic/options.go @@ -219,8 +219,9 @@ func randomOptions(rng *rand.Rand) *testOptions { } opts.Experimental.L0CompactionConcurrency = 1 + rng.Intn(4) // 1-4 opts.Experimental.MinDeletionRate = 1 << uint(20+rng.Intn(10)) // 1MB - 1GB - opts.L0CompactionThreshold = 1 + rng.Intn(100) // 1 - 100 - opts.L0StopWritesThreshold = 1 + rng.Intn(100) // 1 - 100 + opts.Experimental.ValidateOnIngest = rng.Intn(2) != 0 + opts.L0CompactionThreshold = 1 + rng.Intn(100) // 1 - 100 + opts.L0StopWritesThreshold = 1 + rng.Intn(100) // 1 - 100 if opts.L0StopWritesThreshold < opts.L0CompactionThreshold { opts.L0StopWritesThreshold = opts.L0CompactionThreshold } diff --git a/open.go b/open.go index 43ab871b840..6461e8ffbd5 100644 --- a/open.go +++ b/open.go @@ -440,6 +440,7 @@ func Open(dirname string, opts *Options) (db *DB, _ error) { d.mu.versions.metrics.WAL.Files = int64(len(logFiles)) } d.mu.tableStats.cond.L = &d.mu.Mutex + d.mu.tableValidation.cond.L = &d.mu.Mutex if !d.opts.ReadOnly && !d.opts.private.disableTableStats { d.maybeCollectTableStats() } diff --git a/options.go b/options.go index 1c66f61e2d5..383d4d510fa 100644 --- a/options.go +++ b/options.go @@ -363,6 +363,12 @@ type Options struct { // // NOTE: callers should take care to not mutate the key being validated. KeyValidationFunc func(userKey []byte) error + + // ValidateOnIngest schedules validation of sstables after they have + // been ingested. + // + // By default, this value is false. + ValidateOnIngest bool } // Filters is a map from filter policy name to filter policy. 
It is used for @@ -767,6 +773,7 @@ func (o *Options) String() string { fmt.Fprintf(&buf, "%s", o.TablePropertyCollectors[i]().Name()) } fmt.Fprintf(&buf, "]\n") + fmt.Fprintf(&buf, " validate_on_ingest=%t\n", o.Experimental.ValidateOnIngest) fmt.Fprintf(&buf, " wal_dir=%s\n", o.WALDir) fmt.Fprintf(&buf, " wal_bytes_per_sync=%d\n", o.WALBytesPerSync) @@ -979,6 +986,8 @@ func (o *Options) Parse(s string, hooks *ParseHooks) error { } case "table_property_collectors": // TODO(peter): set o.TablePropertyCollectors + case "validate_on_ingest": + o.Experimental.ValidateOnIngest, err = strconv.ParseBool(value) case "wal_dir": o.WALDir = value case "wal_bytes_per_sync": diff --git a/options_test.go b/options_test.go index 8ee544992b9..c2baa5134b9 100644 --- a/options_test.go +++ b/options_test.go @@ -96,6 +96,7 @@ func TestOptionsString(t *testing.T) { strict_wal_tail=true table_cache_shards=8 table_property_collectors=[] + validate_on_ingest=false wal_dir= wal_bytes_per_sync=0 diff --git a/testdata/metrics b/testdata/metrics index bed3675cdbd..46db796e95c 100644 --- a/testdata/metrics +++ b/testdata/metrics @@ -148,7 +148,7 @@ zmemtbl 1 256 K disk-usage ---- -2.7 K +2.8 K # Closing iter b will release the last zombie sstable and the last zombie memtable.