Skip to content

Commit

Permalink
sstable: support block checksum validation for entire SSTables
Browse files Browse the repository at this point in the history
To provide better detection of corruption upon ingestion of an SSTable,
add the ability for an `sstable.Reader` to validate the checksums of all
blocks in the underlying file for the SSTable.

The new `(*sstable.Reader).ValidateBlockChecksums` will be used by a
subsequent change to perform validation of an SSTable upon ingestion,
with any corruption that may be detected passed to the
corruption reporting and recovery mechanism outlined in
cockroachdb/cockroach#67568.

See also: cockroachdb#1203.
  • Loading branch information
nicktrav committed Aug 30, 2021
1 parent ff43a58 commit 08d7a93
Show file tree
Hide file tree
Showing 2 changed files with 242 additions and 0 deletions.
45 changes: 45 additions & 0 deletions sstable/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2150,6 +2150,47 @@ func (r *Reader) Layout() (*Layout, error) {
return l, nil
}

// ValidateBlockChecksums validates the checksums for each block in the SSTable.
func (r *Reader) ValidateBlockChecksums() error {
	// Pre-compute the BlockHandles for the underlying file.
	l, err := r.Layout()
	if err != nil {
		return err
	}

	// Construct the set of blocks to check. Note that the footer is not checked
	// as it is not a block with a checksum. The final length is known up front:
	// every data and index block, plus the five singleton blocks appended below.
	blocks := make([]BlockHandle, 0, len(l.Data)+len(l.Index)+5)
	blocks = append(blocks, l.Data...)
	blocks = append(blocks, l.Index...)
	blocks = append(blocks, l.TopIndex, l.Filter, l.RangeDel, l.Properties, l.MetaIndex)

	// Sorting by offset ensures we are performing a sequential scan of the
	// file.
	sort.Slice(blocks, func(i, j int) bool {
		return blocks[i].Offset < blocks[j].Offset
	})

	// Check all blocks sequentially. Make use of read-ahead, given we are
	// scanning the entire file from start to end.
	blockRS := &readaheadState{}
	for _, bh := range blocks {
		// Certain blocks may not be present, in which case we skip them.
		if bh.Length == 0 {
			continue
		}

		// Reading the block validates the checksum. The block contents are not
		// needed, so release the handle immediately.
		h, err := r.readBlock(bh, nil /* transform */, blockRS)
		if err != nil {
			return err
		}
		h.Release()
	}

	return nil
}

