diff --git a/db.go b/db.go
index 46c0cbfba52..97b9cfef93b 100644
--- a/db.go
+++ b/db.go
@@ -376,6 +376,17 @@ type DB struct {
 			// them.
 			pending []manifest.NewFileEntry
 		}
+
+		tableValidation struct {
+			// cond is a condition variable used to signal the completion of a
+			// job to validate one or more sstables.
+			cond sync.Cond
+			// pending is a slice of metadata for sstables waiting to be
+			// validated.
+			pending []*fileMetadata
+			// validating is set to true when validation is running.
+			validating bool
+		}
 	}
 
 	// Normally equal to time.Now() but may be overridden in tests.
@@ -955,6 +966,9 @@ func (d *DB) Close() error {
 	for d.mu.tableStats.loading {
 		d.mu.tableStats.cond.Wait()
 	}
+	for d.mu.tableValidation.validating {
+		d.mu.tableValidation.cond.Wait()
+	}
 
 	var err error
 	if n := len(d.mu.compact.inProgress); n > 0 {
diff --git a/event.go b/event.go
index 36d48eea8e9..7c0d1c6fc94 100644
--- a/event.go
+++ b/event.go
@@ -311,6 +311,30 @@ func (i TableStatsInfo) SafeFormat(w redact.SafePrinter, _ rune) {
 	w.Printf("[JOB %d] all initial table stats loaded", redact.Safe(i.JobID))
 }
 
+// TableValidatedInfo contains information on the result of a validation run
+// on an sstable.
+type TableValidatedInfo struct {
+	// JobID is the ID of the job that ran the validation.
+	JobID int
+	// Meta is the metadata of the sstable that was validated.
+	Meta *fileMetadata
+	// Err is the error returned by the validation, or nil if it succeeded.
+	Err error
+}
+
+func (i TableValidatedInfo) String() string {
+	return redact.StringWithoutMarkers(i)
+}
+
+// SafeFormat implements redact.SafeFormatter.
+func (i TableValidatedInfo) SafeFormat(w redact.SafePrinter, _ rune) {
+	if i.Err != nil {
+		w.Printf("[JOB %d] validation for table %s failed: %s", redact.Safe(i.JobID), i.Meta, i.Err)
+		return
+	}
+	w.Printf("[JOB %d] validated table: %s", redact.Safe(i.JobID), i.Meta)
+}
+
 // WALCreateInfo contains info about a WAL creation event.
 type WALCreateInfo struct {
 	// JobID is the ID of the job the caused the WAL to be created.
@@ -435,6 +459,9 @@ type EventListener struct {
 	// collector has loaded statistics for all tables that existed at Open.
 	TableStatsLoaded func(TableStatsInfo)
 
+	// TableValidated is invoked after validation runs on an sstable.
+	TableValidated func(TableValidatedInfo)
+
 	// WALCreated is invoked after a WAL has been created.
 	WALCreated func(WALCreateInfo)
 
@@ -498,6 +525,9 @@ func (l *EventListener) EnsureDefaults(logger Logger) {
 	if l.TableStatsLoaded == nil {
 		l.TableStatsLoaded = func(info TableStatsInfo) {}
 	}
+	if l.TableValidated == nil {
+		l.TableValidated = func(info TableValidatedInfo) {}
+	}
 	if l.WALCreated == nil {
 		l.WALCreated = func(info WALCreateInfo) {}
 	}
@@ -559,6 +589,9 @@ func MakeLoggingEventListener(logger Logger) EventListener {
 		TableStatsLoaded: func(info TableStatsInfo) {
 			logger.Infof("%s", info)
 		},
+		TableValidated: func(info TableValidatedInfo) {
+			logger.Infof("%s", info)
+		},
 		WALCreated: func(info WALCreateInfo) {
 			logger.Infof("%s", info)
 		},
@@ -631,6 +664,10 @@ func TeeEventListener(a, b EventListener) EventListener {
 			a.TableStatsLoaded(info)
 			b.TableStatsLoaded(info)
 		},
+		TableValidated: func(info TableValidatedInfo) {
+			a.TableValidated(info)
+			b.TableValidated(info)
+		},
 		WALCreated: func(info WALCreateInfo) {
 			a.WALCreated(info)
 			b.WALCreated(info)
diff --git a/ingest.go b/ingest.go
index 4b897bcdd03..06bd398a730 100644
--- a/ingest.go
+++ b/ingest.go
@@ -691,5 +691,77 @@ func (d *DB) ingestApply(jobID int, meta []*fileMetadata) (*versionEdit, error)
 	// The ingestion may have pushed a level over the threshold for compaction,
 	// so check to see if one is necessary and schedule it.
 	d.maybeScheduleCompaction()
+	d.maybeValidateSSTables(meta)
 	return ve, nil
 }
+
+// maybeValidateSSTables adds the slice of fileMetadata to the pending queue of
+// files to be validated, when the feature is enabled.
+// DB.mu must be locked when calling.
+func (d *DB) maybeValidateSSTables(meta []*fileMetadata) {
+	// Only add to the validation queue when the feature is enabled.
+	if !d.opts.Experimental.ValidateOnIngest {
+		return
+	}
+
+	d.mu.tableValidation.pending = append(d.mu.tableValidation.pending, meta...)
+	if d.shouldValidateSSTables() {
+		go d.validateSSTables()
+	}
+}
+
+// shouldValidateSSTables returns true if SSTable validation should run.
+// DB.mu must be locked when calling.
+func (d *DB) shouldValidateSSTables() bool {
+	ok := !d.mu.tableValidation.validating
+	ok = ok && d.closed.Load() == nil
+	ok = ok && d.opts.Experimental.ValidateOnIngest
+	ok = ok && len(d.mu.tableValidation.pending) > 0
+	return ok
+}
+
+// validateSSTables runs a round of validation on the tables in the pending
+// queue.
+func (d *DB) validateSSTables() {
+	d.mu.Lock()
+	if !d.shouldValidateSSTables() {
+		d.mu.Unlock()
+		return
+	}
+
+	pending := d.mu.tableValidation.pending
+	d.mu.tableValidation.pending = nil
+	d.mu.tableValidation.validating = true
+	jobID := d.mu.nextJobID
+	d.mu.nextJobID++
+
+	// Drop DB.mu before performing IO.
+	d.mu.Unlock()
+
+	// Validate all tables in the pending queue. This could lead to a situation
+	// where we are starving IO from other tasks due to having to page through
+	// all the blocks in all the sstables in the queue.
+	// TODO(travers): Add some form of pacing to avoid IO starvation.
+	for _, meta := range pending {
+		err := d.tableCache.withReader(meta, func(r *sstable.Reader) error {
+			return r.ValidateBlockChecksums()
+		})
+		// TODO(travers): Hook into the corruption reporting pipeline, once
+		// available. See pebble#1192.
+		d.opts.EventListener.TableValidated(TableValidatedInfo{
+			JobID: jobID,
+			Meta:  meta,
+			Err:   err,
+		})
+	}
+
+	d.mu.Lock()
+	defer d.mu.Unlock()
+	// Signal waiters (DB.Close blocks on this condition variable) and start
+	// another round if more tables were queued while DB.mu was dropped.
+	d.mu.tableValidation.validating = false
+	d.mu.tableValidation.cond.Broadcast()
+	if d.shouldValidateSSTables() {
+		go d.validateSSTables()
+	}
+}
diff --git a/ingest_test.go b/ingest_test.go
index 383a3e49104..e27fa0170a8 100644
--- a/ingest_test.go
+++ b/ingest_test.go
@@ -1207,6 +1207,222 @@ func TestIngestCleanup(t *testing.T) {
 	}
 }
 
+// TestIngestValidation ingests an sstable whose data blocks are optionally
+// corrupted and asserts that the corruption is reported either by the
+// ingestion itself or by the validation that runs after the ingestion.
+func TestIngestValidation(t *testing.T) {
+	type keyVal struct {
+		key, val []byte
+	}
+	type corruptionLocation int
+	const (
+		corruptionLocationNone corruptionLocation = iota
+		corruptionLocationStart
+		corruptionLocationEnd
+		corruptionLocationInternal
+	)
+	type errLocation int
+	const (
+		errLocationNone errLocation = iota
+		errLocationIngest
+		errLocationValidation
+	)
+	const (
+		nKeys     = 1_000
+		keySize   = 10
+		valSize   = 100
+		blockSize = 100
+
+		ingestTableName = "ext"
+	)
+
+	seed := uint64(time.Now().UnixNano())
+	rng := rand.New(rand.NewSource(seed))
+	t.Logf("rng seed = %d", seed)
+
+	testCases := []struct {
+		description string
+		cLoc        corruptionLocation
+		wantErrType errLocation
+	}{
+		{
+			description: "no corruption",
+			cLoc:        corruptionLocationNone,
+			wantErrType: errLocationNone,
+		},
+		{
+			description: "start block",
+			cLoc:        corruptionLocationStart,
+			wantErrType: errLocationIngest,
+		},
+		{
+			description: "end block",
+			cLoc:        corruptionLocationEnd,
+			wantErrType: errLocationIngest,
+		},
+		{
+			description: "non-end block",
+			cLoc:        corruptionLocationInternal,
+			wantErrType: errLocationValidation,
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.description, func(t *testing.T) {
+			var (
+				info *TableValidatedInfo
+				wg   sync.WaitGroup
+			)
+			wg.Add(1)
+
+			fs := vfs.NewMem()
+			opts := &Options{
+				FS: fs,
+				EventListener: EventListener{
+					TableValidated: func(i TableValidatedInfo) {
+						info = &i
+						wg.Done()
+					},
+				},
+			}
+			opts.Experimental.ValidateOnIngest = true
+			d, err := Open("", opts)
+			require.NoError(t, err)
+			defer func() { require.NoError(t, d.Close()) }()
+
+			corrupt := func(f vfs.File) {
+				// Compute the layout of the sstable in order to find the
+				// appropriate block locations to corrupt.
+				r, err := sstable.NewReader(f, sstable.ReaderOptions{})
+				require.NoError(t, err)
+				l, err := r.Layout()
+				require.NoError(t, err)
+				require.NoError(t, r.Close())
+
+				// Select an appropriate data block to corrupt.
+				var blockIdx int
+				switch tc.cLoc {
+				case corruptionLocationStart:
+					blockIdx = 0
+				case corruptionLocationEnd:
+					blockIdx = len(l.Data) - 1
+				case corruptionLocationInternal:
+					blockIdx = 1 + rng.Intn(len(l.Data)-2)
+				default:
+					t.Fatalf("unknown corruptionLocation: %d", tc.cLoc)
+				}
+				bh := l.Data[blockIdx]
+
+				osF, err := os.OpenFile(ingestTableName, os.O_RDWR, 0600)
+				require.NoError(t, err)
+				defer func() { require.NoError(t, osF.Close()) }()
+
+				// Corrupting a key will cause the ingestion to fail due to a
+				// malformed key, rather than a block checksum mismatch.
+				// Instead, we corrupt the last byte in the selected block,
+				// before the trailer, which corresponds to a value.
+				offset := bh.Offset + bh.Length - 1
+				_, err = osF.WriteAt([]byte("\xff"), int64(offset))
+				require.NoError(t, err)
+			}
+
+			type errT struct {
+				errLoc errLocation
+				err    error
+			}
+			runIngest := func(keyVals []keyVal) (et errT) {
+				// The vfs.File does not allow for random reads and writes.
+				// Create a disk-backed file outside of the DB FS that we can
+				// open as a regular os.File, if required.
+				tmpFS := vfs.Default
+				f, err := tmpFS.Create(ingestTableName)
+				require.NoError(t, err)
+				defer func() { _ = tmpFS.Remove(ingestTableName) }()
+
+				w := sstable.NewWriter(f, sstable.WriterOptions{
+					BlockSize:   blockSize,     // Create many smaller blocks.
+					Compression: NoCompression, // For simpler debugging.
+				})
+				for _, kv := range keyVals {
+					require.NoError(t, w.Set(kv.key, kv.val))
+				}
+				require.NoError(t, w.Close())
+
+				// Possibly corrupt the file.
+				if tc.cLoc != corruptionLocationNone {
+					f, err = tmpFS.Open(ingestTableName)
+					require.NoError(t, err)
+					corrupt(f)
+				}
+
+				// Copy the file into the DB's FS.
+				_, err = vfs.Clone(tmpFS, fs, ingestTableName, ingestTableName)
+				require.NoError(t, err)
+
+				// Ingest the external table.
+				err = d.Ingest([]string{ingestTableName})
+				if err != nil {
+					et.errLoc = errLocationIngest
+					et.err = err
+					return
+				}
+
+				// Wait for the validation on the sstable to complete.
+				wg.Wait()
+
+				// Return any error encountered during validation.
+				et.err = info.Err
+				if et.err != nil {
+					et.errLoc = errLocationValidation
+				}
+
+				return
+			}
+
+			// Construct a set of keys to ingest.
+			var keyVals []keyVal
+			for i := 0; i < nKeys; i++ {
+				// rng.Read fills len(p) bytes, so the slices are allocated at
+				// their full length rather than with spare capacity only.
+				key := make([]byte, keySize)
+				_, err = rng.Read(key)
+				require.NoError(t, err)
+
+				val := make([]byte, valSize)
+				_, err = rng.Read(val)
+				require.NoError(t, err)
+
+				keyVals = append(keyVals, keyVal{key, val})
+			}
+
+			// Keys must be sorted; sort.Slice requires a strict less function.
+			sort.Slice(keyVals, func(i, j int) bool {
+				return d.cmp(keyVals[i].key, keyVals[j].key) < 0
+			})
+
+			// Run the ingestion.
+			et := runIngest(keyVals)
+
+			// Assert we saw the errors we expect.
+			switch tc.wantErrType {
+			case errLocationNone:
+				require.Equal(t, errLocationNone, et.errLoc)
+				require.NoError(t, et.err)
+			case errLocationIngest:
+				require.Equal(t, errLocationIngest, et.errLoc)
+				require.Error(t, et.err)
+				require.True(t, errors.Is(et.err, base.ErrCorruption))
+			case errLocationValidation:
+				require.Equal(t, errLocationValidation, et.errLoc)
+				require.Error(t, et.err)
+				require.True(t, errors.Is(et.err, base.ErrCorruption))
+			default:
+				t.Fatalf("unknown wantErrType %d", tc.wantErrType)
+			}
+		})
+	}
+}
+
 // BenchmarkManySSTables measures the cost of various operations with various
 // counts of SSTables within the database.
 func BenchmarkManySSTables(b *testing.B) {
diff --git a/internal/metamorphic/options.go b/internal/metamorphic/options.go
index 523d2132502..a25af7005d5 100644
--- a/internal/metamorphic/options.go
+++ b/internal/metamorphic/options.go
@@ -219,8 +219,9 @@ func randomOptions(rng *rand.Rand) *testOptions {
 	}
 	opts.Experimental.L0CompactionConcurrency = 1 + rng.Intn(4)    // 1-4
 	opts.Experimental.MinDeletionRate = 1 << uint(20+rng.Intn(10)) // 1MB - 1GB
-	opts.L0CompactionThreshold = 1 + rng.Intn(100)                 // 1 - 100
-	opts.L0StopWritesThreshold = 1 + rng.Intn(100)                 // 1 - 100
+	opts.Experimental.ValidateOnIngest = rng.Intn(2) != 0
+	opts.L0CompactionThreshold = 1 + rng.Intn(100) // 1 - 100
+	opts.L0StopWritesThreshold = 1 + rng.Intn(100) // 1 - 100
 	if opts.L0StopWritesThreshold < opts.L0CompactionThreshold {
 		opts.L0StopWritesThreshold = opts.L0CompactionThreshold
 	}
diff --git a/open.go b/open.go
index 43ab871b840..6461e8ffbd5 100644
--- a/open.go
+++ b/open.go
@@ -440,6 +440,7 @@ func Open(dirname string, opts *Options) (db *DB, _ error) {
 		d.mu.versions.metrics.WAL.Files = int64(len(logFiles))
 	}
 	d.mu.tableStats.cond.L = &d.mu.Mutex
+	d.mu.tableValidation.cond.L = &d.mu.Mutex
 	if !d.opts.ReadOnly && !d.opts.private.disableTableStats {
 		d.maybeCollectTableStats()
 	}
diff --git a/options.go b/options.go
index 1c66f61e2d5..383d4d510fa 100644
--- a/options.go
+++ b/options.go
@@ -363,6 +363,12 @@ type Options struct {
 		//
 		// NOTE: callers should take care to not mutate the key being validated.
 		KeyValidationFunc func(userKey []byte) error
+
+		// ValidateOnIngest schedules validation of sstables after they have
+		// been ingested.
+		//
+		// By default, this value is false.
+		ValidateOnIngest bool
 	}
 
 	// Filters is a map from filter policy name to filter policy. It is used for
@@ -767,6 +773,7 @@ func (o *Options) String() string {
 		fmt.Fprintf(&buf, "%s", o.TablePropertyCollectors[i]().Name())
 	}
 	fmt.Fprintf(&buf, "]\n")
+	fmt.Fprintf(&buf, " validate_on_ingest=%t\n", o.Experimental.ValidateOnIngest)
 	fmt.Fprintf(&buf, " wal_dir=%s\n", o.WALDir)
 	fmt.Fprintf(&buf, " wal_bytes_per_sync=%d\n", o.WALBytesPerSync)
 
@@ -979,6 +986,8 @@ func (o *Options) Parse(s string, hooks *ParseHooks) error {
 				}
 			case "table_property_collectors":
 				// TODO(peter): set o.TablePropertyCollectors
+			case "validate_on_ingest":
+				o.Experimental.ValidateOnIngest, err = strconv.ParseBool(value)
 			case "wal_dir":
 				o.WALDir = value
 			case "wal_bytes_per_sync":
diff --git a/options_test.go b/options_test.go
index 8ee544992b9..c2baa5134b9 100644
--- a/options_test.go
+++ b/options_test.go
@@ -96,6 +96,7 @@ func TestOptionsString(t *testing.T) {
  strict_wal_tail=true
  table_cache_shards=8
  table_property_collectors=[]
+ validate_on_ingest=false
  wal_dir=
  wal_bytes_per_sync=0
 
diff --git a/testdata/metrics b/testdata/metrics
index bed3675cdbd..46db796e95c 100644
--- a/testdata/metrics
+++ b/testdata/metrics
@@ -148,7 +148,7 @@ zmemtbl 1 256 K
 
 disk-usage
 ----
-2.7 K
+2.8 K
 
 # Closing iter b will release the last zombie sstable and the last zombie memtable.