diff --git a/sstable/reader.go b/sstable/reader.go
index c6fdafcca72..e3185232cb6 100644
--- a/sstable/reader.go
+++ b/sstable/reader.go
@@ -2150,6 +2150,47 @@ func (r *Reader) Layout() (*Layout, error) {
 	return l, nil
 }
 
+// ValidateBlockChecksums validates the checksums for each block in the SSTable.
+func (r *Reader) ValidateBlockChecksums() error {
+	// Pre-compute the BlockHandles for the underlying file.
+	l, err := r.Layout()
+	if err != nil {
+		return err
+	}
+
+	// Construct the set of blocks to check. Note that the footer is not checked
+	// as it is not a block with a checksum.
+	var blocks []BlockHandle
+	blocks = append(blocks, l.Data...)
+	blocks = append(blocks, l.Index...)
+	blocks = append(blocks, l.TopIndex, l.Filter, l.RangeDel, l.Properties, l.MetaIndex)
+
+	// Sorting by offset ensures we are performing a sequential scan of the
+	// file.
+	sort.Slice(blocks, func(i, j int) bool {
+		return blocks[i].Offset < blocks[j].Offset
+	})
+
+	// Check all blocks sequentially. Make use of read-ahead, given we are
+	// scanning the entire file from start to end.
+	blockRS := &readaheadState{}
+	for _, bh := range blocks {
+		// Certain blocks may not be present, in which case we skip them.
+		if bh.Length == 0 {
+			continue
+		}
+
+		// Read the block, which validates the checksum.
+		h, err := r.readBlock(bh, nil /* transform */, blockRS)
+		if err != nil {
+			return err
+		}
+		h.Release()
+	}
+
+	return nil
+}
+
 // EstimateDiskUsage returns the total size of data blocks overlapping the range
 // `[start, end]`. Even if a data block partially overlaps, or we cannot determine
 // overlap due to abbreviated index keys, the full data block size is included in
@@ -2350,6 +2391,10 @@ func NewReader(f ReadableFile, o ReaderOptions, extraOpts ...ReaderOption) (*Rea
 
 // Layout describes the block organization of an sstable.
 type Layout struct {
+	// NOTE: changes to fields in this struct should also be reflected in
+	// ValidateBlockChecksums, which validates a static list of BlockHandles
+	// referenced in this struct.
+
 	Data       []BlockHandle
 	Index      []BlockHandle
 	TopIndex   BlockHandle
diff --git a/sstable/reader_test.go b/sstable/reader_test.go
index 703d440903a..a3aa8595dd5 100644
--- a/sstable/reader_test.go
+++ b/sstable/reader_test.go
@@ -8,9 +8,11 @@ import (
 	"bytes"
 	"encoding/binary"
 	"fmt"
+	"io"
 	"io/ioutil"
 	"math"
 	"os"
+	"path"
 	"path/filepath"
 	"strconv"
 	"strings"
@@ -614,6 +616,201 @@ func TestReaderChecksumErrors(t *testing.T) {
 	}
 }
 
+func TestValidateBlockChecksums(t *testing.T) {
+	seed := uint64(time.Now().UnixNano())
+	rng := rand.New(rand.NewSource(seed))
+	t.Logf("using seed = %d", seed)
+
+	allFiles := []string{
+		"testdata/h.no-compression.sst",
+		"testdata/h.no-compression.two_level_index.sst",
+		"testdata/h.sst",
+		"testdata/h.table-bloom.no-compression.prefix_extractor.no_whole_key_filter.sst",
+		"testdata/h.table-bloom.no-compression.sst",
+		"testdata/h.table-bloom.sst",
+		"testdata/h.zstd-compression.sst",
+	}
+
+	type corruptionLocation int
+	const (
+		corruptionLocationData corruptionLocation = iota
+		corruptionLocationIndex
+		corruptionLocationTopIndex
+		corruptionLocationFilter
+		corruptionLocationRangeDel
+		corruptionLocationProperties
+		corruptionLocationMetaIndex
+	)
+
+	testCases := []struct {
+		name                string
+		files               []string
+		corruptionLocations []corruptionLocation
+	}{
+		{
+			name:                "no corruption",
+			corruptionLocations: []corruptionLocation{},
+		},
+		{
+			name: "data block corruption",
+			corruptionLocations: []corruptionLocation{
+				corruptionLocationData,
+			},
+		},
+		{
+			name: "index block corruption",
+			corruptionLocations: []corruptionLocation{
+				corruptionLocationIndex,
+			},
+		},
+		{
+			name: "top index block corruption",
+			files: []string{
+				"testdata/h.no-compression.two_level_index.sst",
+			},
+			corruptionLocations: []corruptionLocation{
+				corruptionLocationTopIndex,
+			},
+		},
+		{
+			name: "filter block corruption",
+			files: []string{
+				"testdata/h.table-bloom.no-compression.prefix_extractor.no_whole_key_filter.sst",
+				"testdata/h.table-bloom.no-compression.sst",
+				"testdata/h.table-bloom.sst",
+			},
+			corruptionLocations: []corruptionLocation{
+				corruptionLocationFilter,
+			},
+		},
+		{
+			name: "range deletion block corruption",
+			corruptionLocations: []corruptionLocation{
+				corruptionLocationRangeDel,
+			},
+		},
+		{
+			name: "properties block corruption",
+			corruptionLocations: []corruptionLocation{
+				corruptionLocationProperties,
+			},
+		},
+		{
+			name: "metaindex block corruption",
+			corruptionLocations: []corruptionLocation{
+				corruptionLocationMetaIndex,
+			},
+		},
+		{
+			name: "multiple blocks corrupted",
+			corruptionLocations: []corruptionLocation{
+				corruptionLocationData,
+				corruptionLocationIndex,
+				corruptionLocationRangeDel,
+				corruptionLocationProperties,
+				corruptionLocationMetaIndex,
+			},
+		},
+	}
+
+	testFn := func(t *testing.T, file string, corruptionLocations []corruptionLocation) {
+		// Create a copy of the SSTable that we can freely corrupt.
+		f, err := os.Open(filepath.FromSlash(file))
+		require.NoError(t, err)
+
+		pathCopy := path.Join(t.TempDir(), path.Base(file))
+		fCopy, err := os.OpenFile(pathCopy, os.O_CREATE|os.O_RDWR, 0600)
+		require.NoError(t, err)
+		defer fCopy.Close()
+
+		_, err = io.Copy(fCopy, f)
+		require.NoError(t, err)
+		err = fCopy.Sync()
+		require.NoError(t, err)
+		require.NoError(t, f.Close())
+
+		filter := bloom.FilterPolicy(10)
+		r, err := NewReader(fCopy, ReaderOptions{
+			Filters: map[string]FilterPolicy{
+				filter.Name(): filter,
+			},
+		})
+		require.NoError(t, err)
+		defer func() { require.NoError(t, r.Close()) }()
+
+		// Prior to corruption, validation is successful.
+		require.NoError(t, r.ValidateBlockChecksums())
+
+		// If we are not testing for corruption, we can stop here.
+		if len(corruptionLocations) == 0 {
+			return
+		}
+
+		// Perform bit flips in various corruption locations.
+		layout, err := r.Layout()
+		require.NoError(t, err)
+		for _, location := range corruptionLocations {
+			var bh BlockHandle
+			switch location {
+			case corruptionLocationData:
+				bh = layout.Data[rng.Intn(len(layout.Data))]
+			case corruptionLocationIndex:
+				bh = layout.Index[rng.Intn(len(layout.Index))]
+			case corruptionLocationTopIndex:
+				bh = layout.TopIndex
+			case corruptionLocationFilter:
+				bh = layout.Filter
+			case corruptionLocationRangeDel:
+				bh = layout.RangeDel
+			case corruptionLocationProperties:
+				bh = layout.Properties
+			case corruptionLocationMetaIndex:
+				bh = layout.MetaIndex
+			default:
+				t.Fatalf("unknown location")
+			}
+
+			// Corrupt a random byte within the selected block.
+			pos := int64(bh.Offset) + rng.Int63n(int64(bh.Length))
+			t.Logf("altering file=%s @ offset = %d", file, pos)
+
+			b := make([]byte, 1)
+			n, err := fCopy.ReadAt(b, pos)
+			require.NoError(t, err)
+			require.Equal(t, 1, n)
+			t.Logf("data (before) = %08b", b)
+
+			b[0] ^= 0xff
+			t.Logf("data (after) = %08b", b)
+
+			_, err = fCopy.WriteAt(b, pos)
+			require.NoError(t, err)
+		}
+
+		// Write back to the file.
+		err = fCopy.Sync()
+		require.NoError(t, err)
+
+		// Confirm that checksum validation fails.
+		err = r.ValidateBlockChecksums()
+		require.Error(t, err)
+		require.Regexp(t, `checksum mismatch`, err.Error())
+	}
+
+	for _, tc := range testCases {
+		// By default, test across all files, unless overridden.
+		files := tc.files
+		if files == nil {
+			files = allFiles
+		}
+		for _, file := range files {
+			t.Run(tc.name+" "+path.Base(file), func(t *testing.T) {
+				testFn(t, file, tc.corruptionLocations)
+			})
+		}
+	}
+}
+
 func buildTestTable(
 	t *testing.T, numEntries uint64, blockSize, indexBlockSize int, compression Compression,
 ) *Reader {
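For reference, a minimal sketch of how a caller might drive the new `ValidateBlockChecksums` API outside the test suite. The `main` wrapper, the `table.sst` path, and the import path shown are illustrative assumptions; only `NewReader`, `ReaderOptions`, and `ValidateBlockChecksums` come from the patch and its surrounding context.

```go
package main

import (
	"log"
	"os"

	"github.com/cockroachdb/pebble/sstable"
)

func main() {
	// "table.sst" is a placeholder path; any sstable written by this package works.
	f, err := os.Open("table.sst")
	if err != nil {
		log.Fatal(err)
	}
	r, err := sstable.NewReader(f, sstable.ReaderOptions{})
	if err != nil {
		log.Fatal(err)
	}
	defer r.Close()

	// Walks every block in offset order and returns the first checksum
	// mismatch encountered, or nil if the file is intact.
	if err := r.ValidateBlockChecksums(); err != nil {
		log.Fatalf("sstable corruption detected: %v", err)
	}
	log.Println("all block checksums verified")
}
```

Because the patch sorts the collected BlockHandles by offset and reuses the existing block read path (which already verifies checksums), a caller gets a sequential, read-ahead-friendly scan of the whole file without needing to know anything about the table's internal layout.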