// EstimateDiskUsage returns the total size of data blocks overlapping the range
// `[start, end]`. Even if a data block partially overlaps, or we cannot determine
// overlap due to abbreviated index keys, the full data block size is included in
Expand Down Expand Up @@ -2350,6 +2391,10 @@ func NewReader(f ReadableFile, o ReaderOptions, extraOpts ...ReaderOption) (*Rea

// Layout describes the block organization of an sstable.
type Layout struct {
// NOTE: changes to fields in this struct should also be reflected in
// ValidateBlockChecksums, which validates a static list of BlockHandles
// referenced in this struct.

Data []BlockHandle
Index []BlockHandle
TopIndex BlockHandle
Expand Down
197 changes: 197 additions & 0 deletions sstable/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ import (
"bytes"
"encoding/binary"
"fmt"
"io"
"io/ioutil"
"math"
"os"
"path"
"path/filepath"
"strconv"
"strings"
Expand Down Expand Up @@ -614,6 +616,201 @@ func TestReaderChecksumErrors(t *testing.T) {
}
}

// TestValidateBlockChecksums verifies that (*Reader).ValidateBlockChecksums
// succeeds on intact SSTables, and returns a checksum mismatch error after a
// byte is flipped within any of the block types (data, index, top index,
// filter, range deletion, properties, metaindex).
func TestValidateBlockChecksums(t *testing.T) {
	seed := uint64(time.Now().UnixNano())
	rng := rand.New(rand.NewSource(seed))
	t.Logf("using seed = %d", seed)

	allFiles := []string{
		"testdata/h.no-compression.sst",
		"testdata/h.no-compression.two_level_index.sst",
		"testdata/h.sst",
		"testdata/h.table-bloom.no-compression.prefix_extractor.no_whole_key_filter.sst",
		"testdata/h.table-bloom.no-compression.sst",
		"testdata/h.table-bloom.sst",
		"testdata/h.zstd-compression.sst",
	}

	type corruptionLocation int
	const (
		corruptionLocationData corruptionLocation = iota
		corruptionLocationIndex
		corruptionLocationTopIndex
		corruptionLocationFilter
		corruptionLocationRangeDel
		corruptionLocationProperties
		corruptionLocationMetaIndex
	)

	testCases := []struct {
		name                string
		files               []string
		corruptionLocations []corruptionLocation
	}{
		{
			name:                "no corruption",
			corruptionLocations: []corruptionLocation{},
		},
		{
			name: "data block corruption",
			corruptionLocations: []corruptionLocation{
				corruptionLocationData,
			},
		},
		{
			name: "index block corruption",
			corruptionLocations: []corruptionLocation{
				corruptionLocationIndex,
			},
		},
		{
			// Only two-level index tables have a top index block.
			name: "top index block corruption",
			files: []string{
				"testdata/h.no-compression.two_level_index.sst",
			},
			corruptionLocations: []corruptionLocation{
				corruptionLocationTopIndex,
			},
		},
		{
			// Only tables written with a filter policy have a filter block.
			name: "filter block corruption",
			files: []string{
				"testdata/h.table-bloom.no-compression.prefix_extractor.no_whole_key_filter.sst",
				"testdata/h.table-bloom.no-compression.sst",
				"testdata/h.table-bloom.sst",
			},
			corruptionLocations: []corruptionLocation{
				corruptionLocationFilter,
			},
		},
		{
			name: "range deletion block corruption",
			corruptionLocations: []corruptionLocation{
				corruptionLocationRangeDel,
			},
		},
		{
			name: "properties block corruption",
			corruptionLocations: []corruptionLocation{
				corruptionLocationProperties,
			},
		},
		{
			name: "metaindex block corruption",
			corruptionLocations: []corruptionLocation{
				corruptionLocationMetaIndex,
			},
		},
		{
			name: "multiple blocks corrupted",
			corruptionLocations: []corruptionLocation{
				corruptionLocationData,
				corruptionLocationIndex,
				corruptionLocationRangeDel,
				corruptionLocationProperties,
				corruptionLocationMetaIndex,
			},
		},
	}

	testFn := func(t *testing.T, file string, corruptionLocations []corruptionLocation) {
		// Create a copy of the SSTable that we can freely corrupt.
		f, err := os.Open(filepath.FromSlash(file))
		require.NoError(t, err)

		// NB: use filepath.Join (not path.Join) as t.TempDir returns an
		// OS-specific path; path.Base is fine as the testdata names are
		// slash-separated.
		pathCopy := filepath.Join(t.TempDir(), path.Base(file))
		fCopy, err := os.OpenFile(pathCopy, os.O_CREATE|os.O_RDWR, 0600)
		require.NoError(t, err)
		defer fCopy.Close()

		_, err = io.Copy(fCopy, f)
		require.NoError(t, err)
		err = fCopy.Sync()
		require.NoError(t, err)
		require.NoError(t, f.Close())

		filter := bloom.FilterPolicy(10)
		r, err := NewReader(fCopy, ReaderOptions{
			Filters: map[string]FilterPolicy{
				filter.Name(): filter,
			},
		})
		require.NoError(t, err)
		defer func() { require.NoError(t, r.Close()) }()

		// Prior to corruption, validation is successful.
		require.NoError(t, r.ValidateBlockChecksums())

		// If we are not testing for corruption, we can stop here.
		if len(corruptionLocations) == 0 {
			return
		}

		// Perform bit flips in various corruption locations.
		layout, err := r.Layout()
		require.NoError(t, err)
		for _, location := range corruptionLocations {
			var bh BlockHandle
			switch location {
			case corruptionLocationData:
				bh = layout.Data[rng.Intn(len(layout.Data))]
			case corruptionLocationIndex:
				bh = layout.Index[rng.Intn(len(layout.Index))]
			case corruptionLocationTopIndex:
				bh = layout.TopIndex
			case corruptionLocationFilter:
				bh = layout.Filter
			case corruptionLocationRangeDel:
				bh = layout.RangeDel
			case corruptionLocationProperties:
				bh = layout.Properties
			case corruptionLocationMetaIndex:
				bh = layout.MetaIndex
			default:
				t.Fatalf("unknown location")
			}

			// Corrupt a random byte within the selected block.
			pos := int64(bh.Offset) + rng.Int63n(int64(bh.Length))
			t.Logf("altering file=%s @ offset = %d", file, pos)

			b := make([]byte, 1)
			n, err := fCopy.ReadAt(b, pos)
			require.NoError(t, err)
			require.Equal(t, 1, n)
			t.Logf("data (before) = %08b", b)

			// Flip all bits in the byte to guarantee a checksum mismatch.
			b[0] ^= 0xff
			t.Logf("data (after) = %08b", b)

			_, err = fCopy.WriteAt(b, pos)
			require.NoError(t, err)
		}

		// Write back to the file.
		err = fCopy.Sync()
		require.NoError(t, err)

		// Confirm that checksum validation fails.
		err = r.ValidateBlockChecksums()
		require.Error(t, err)
		require.Regexp(t, `checksum mismatch`, err.Error())
	}

	for _, tc := range testCases {
		// By default, test across all files, unless overridden.
		files := tc.files
		if files == nil {
			files = allFiles
		}
		for _, file := range files {
			t.Run(tc.name+" "+path.Base(file), func(t *testing.T) {
				testFn(t, file, tc.corruptionLocations)
			})
		}
	}
}

func buildTestTable(
t *testing.T, numEntries uint64, blockSize, indexBlockSize int, compression Compression,
) *Reader {
Expand Down

0 comments on commit 08d7a93

Please sign in to comment.