From 930028902e5ff36e9a8dabeea4b4eda1f7b02909 Mon Sep 17 00:00:00 2001 From: Nate Broyles Date: Thu, 16 Jul 2020 15:12:36 -0400 Subject: [PATCH 1/8] Add schema.VersionChecker VersionChecker allows us to make decisions around how to proceed depending on the Major and Minor Version in the index info file. --- src/dbnode/persist/fs/read.go | 2 ++ src/dbnode/persist/fs/seek.go | 8 +++++-- src/dbnode/persist/schema/version_checker.go | 22 ++++++++++++++++++ .../persist/schema/version_checker_test.go | 23 +++++++++++++++++++ 4 files changed, 53 insertions(+), 2 deletions(-) create mode 100644 src/dbnode/persist/schema/version_checker.go create mode 100644 src/dbnode/persist/schema/version_checker_test.go diff --git a/src/dbnode/persist/fs/read.go b/src/dbnode/persist/fs/read.go index 9805f06170..b866619f88 100644 --- a/src/dbnode/persist/fs/read.go +++ b/src/dbnode/persist/fs/read.go @@ -99,6 +99,7 @@ type reader struct { shard uint32 volume int open bool + versionChecker *schema.VersionChecker } // NewReader returns a new reader and expects all files to exist. Will read the @@ -325,6 +326,7 @@ func (r *reader) readInfo(size int) error { r.entriesRead = 0 r.metadataRead = 0 r.bloomFilterInfo = info.BloomFilter + r.versionChecker = schema.NewVersionChecker(int(info.MajorVersion), int(info.MinorVersion)) return nil } diff --git a/src/dbnode/persist/fs/seek.go b/src/dbnode/persist/fs/seek.go index fa6faa3e4f..8c5faf6e53 100644 --- a/src/dbnode/persist/fs/seek.go +++ b/src/dbnode/persist/fs/seek.go @@ -65,8 +65,9 @@ type seeker struct { // Data read from the indexInfo file. Note that we use xtime.UnixNano // instead of time.Time to avoid keeping an extra pointer around. - start xtime.UnixNano - blockSize time.Duration + start xtime.UnixNano + blockSize time.Duration + versionChecker *schema.VersionChecker dataFd *os.File indexFd *os.File @@ -224,6 +225,7 @@ func (s *seeker) Open( } s.start = xtime.UnixNano(info.BlockStart) s.blockSize = time.Duration(info.BlockSize) + s.versionChecker = schema.NewVersionChecker(int(info.MajorVersion), int(info.MinorVersion)) err = s.validateIndexFileDigest( indexFdWithDigest, expectedDigests.indexDigest) @@ -510,6 +512,8 @@ func (s *seeker) ConcurrentClone() (ConcurrentDataFileSetSeeker, error) { // they are concurrency safe and can be shared among clones. indexFd: s.indexFd, dataFd: s.dataFd, + + versionChecker: s.versionChecker, } return seeker, nil diff --git a/src/dbnode/persist/schema/version_checker.go b/src/dbnode/persist/schema/version_checker.go new file mode 100644 index 0000000000..43737df989 --- /dev/null +++ b/src/dbnode/persist/schema/version_checker.go @@ -0,0 +1,22 @@ +package schema + +// VersionChecker centralizes logic for checking if a major, minor version combo supports +// specific functionality +type VersionChecker struct { + majorVersion int + minorVersion int +} + +func NewVersionChecker(majorVersion int, minorVersion int) *VersionChecker { + return &VersionChecker{ + majorVersion: majorVersion, + minorVersion: minorVersion, + } +} + +// IndexEntryValidationEnabled checks the version to determine if +// fileset files of the specified version allow for doing checksum validation +// on individual index entries +func (v *VersionChecker) IndexEntryValidationEnabled() bool { + return v.majorVersion >= 1 && v.minorVersion >= 1 +} diff --git a/src/dbnode/persist/schema/version_checker_test.go b/src/dbnode/persist/schema/version_checker_test.go new file mode 100644 index 0000000000..50dd5addc6 --- /dev/null +++ b/src/dbnode/persist/schema/version_checker_test.go @@ -0,0 +1,23 @@ +package schema + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestIndexEntryValidationEnabled(t *testing.T) { + checker := NewVersionChecker(1, 1) + require.True(t, checker.IndexEntryValidationEnabled()) + + checker = NewVersionChecker(1, 2) + require.True(t, checker.IndexEntryValidationEnabled()) + + checker = NewVersionChecker(2, 1) + require.True(t, checker.IndexEntryValidationEnabled()) +} + +func TestIndexEntryValidationDisabled(t *testing.T) { + checker := NewVersionChecker(1, 0) + require.False(t, checker.IndexEntryValidationEnabled()) +} From adf0c1d947e4cf71c4da87b62d73baaedec1c376 Mon Sep 17 00:00:00 2001 From: Nate Broyles Date: Thu, 16 Jul 2020 16:12:32 -0400 Subject: [PATCH 2/8] Add DecoderStreamWithDigest --- .../persist/fs/msgpack/stream_with_digest.go | 114 ++++++++++++++ .../fs/msgpack/stream_with_digest_test.go | 143 ++++++++++++++++++ 2 files changed, 257 insertions(+) create mode 100644 src/dbnode/persist/fs/msgpack/stream_with_digest.go create mode 100644 src/dbnode/persist/fs/msgpack/stream_with_digest_test.go diff --git a/src/dbnode/persist/fs/msgpack/stream_with_digest.go b/src/dbnode/persist/fs/msgpack/stream_with_digest.go new file mode 100644 index 0000000000..a97b0a69ce --- /dev/null +++ b/src/dbnode/persist/fs/msgpack/stream_with_digest.go @@ -0,0 +1,114 @@ +package msgpack + +import ( + "errors" + "hash" + "hash/adler32" +) + +var ( + // errChecksumMismatch returned when the calculated checksum doesn't match the stored checksum + errChecksumMismatch = errors.New("calculated checksum doesn't match stored checksum") +) + +// DecoderStreamWithDigest calculates the digest as it processes a decoder stream. +type DecoderStreamWithDigest interface { + DecoderStream + + // Reset resets the reader for use with a new reader. + Reset(stream DecoderStream) + + // Digest returns the digest + Digest() hash.Hash32 + + // Validate compares the current digest against the expected digest and returns + // an error if they don't match. + Validate(expectedDigest uint32) error + + // Capture provides a mechanism for manually capturing bytes to add to digest when reader is manipulated + // through atypical means (e.g. reading directly from the backing byte slice of a ByteReader) + Capture(bytes []byte) error + + // Returns the decoder stream wrapped by this object + wrappedStream() DecoderStream +} + +type decoderStreamWithDigest struct { + reader DecoderStream + digest hash.Hash32 + unreadByte bool +} + +func newDecoderStreamWithDigest(reader DecoderStream) DecoderStreamWithDigest { + return &decoderStreamWithDigest{ + reader: reader, + digest: adler32.New(), + } +} + +func (d *decoderStreamWithDigest) Read(p []byte) (n int, err error) { + n, err = d.reader.Read(p) + if n > 0 { + start := 0 + if d.unreadByte { + d.unreadByte = false + start++ + } + if _, err := d.digest.Write(p[start:n]); err != nil { + return 0, err + } + } + return n, err +} + +func (d *decoderStreamWithDigest) ReadByte() (byte, error) { + b, err := d.reader.ReadByte() + if err == nil { + if d.unreadByte { + d.unreadByte = false + } else { + if _, err := d.digest.Write([]byte{b}); err != nil { + return b, err + } + } + } + return b, err +} + +func (d *decoderStreamWithDigest) UnreadByte() error { + err := d.reader.UnreadByte() + if err == nil { + d.unreadByte = true + } + return err +} + +func (d *decoderStreamWithDigest) Reset(stream DecoderStream) { + d.reader = stream + d.digest.Reset() +} + +func (d *decoderStreamWithDigest) Digest() hash.Hash32 { + return d.digest +} + +func (d *decoderStreamWithDigest) Validate(expectedDigest uint32) error { + if d.digest.Sum32() != expectedDigest { + return errChecksumMismatch + } + return nil +} + +func (d *decoderStreamWithDigest) Capture(bytes []byte) error { + // No-op if not actually capturing at the moment + if d.reader != nil { + if _, err := d.digest.Write(bytes); err != nil { + return err + } + } + return nil +} + +func (d *decoderStreamWithDigest) wrappedStream() DecoderStream { + return d.reader +} diff --git a/src/dbnode/persist/fs/msgpack/stream_with_digest_test.go b/src/dbnode/persist/fs/msgpack/stream_with_digest_test.go new file mode 100644 index 0000000000..e661f398a7 --- /dev/null +++ b/src/dbnode/persist/fs/msgpack/stream_with_digest_test.go @@ -0,0 +1,143 @@ +package msgpack + +import ( + "bufio" + "bytes" + "hash/adler32" + "testing" + + "github.com/stretchr/testify/require" +) + +const srcString = "foo bar baz qux quux corge grault" + +func TestDecoderStreamWithDigestRead(t *testing.T) { + stream := newTestDecoderStream() + + // Read srcString in chunkLen size chunks + chunkLen := 3 + buf := make([]byte, len(srcString)) + for start := 0; start < len(srcString); start = start + chunkLen { + end := start + chunkLen + if end > len(srcString) { + end = len(srcString) + } + + n, err := stream.Read(buf[start:end]) + require.NoError(t, err) + require.Equal(t, chunkLen, n) + require.Equal(t, adler32.Checksum(buf[:end]), stream.Digest().Sum32()) + } +} + +func TestDecoderStreamWithDigestReadByte(t *testing.T) { + stream := newTestDecoderStream() + + buf := make([]byte, len(srcString)) + for i := 1; i < len(srcString); i++ { + n, err := stream.Read(buf[i-1 : i]) + require.NoError(t, err) + require.Equal(t, 1, n) + require.Equal(t, adler32.Checksum(buf[:i]), stream.Digest().Sum32()) + } +} + +func TestDecoderStreamWithDigestUnreadByte(t *testing.T) { + stream := decoderStreamWithDigest{ + reader: bufio.NewReader(bytes.NewReader([]byte(srcString))), + digest: adler32.New(), + } + + b, err := stream.ReadByte() + require.NoError(t, err) + require.Equal(t, srcString[0], b) + require.False(t, stream.unreadByte) + + err = stream.UnreadByte() + require.NoError(t, err) + require.True(t, stream.unreadByte) +} + +func TestDecoderStreamWithDigestReset(t *testing.T) { + stream := newTestDecoderStream() + + b, err := stream.ReadByte() + require.NoError(t, err) + require.Equal(t, srcString[0], b) + + b, err = stream.ReadByte() + require.NoError(t, err) + require.Equal(t, srcString[1], b) + + stream.Reset(bufio.NewReader(bytes.NewReader([]byte(srcString)))) + + b, err = stream.ReadByte() + require.NoError(t, err) + require.Equal(t, srcString[0], b) +} + +func TestDecoderStreamWithDigestValidate(t *testing.T) { + stream := newTestDecoderStream() + buf := make([]byte, 5) + + n, err := stream.Read(buf) + require.NoError(t, err) + require.Equal(t, 5, n) + + require.NoError(t, stream.Validate(adler32.Checksum(buf))) + require.Error(t, stream.Validate(adler32.Checksum([]byte("asdf")))) +} + +func TestDecoderStreamWithDigestCapture(t *testing.T) { + stream := newTestDecoderStream() + + require.NoError(t, stream.Validate(1)) + + bytes := []byte("manual capture") + require.NoError(t, stream.Capture(bytes)) + + require.Equal(t, adler32.Checksum(bytes), stream.Digest().Sum32()) +} + +func TestDecoderStreamWithDigestReadUnreadRead(t *testing.T) { + stream := newTestDecoderStream() + + buf := make([]byte, len(srcString)) + end := 0 + + b1, err := stream.ReadByte() + require.NoError(t, err) + buf[0] = b1 + end++ + require.Equal(t, adler32.Checksum(buf[:end]), stream.Digest().Sum32()) + + err = stream.UnreadByte() + end-- + require.NoError(t, err) + + b2, err := stream.ReadByte() + require.NoError(t, err) + end++ + require.Equal(t, b1, b2) + require.Equal(t, adler32.Checksum(buf[:end]), stream.Digest().Sum32()) + + n, err := stream.Read(buf[end : end+4]) + require.NoError(t, err) + require.Equal(t, 4, n) + end += n + require.Equal(t, adler32.Checksum(buf[:end]), stream.Digest().Sum32()) + + err = stream.UnreadByte() + end-- + require.NoError(t, err) + + n, err = stream.Read(buf[end : end+4]) + require.NoError(t, err) + require.Equal(t, 4, n) + end += n + require.Equal(t, adler32.Checksum(buf[:end]), stream.Digest().Sum32()) +} + +func newTestDecoderStream() DecoderStreamWithDigest { + return newDecoderStreamWithDigest(bufio.NewReader(bytes.NewReader([]byte(srcString)))) +} From 0b7aafc9020cf58e0eec43040d4cbd43816f728f Mon Sep 17 00:00:00 2001 From: Nate Broyles Date: Thu, 16 Jul 2020 16:15:02 -0400 Subject: [PATCH 3/8] Wire up index entry checksum validation on decode --- .../persist/fs/index_lookup_prop_test.go | 2 +- src/dbnode/persist/fs/msgpack/decoder.go | 48 ++++++++++++++++--- src/dbnode/persist/fs/msgpack/decoder_test.go | 30 ++++++++++-- .../persist/fs/msgpack/roundtrip_test.go | 14 +++--- src/dbnode/persist/fs/read.go | 2 +- src/dbnode/persist/fs/seek.go | 20 ++++---- src/dbnode/persist/schema/types.go | 2 +- 7 files changed, 88 insertions(+), 30 deletions(-) diff --git a/src/dbnode/persist/fs/index_lookup_prop_test.go b/src/dbnode/persist/fs/index_lookup_prop_test.go index 8daabd66bd..e6d23c7249 100644 --- a/src/dbnode/persist/fs/index_lookup_prop_test.go +++ b/src/dbnode/persist/fs/index_lookup_prop_test.go @@ -271,7 +271,7 @@ func readIndexFileOffsets(shardDirPath string, numEntries int, start time.Time) summariesOffsets := map[string]int64{} for read := 0; read < numEntries; read++ { offset := int64(len(buf)) - (decoderStream.Remaining()) - entry, err := decoder.DecodeIndexEntry(nil) + entry, err := decoder.DecodeIndexEntry(nil, true) if err != nil { return nil, fmt.Errorf("err decoding index entry: %v", err) } diff --git a/src/dbnode/persist/fs/msgpack/decoder.go b/src/dbnode/persist/fs/msgpack/decoder.go index 0d9ada5112..76646580e6 100644 --- a/src/dbnode/persist/fs/msgpack/decoder.go +++ b/src/dbnode/persist/fs/msgpack/decoder.go @@ -46,6 +46,7 @@ var ( errorUnableToDetermineNumFieldsToSkip = errors.New("unable to determine num fields to skip") errorCalledDecodeBytesWithoutByteStreamDecoder = errors.New("called decodeBytes with out byte stream decoder") + errorIndexEntryChecksumMismatch = errors.New("decode index entry encountered checksum mismatch") ) // Decoder decodes persisted msgpack-encoded data @@ -53,7 +54,10 @@ type Decoder struct { reader DecoderStream // Will only be set if the Decoder is Reset() with a DecoderStream // that also implements ByteStream. - byteReader ByteStream + byteReader ByteStream + // Created in constructor but only set with a reader when + // wrapWithStreamWithDigest() is invoked + streamWithDigest DecoderStreamWithDigest dec *msgpack.Decoder err error allocDecodedBytes bool @@ -76,6 +80,7 @@ func newDecoder(legacy legacyEncodingOptions, opts DecodingOptions) *Decoder { reader: reader, dec: msgpack.NewDecoder(reader), legacy: legacy, + streamWithDigest: newDecoderStreamWithDigest(nil), } } @@ -83,9 +88,14 @@ func newDecoder(legacy legacyEncodingOptions, opts DecodingOptions) *Decoder { func (dec *Decoder) Reset(stream DecoderStream) { dec.reader = stream + unwrappedStream := stream + if streamWithDigest, ok := stream.(DecoderStreamWithDigest); ok { + unwrappedStream = streamWithDigest.wrappedStream() + } + // Do the type assertion upfront so that we don't have to do it // repeatedly later. - if byteStream, ok := stream.(ByteStream); ok { + if byteStream, ok := unwrappedStream.(ByteStream); ok { dec.byteReader = byteStream } else { dec.byteReader = nil @@ -111,12 +121,13 @@ func (dec *Decoder) DecodeIndexInfo() (schema.IndexInfo, error) { } // DecodeIndexEntry decodes index entry -func (dec *Decoder) DecodeIndexEntry(bytesPool pool.BytesPool) (schema.IndexEntry, error) { +func (dec *Decoder) DecodeIndexEntry(bytesPool pool.BytesPool, validate bool) (schema.IndexEntry, error) { if dec.err != nil { return emptyIndexEntry, dec.err } + dec.wrapWithStreamWithDigest() _, numFieldsToSkip := dec.decodeRootObject(indexEntryVersion, indexEntryType) - indexEntry := dec.decodeIndexEntry(bytesPool) + indexEntry := dec.decodeIndexEntry(bytesPool, validate) dec.skip(numFieldsToSkip) if dec.err != nil { return emptyIndexEntry, dec.err @@ -348,7 +359,7 @@ func (dec *Decoder) decodeIndexBloomFilterInfo() schema.IndexBloomFilterInfo { return indexBloomFilterInfo } -func (dec *Decoder) decodeIndexEntry(bytesPool pool.BytesPool) schema.IndexEntry { +func (dec *Decoder) decodeIndexEntry(bytesPool pool.BytesPool, validate bool) schema.IndexEntry { var opts checkNumFieldsOptions switch dec.legacy.decodeLegacyIndexEntryVersion { case legacyEncodingIndexEntryVersionV1: @@ -389,6 +400,7 @@ func (dec *Decoder) decodeIndexEntry(bytesPool pool.BytesPool) schema.IndexEntry // At this point, if its a V1 file, we've decoded all the available fields. if dec.legacy.decodeLegacyIndexEntryVersion == legacyEncodingIndexEntryVersionV1 || actual < 6 { + _ = dec.checksumAndUnwrapStreamWithDigest() dec.skip(numFieldsToSkip) return indexEntry } @@ -402,6 +414,7 @@ func (dec *Decoder) decodeIndexEntry(bytesPool pool.BytesPool) schema.IndexEntry // At this point, if its a V2 file, we've decoded all the available fields. if dec.legacy.decodeLegacyIndexEntryVersion == legacyEncodingIndexEntryVersionV2 || actual < 7 { + _ = dec.checksumAndUnwrapStreamWithDigest() dec.skip(numFieldsToSkip) return indexEntry } @@ -410,9 +423,14 @@ func (dec *Decoder) decodeIndexEntry(bytesPool pool.BytesPool) schema.IndexEntry // final field on index entries dec.skip(numFieldsToSkip) + actualChecksum := dec.checksumAndUnwrapStreamWithDigest() + // Decode checksum field originally added in V3 - // TODO(nate): actually use the checksum value for index entry validation - #2629 - _ = dec.decodeVarint() + expectedChecksum := uint32(dec.decodeVarint()) + + if validate && expectedChecksum != actualChecksum { + dec.err = errorIndexEntryChecksumMismatch + } return indexEntry } @@ -663,6 +681,8 @@ func (dec *Decoder) decodeBytes() ([]byte, int, int) { return nil, -1, -1 } value = backingBytes[currPos:targetPos] + dec.streamWithDigest.Capture(value) + return value, currPos, bytesLen } @@ -717,3 +737,17 @@ func (dec *Decoder) decodeBytesLen() int { dec.err = err return value } + +func (dec *Decoder) wrapWithStreamWithDigest() { + dec.streamWithDigest.Reset(dec.reader) + dec.Reset(dec.streamWithDigest) +} + +func (dec *Decoder) checksumAndUnwrapStreamWithDigest() uint32 { + checksum := dec.streamWithDigest.Digest().Sum32() + + dec.Reset(dec.streamWithDigest.wrappedStream()) + dec.streamWithDigest.Reset(nil) + + return checksum +} diff --git a/src/dbnode/persist/fs/msgpack/decoder_test.go b/src/dbnode/persist/fs/msgpack/decoder_test.go index de52019f1b..367340e119 100644 --- a/src/dbnode/persist/fs/msgpack/decoder_test.go +++ b/src/dbnode/persist/fs/msgpack/decoder_test.go @@ -62,7 +62,7 @@ func TestDecodeNewerVersionThanExpected(t *testing.T) { // Verify decoding index entry results in an error require.NoError(t, enc.EncodeIndexEntry(testIndexEntry)) dec.Reset(NewByteDecoderStream(enc.Bytes())) - _, err = dec.DecodeIndexEntry(nil) + _, err = dec.DecodeIndexEntry(nil, true) require.Error(t, err) // Verify decoding log info results in an error @@ -147,7 +147,7 @@ func TestDecodeIndexEntryMoreFieldsThanExpected(t *testing.T) { // Verify we can successfully skip unnecessary fields dec.Reset(NewByteDecoderStream(enc.Bytes())) - res, err := dec.DecodeIndexEntry(nil) + res, err := dec.DecodeIndexEntry(nil, true) require.NoError(t, err) require.Equal(t, testIndexEntry, res) @@ -232,7 +232,7 @@ func TestDecodeBytesNoAlloc(t *testing.T) { require.NoError(t, enc.EncodeIndexEntry(testIndexEntry)) data := enc.Bytes() dec.Reset(NewByteDecoderStream(data)) - res, err := dec.DecodeIndexEntry(nil) + res, err := dec.DecodeIndexEntry(nil, true) require.NoError(t, err) require.Equal(t, []byte("testIndexEntry"), res.ID) @@ -252,7 +252,7 @@ func TestDecodeBytesAllocNew(t *testing.T) { require.NoError(t, enc.EncodeIndexEntry(testIndexEntry)) data := enc.Bytes() dec.Reset(NewByteDecoderStream(data)) - res, err := dec.DecodeIndexEntry(nil) + res, err := dec.DecodeIndexEntry(nil, true) require.NoError(t, err) require.Equal(t, []byte("testIndexEntry"), res.ID) @@ -262,3 +262,25 @@ func TestDecodeBytesAllocNew(t *testing.T) { } require.Equal(t, []byte("testIndexEntry"), res.ID) } + +func TestDecodeIndexEntryInvalidChecksum(t *testing.T) { + var ( + enc = NewEncoder() + dec = NewDecoder(nil) + ) + require.NoError(t, enc.EncodeIndexEntry(testIndexEntry)) + + // Update to invalid checksum + enc.buf.Truncate(len(enc.Bytes()) - 5) + require.NoError(t, enc.enc.EncodeInt64(1234)) + + // validate set to true + dec.Reset(NewByteDecoderStream(enc.Bytes())) + _, err := dec.DecodeIndexEntry(nil, true) + require.Error(t, err) + + // validate set to false + dec.Reset(NewByteDecoderStream(enc.Bytes())) + _, err = dec.DecodeIndexEntry(nil, false) + require.NoError(t, err) +} diff --git a/src/dbnode/persist/fs/msgpack/roundtrip_test.go b/src/dbnode/persist/fs/msgpack/roundtrip_test.go index 0561a95e34..d5d5fbe516 100644 --- a/src/dbnode/persist/fs/msgpack/roundtrip_test.go +++ b/src/dbnode/persist/fs/msgpack/roundtrip_test.go @@ -378,7 +378,7 @@ func TestIndexEntryRoundtrip(t *testing.T) { ) require.NoError(t, enc.EncodeIndexEntry(testIndexEntry)) dec.Reset(NewByteDecoderStream(enc.Bytes())) - res, err := dec.DecodeIndexEntry(nil) + res, err := dec.DecodeIndexEntry(nil, true) require.NoError(t, err) require.Equal(t, testIndexEntry, res) } @@ -393,7 +393,7 @@ func TestIndexEntryRoundtripWithBytesPool(t *testing.T) { require.NoError(t, enc.EncodeIndexEntry(testIndexEntry)) dec.Reset(NewByteDecoderStream(enc.Bytes())) - res, err := dec.DecodeIndexEntry(pool) + res, err := dec.DecodeIndexEntry(pool, true) require.NoError(t, err) require.Equal(t, testIndexEntry, res) } @@ -421,7 +421,7 @@ func TestIndexEntryRoundTripBackwardsCompatibilityV1(t *testing.T) { enc.EncodeIndexEntry(testIndexEntry) dec.Reset(NewByteDecoderStream(enc.Bytes())) - res, err := dec.DecodeIndexEntry(nil) + res, err := dec.DecodeIndexEntry(nil, true) require.NoError(t, err) require.Equal(t, testIndexEntry, res) } @@ -449,7 +449,7 @@ func TestIndexEntryRoundTripForwardsCompatibilityV1(t *testing.T) { }() dec.Reset(NewByteDecoderStream(enc.Bytes())) - res, err := dec.DecodeIndexEntry(nil) + res, err := dec.DecodeIndexEntry(nil, true) require.NoError(t, err) require.Equal(t, testIndexEntry, res) } @@ -469,7 +469,7 @@ func TestIndexEntryRoundTripBackwardsCompatibilityV2(t *testing.T) { enc.EncodeIndexEntry(testIndexEntry) dec.Reset(NewByteDecoderStream(enc.Bytes())) - res, err := dec.DecodeIndexEntry(nil) + res, err := dec.DecodeIndexEntry(nil, true) require.NoError(t, err) require.Equal(t, testIndexEntry, res) } @@ -488,7 +488,7 @@ func TestIndexEntryRoundTripForwardsCompatibilityV2(t *testing.T) { enc.EncodeIndexEntry(testIndexEntry) dec.Reset(NewByteDecoderStream(enc.Bytes())) - res, err := dec.DecodeIndexEntry(nil) + res, err := dec.DecodeIndexEntry(nil, true) require.NoError(t, err) require.Equal(t, testIndexEntry, res) } @@ -594,7 +594,7 @@ func TestMultiTypeRoundtripStress(t *testing.T) { case 0: res, err = dec.DecodeIndexInfo() case 1: - res, err = dec.DecodeIndexEntry(nil) + res, err = dec.DecodeIndexEntry(nil, true) case 2: res, err = dec.DecodeLogInfo() case 3: diff --git a/src/dbnode/persist/fs/read.go b/src/dbnode/persist/fs/read.go index b866619f88..b034cb7e7a 100644 --- a/src/dbnode/persist/fs/read.go +++ b/src/dbnode/persist/fs/read.go @@ -333,7 +333,7 @@ func (r *reader) readInfo(size int) error { func (r *reader) readIndexAndSortByOffsetAsc() error { r.decoder.Reset(r.indexDecoderStream) for i := 0; i < r.entries; i++ { - entry, err := r.decoder.DecodeIndexEntry(nil) + entry, err := r.decoder.DecodeIndexEntry(nil, r.versionChecker.IndexEntryValidationEnabled()) if err != nil { return err } diff --git a/src/dbnode/persist/fs/seek.go b/src/dbnode/persist/fs/seek.go index 8c5faf6e53..3458d2be5d 100644 --- a/src/dbnode/persist/fs/seek.go +++ b/src/dbnode/persist/fs/seek.go @@ -227,14 +227,16 @@ func (s *seeker) Open( s.blockSize = time.Duration(info.BlockSize) s.versionChecker = schema.NewVersionChecker(int(info.MajorVersion), int(info.MinorVersion)) - err = s.validateIndexFileDigest( - indexFdWithDigest, expectedDigests.indexDigest) - if err != nil { - s.Close() - return fmt.Errorf( - "index file digest for file: %s does not match the expected digest: %c", - filesetPathFromTimeLegacy(shardDir, blockStart, indexFileSuffix), err, - ) + if !s.versionChecker.IndexEntryValidationEnabled() { + err = s.validateIndexFileDigest( + indexFdWithDigest, expectedDigests.indexDigest) + if err != nil { + s.Close() + return fmt.Errorf( + "index file digest for file: %s does not match the expected digest: %c", + filesetPathFromTimeLegacy(shardDir, blockStart, indexFileSuffix), err, + ) + } } indexFdStat, err := s.indexFd.Stat() @@ -406,7 +408,7 @@ func (s *seeker) SeekIndexEntry( // very cheap pool until we find what we're looking for, and then we can perform a single // copy into checked.Bytes from the more expensive pool. entry, err := resources.xmsgpackDecoder.DecodeIndexEntry( - resources.decodeIndexEntryBytesPool) + resources.decodeIndexEntryBytesPool, s.versionChecker.IndexEntryValidationEnabled()) if err == io.EOF { // We reached the end of the file without finding it. return IndexEntry{}, errSeekIDNotFound diff --git a/src/dbnode/persist/schema/types.go b/src/dbnode/persist/schema/types.go index 71046044f8..1579fe0e78 100644 --- a/src/dbnode/persist/schema/types.go +++ b/src/dbnode/persist/schema/types.go @@ -32,7 +32,7 @@ const MajorVersion = 1 // MinorVersion is the minor schema version for a set of fileset files. // This is only incremented when *non-breaking* changes are introduced that // we want to have some level of control around how they're rolled out. -const MinorVersion = 0 +const MinorVersion = 1 // IndexInfo stores metadata information about block filesets type IndexInfo struct { From ba3aeae6a59d5e02f12d2cd13f9a87dbddce237e Mon Sep 17 00:00:00 2001 From: Nate Broyles Date: Mon, 20 Jul 2020 16:25:07 -0400 Subject: [PATCH 4/8] Add tests for seeker; address PR feedback --- src/dbnode/persist/fs/msgpack/decoder.go | 2 + src/dbnode/persist/fs/msgpack/decoder_test.go | 2 +- src/dbnode/persist/fs/seek.go | 24 ++++--- src/dbnode/persist/fs/seek_test.go | 67 ++++++++++++++++++- src/dbnode/persist/schema/version_checker.go | 2 +- .../persist/schema/version_checker_test.go | 3 + 6 files changed, 87 insertions(+), 13 deletions(-) diff --git a/src/dbnode/persist/fs/msgpack/decoder.go b/src/dbnode/persist/fs/msgpack/decoder.go index 76646580e6..03970a6b35 100644 --- a/src/dbnode/persist/fs/msgpack/decoder.go +++ b/src/dbnode/persist/fs/msgpack/decoder.go @@ -419,6 +419,8 @@ func (dec *Decoder) decodeIndexEntry(bytesPool pool.BytesPool, validate bool) sc return indexEntry } + // NB(nate): Any new fields should be parsed here. + // Intentionally skip any extra fields here as we've stipulated that from V3 onward, IndexEntryChecksum will be the // final field on index entries dec.skip(numFieldsToSkip) diff --git a/src/dbnode/persist/fs/msgpack/decoder_test.go b/src/dbnode/persist/fs/msgpack/decoder_test.go index 367340e119..31cf2cdd63 100644 --- a/src/dbnode/persist/fs/msgpack/decoder_test.go +++ b/src/dbnode/persist/fs/msgpack/decoder_test.go @@ -271,7 +271,7 @@ func TestDecodeIndexEntryInvalidChecksum(t *testing.T) { require.NoError(t, enc.EncodeIndexEntry(testIndexEntry)) // Update to invalid checksum - enc.buf.Truncate(len(enc.Bytes()) - 5) + enc.buf.Truncate(len(enc.Bytes()) - 5) // 5 bytes = 1 byte for integer code + 4 bytes for checksum require.NoError(t, enc.enc.EncodeInt64(1234)) // validate set to true diff --git a/src/dbnode/persist/fs/seek.go b/src/dbnode/persist/fs/seek.go index 3458d2be5d..8cd337a0ae 100644 --- a/src/dbnode/persist/fs/seek.go +++ b/src/dbnode/persist/fs/seek.go @@ -227,16 +227,14 @@ func (s *seeker) Open( s.blockSize = time.Duration(info.BlockSize) s.versionChecker = schema.NewVersionChecker(int(info.MajorVersion), int(info.MinorVersion)) - if !s.versionChecker.IndexEntryValidationEnabled() { - err = s.validateIndexFileDigest( - indexFdWithDigest, expectedDigests.indexDigest) - if err != nil { - s.Close() - return fmt.Errorf( - "index file digest for file: %s does not match the expected digest: %c", - filesetPathFromTimeLegacy(shardDir, blockStart, indexFileSuffix), err, - ) - } + err = s.validateIndexFileDigest( + indexFdWithDigest, expectedDigests.indexDigest) + if err != nil { + s.Close() + return fmt.Errorf( + "index file digest for file: %s does not match the expected digest: %c", + filesetPathFromTimeLegacy(shardDir, blockStart, indexFileSuffix), err, + ) } indexFdStat, err := s.indexFd.Stat() @@ -525,6 +523,12 @@ func (s *seeker) validateIndexFileDigest( indexFdWithDigest digest.FdWithDigestReader, expectedDigest uint32, ) error { + // If piecemeal checksumming validation enabled for index entries, do not attempt to validate the + // checksum of the entire file + if s.versionChecker.IndexEntryValidationEnabled() { + return nil + } + buf := make([]byte, s.opts.dataBufferSize) for { n, err := indexFdWithDigest.Read(buf) diff --git a/src/dbnode/persist/fs/seek_test.go b/src/dbnode/persist/fs/seek_test.go index 6b9fa9db62..f74dadadae 100644 --- a/src/dbnode/persist/fs/seek_test.go +++ b/src/dbnode/persist/fs/seek_test.go @@ -30,8 +30,8 @@ import ( "github.com/m3db/m3/src/dbnode/digest" "github.com/m3db/m3/src/dbnode/persist" + "github.com/m3db/m3/src/dbnode/persist/schema" "github.com/m3db/m3/src/x/ident" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -436,6 +436,71 @@ func TestCloneSeeker(t *testing.T) { assert.Equal(t, []byte{1, 2, 1}, data.Bytes()) } +func TestSeekValidateIndexEntriesFile(t *testing.T) { + dir, err := ioutil.TempDir("", "testdb") + if err != nil { + t.Fatal(err) + } + filePathPrefix := filepath.Join(dir, "") + defer os.RemoveAll(dir) + + w := newTestWriter(t, filePathPrefix) + writerOpts := DataWriterOpenOptions{ + BlockSize: testBlockSize, + Identifier: FileSetFileIdentifier{ + Namespace: testNs1ID, + Shard: 0, + BlockStart: testWriterStart, + }, + } + err = w.Open(writerOpts) + assert.NoError(t, err) + + // Write data + assert.NoError(t, w.Write( + persist.NewMetadataFromIDAndTags( + ident.StringID("foo"), + ident.Tags{}, + persist.MetadataOptions{}), + bytesRefd([]byte{1, 2, 3}), + digest.Checksum([]byte{1, 2, 3}))) + assert.NoError(t, w.Close()) + + shardDir := ShardDataDirPath(filePathPrefix, testNs1ID, 0) + + // With full file validation disabled + s := seeker{opts: seekerOpts{ + filePathPrefix: filePathPrefix, + dataBufferSize: testReaderBufferSize, + infoBufferSize: testReaderBufferSize, + bytesPool: testBytesPool, + keepUnreadBuf: false, + opts: testDefaultOpts, + }} + s.versionChecker = schema.NewVersionChecker(1, 1) + + indexFilePath := dataFilesetPathFromTimeAndIndex(shardDir, testWriterStart, 0, indexFileSuffix, false) + indexFd, err := os.Open(indexFilePath) + assert.NoError(t, err) + indexReader := digest.NewFdWithDigestReader(defaultInfoReaderBufferSize) + indexReader.Reset(indexFd) + + assert.NoError(t, s.validateIndexFileDigest(indexReader, 0)) + + // With full file validation enabled + s.versionChecker = schema.NewVersionChecker(1, 0) + _, err = indexFd.Seek(0, 0) + assert.NoError(t, err) + indexReader.Reset(indexFd) + + assert.Error(t, s.validateIndexFileDigest(indexReader, 0)) + + // Sanity check -- call seeker#Open and ensure VersionChecker is set correctly + err = s.Open(testNs1ID, 0, testWriterStart, 0, newTestReusableSeekerResources()) + assert.NoError(t, err) + assert.True(t, s.versionChecker.IndexEntryValidationEnabled()) +} + func newTestReusableSeekerResources() ReusableSeekerResources { return NewReusableSeekerResources(testDefaultOpts) } diff --git a/src/dbnode/persist/schema/version_checker.go b/src/dbnode/persist/schema/version_checker.go index 43737df989..55db6b57ea 100644 --- a/src/dbnode/persist/schema/version_checker.go +++ b/src/dbnode/persist/schema/version_checker.go @@ -18,5 +18,5 @@ func NewVersionChecker(majorVersion int, minorVersion int) *VersionChecker { // fileset files of the specified version allow for doing checksum validation // on individual index entries func (v *VersionChecker) IndexEntryValidationEnabled() bool { - return v.majorVersion >= 1 && v.minorVersion >= 1 + return v.majorVersion >= 2 || v.majorVersion == 1 && v.minorVersion >= 1 } diff --git a/src/dbnode/persist/schema/version_checker_test.go b/src/dbnode/persist/schema/version_checker_test.go index 50dd5addc6..0d1a3feaea 100644 --- a/src/dbnode/persist/schema/version_checker_test.go +++ b/src/dbnode/persist/schema/version_checker_test.go @@ -15,6 +15,9 @@ func TestIndexEntryValidationEnabled(t *testing.T) { checker = NewVersionChecker(2, 1) require.True(t, checker.IndexEntryValidationEnabled()) + + checker = NewVersionChecker(2, 0) + require.True(t, checker.IndexEntryValidationEnabled()) } func TestIndexEntryValidationDisabled(t *testing.T) { From 491fc2aec294ec529425ea0c7ecda580a0fcc071 Mon Sep 17 00:00:00 2001 From: Nate Broyles Date: Tue, 21 Jul 2020 19:02:17 -0400 Subject: [PATCH 5/8] Add license to new files --- .../persist/fs/msgpack/stream_with_digest.go | 20 ++++++++++++++++++ .../fs/msgpack/stream_with_digest_test.go | 20 ++++++++++++++++++ src/dbnode/persist/schema/version_checker.go | 21 +++++++++++++++++++ .../persist/schema/version_checker_test.go | 20 ++++++++++++++++++ 4 files changed, 81 insertions(+) diff --git a/src/dbnode/persist/fs/msgpack/stream_with_digest.go b/src/dbnode/persist/fs/msgpack/stream_with_digest.go index a97b0a69ce..6b8ff0ef6c 100644 --- a/src/dbnode/persist/fs/msgpack/stream_with_digest.go +++ b/src/dbnode/persist/fs/msgpack/stream_with_digest.go @@ -1,3 +1,23 @@ +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + package msgpack import ( diff --git a/src/dbnode/persist/fs/msgpack/stream_with_digest_test.go b/src/dbnode/persist/fs/msgpack/stream_with_digest_test.go index e661f398a7..5037820423 100644 --- a/src/dbnode/persist/fs/msgpack/stream_with_digest_test.go +++ b/src/dbnode/persist/fs/msgpack/stream_with_digest_test.go @@ -1,3 +1,23 @@ +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + package msgpack import ( diff --git a/src/dbnode/persist/schema/version_checker.go b/src/dbnode/persist/schema/version_checker.go index 55db6b57ea..7ea71cc55f 100644 --- a/src/dbnode/persist/schema/version_checker.go +++ b/src/dbnode/persist/schema/version_checker.go @@ -1,3 +1,23 @@ +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + package schema // VersionChecker centralizes logic for checking if a major, minor version combo supports @@ -7,6 +27,7 @@ type VersionChecker struct { minorVersion int } +// NewVersionChecker creates a new VersionChecker func NewVersionChecker(majorVersion int, minorVersion int) *VersionChecker { return &VersionChecker{ majorVersion: majorVersion, diff --git a/src/dbnode/persist/schema/version_checker_test.go b/src/dbnode/persist/schema/version_checker_test.go index 0d1a3feaea..217c38e1d5 100644 --- a/src/dbnode/persist/schema/version_checker_test.go +++ b/src/dbnode/persist/schema/version_checker_test.go @@ -1,3 +1,23 @@ +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + package schema import ( From b3d03d48cb5b82df0c1a8ce3eb2889701581c54c Mon Sep 17 00:00:00 2001 From: Nate Broyles Date: Tue, 21 Jul 2020 19:39:28 -0400 Subject: [PATCH 6/8] Address PR feedback --- src/dbnode/persist/fs/msgpack/decoder.go | 48 +++----- .../persist/fs/msgpack/stream_with_digest.go | 109 ++++++++++-------- .../fs/msgpack/stream_with_digest_test.go | 61 +++++++--- 3 files changed, 121 insertions(+), 97 deletions(-) diff --git a/src/dbnode/persist/fs/msgpack/decoder.go b/src/dbnode/persist/fs/msgpack/decoder.go index 03970a6b35..d61d683584 100644 --- a/src/dbnode/persist/fs/msgpack/decoder.go +++ b/src/dbnode/persist/fs/msgpack/decoder.go @@ -55,9 +55,9 @@ type Decoder struct { // Will only be set if the Decoder is Reset() with a DecoderStream // that also implements ByteStream. byteReader ByteStream - // Created in constructor but only set with a reader when - // wrapWithStreamWithDigest() is invoked - streamWithDigest DecoderStreamWithDigest + // Wraps original reader with reader that can calculate digest. Digest calculation must be enabled, + // otherwise it defaults to off + readerWithDigest *decoderStreamWithDigest dec *msgpack.Decoder err error allocDecodedBytes bool @@ -80,7 +80,7 @@ func newDecoder(legacy legacyEncodingOptions, opts DecodingOptions) *Decoder { reader: reader, dec: msgpack.NewDecoder(reader), legacy: legacy, - streamWithDigest: newDecoderStreamWithDigest(nil), + readerWithDigest: newDecoderStreamWithDigest(nil), } } @@ -88,20 +88,16 @@ func newDecoder(legacy legacyEncodingOptions, opts DecodingOptions) *Decoder { func (dec *Decoder) Reset(stream DecoderStream) { dec.reader = stream - unwrappedStream := stream - if streamWithDigest, ok := stream.(DecoderStreamWithDigest); ok { - unwrappedStream = streamWithDigest.wrappedStream() - } - // Do the type assertion upfront so that we don't have to do it // repeatedly later. - if byteStream, ok := unwrappedStream.(ByteStream); ok { + if byteStream, ok := stream.(ByteStream); ok { dec.byteReader = byteStream } else { dec.byteReader = nil } - dec.dec.Reset(dec.reader) + dec.readerWithDigest.reset(dec.reader) + dec.dec.Reset(dec.readerWithDigest) dec.err = nil } @@ -125,9 +121,10 @@ func (dec *Decoder) DecodeIndexEntry(bytesPool pool.BytesPool, validate bool) (s if dec.err != nil { return emptyIndexEntry, dec.err } - dec.wrapWithStreamWithDigest() + dec.readerWithDigest.setDigestReaderEnabled(true) _, numFieldsToSkip := dec.decodeRootObject(indexEntryVersion, indexEntryType) indexEntry := dec.decodeIndexEntry(bytesPool, validate) + dec.readerWithDigest.setDigestReaderEnabled(false) dec.skip(numFieldsToSkip) if dec.err != nil { return emptyIndexEntry, dec.err @@ -400,7 +397,6 @@ func (dec *Decoder) decodeIndexEntry(bytesPool pool.BytesPool, validate bool) sc // At this point, if its a V1 file, we've decoded all the available fields. if dec.legacy.decodeLegacyIndexEntryVersion == legacyEncodingIndexEntryVersionV1 || actual < 6 { - _ = dec.checksumAndUnwrapStreamWithDigest() dec.skip(numFieldsToSkip) return indexEntry } @@ -414,7 +410,6 @@ func (dec *Decoder) decodeIndexEntry(bytesPool pool.BytesPool, validate bool) sc // At this point, if its a V2 file, we've decoded all the available fields. if dec.legacy.decodeLegacyIndexEntryVersion == legacyEncodingIndexEntryVersionV2 || actual < 7 { - _ = dec.checksumAndUnwrapStreamWithDigest() dec.skip(numFieldsToSkip) return indexEntry } @@ -425,7 +420,9 @@ func (dec *Decoder) decodeIndexEntry(bytesPool pool.BytesPool, validate bool) sc // final field on index entries dec.skip(numFieldsToSkip) - actualChecksum := dec.checksumAndUnwrapStreamWithDigest() + // Retrieve actual checksum value here. Attempting to retrieve after decoding the upcoming expected checksum field + // would include value in actual checksum calculation which would cause a mismatch + actualChecksum := dec.readerWithDigest.digest().Sum32() // Decode checksum field originally added in V3 expectedChecksum := uint32(dec.decodeVarint()) @@ -683,7 +680,10 @@ func (dec *Decoder) decodeBytes() ([]byte, int, int) { return nil, -1, -1 } value = backingBytes[currPos:targetPos] - dec.streamWithDigest.Capture(value) + if err := dec.readerWithDigest.capture(value); err != nil { + dec.err = err + return nil, -1, -1 + } return value, currPos, bytesLen } @@ -702,7 +702,7 @@ func (dec *Decoder) decodeBytesWithPool(bytesPool pool.BytesPool) []byte { } bytes := bytesPool.Get(bytesLen)[:bytesLen] - n, err := io.ReadFull(dec.reader, bytes) + n, err := io.ReadFull(dec.readerWithDigest, bytes) if err != nil { dec.err = err bytesPool.Put(bytes) @@ -739,17 +739,3 @@ func (dec *Decoder) decodeBytesLen() int { dec.err = err return value } - -func (dec *Decoder) wrapWithStreamWithDigest() { - dec.streamWithDigest.Reset(dec.reader) - dec.Reset(dec.streamWithDigest) -} - -func (dec *Decoder) checksumAndUnwrapStreamWithDigest() uint32 { - checksum := dec.streamWithDigest.Digest().Sum32() - - dec.Reset(dec.streamWithDigest.wrappedStream()) - dec.streamWithDigest.Reset(nil) - - return checksum -} diff --git a/src/dbnode/persist/fs/msgpack/stream_with_digest.go b/src/dbnode/persist/fs/msgpack/stream_with_digest.go index 6b8ff0ef6c..e6d8f96f51 100644 --- a/src/dbnode/persist/fs/msgpack/stream_with_digest.go +++ b/src/dbnode/persist/fs/msgpack/stream_with_digest.go @@ -31,50 +31,40 @@ var ( errChecksumMismatch = errors.New("calculated checksum doesn't match stored checksum") ) -// DecoderStreamWithDigest calculates the digest as it processes a decoder stream. -type DecoderStreamWithDigest interface { - DecoderStream - - // Reset resets the reader for use with a new reader. - Reset(stream DecoderStream) - - // Digest returns the digest - Digest() hash.Hash32 - - // Validate compares the current digest against the expected digest and returns - // an error if they don't match. - Validate(expectedDigest uint32) error - - // Capture provides a mechanism for manually capturing bytes to add to digest when reader is manipulated - // through atypical means (e.g. reading directly from the backing byte slice of a ByteReader) - Capture(bytes []byte) error - - // Returns the decoder stream wrapped by this object - wrappedStream() DecoderStream -} +var _ DecoderStream = &decoderStreamWithDigest{} +// decoderStreamWithDigest calculates the digest as it processes a decoder stream. type decoderStreamWithDigest struct { - reader DecoderStream - digest hash.Hash32 - unreadByte bool + reader DecoderStream + readerDigest hash.Hash32 + unreadByte bool + enabled bool } -func newDecoderStreamWithDigest(reader DecoderStream) DecoderStreamWithDigest { +// newDecoderStreamWithDigest returns a new decoderStreamWithDigest +func newDecoderStreamWithDigest(reader DecoderStream) *decoderStreamWithDigest { return &decoderStreamWithDigest{ - reader: reader, - digest: adler32.New(), + reader: reader, + readerDigest: adler32.New(), } } func (d *decoderStreamWithDigest) Read(p []byte) (n int, err error) { n, err = d.reader.Read(p) - if n > 0 { - start := 0 - if d.unreadByte { - d.unreadByte = false - start++ - } - if _, err := d.digest.Write(p[start:n]); err != nil { + if err != nil { + return n, err + } + if n <= 0 { + return n, nil + } + + start := 0 + if d.unreadByte { + d.unreadByte = false + start++ + } + if d.enabled { + if _, err := d.readerDigest.Write(p[start:n]); err != nil { return 0, err } } @@ -83,13 +73,15 @@ func (d *decoderStreamWithDigest) Read(p []byte) (n int, err error) { func (d *decoderStreamWithDigest) ReadByte() (byte, error) { b, err := d.reader.ReadByte() - if err == nil { - if d.unreadByte { - d.unreadByte = false - } else { - if _, err := d.digest.Write([]byte{b}); err != nil { - return b, err - } + if err != nil { + return 0, err + } + + if d.unreadByte { + d.unreadByte = false + } else if d.enabled { + if _, err := d.readerDigest.Write([]byte{b}); err != nil { + return b, err } } return b, err @@ -103,32 +95,49 @@ func (d *decoderStreamWithDigest) UnreadByte() error { return err } -func (d *decoderStreamWithDigest) Reset(stream DecoderStream) { +// reset resets the reader for use with a new reader. +func (d *decoderStreamWithDigest) reset(stream DecoderStream) { d.reader = stream - d.digest.Reset() + d.readerDigest.Reset() } -func (d *decoderStreamWithDigest) Digest() hash.Hash32 { - return d.digest +// digest returns the digest +func (d *decoderStreamWithDigest) digest() hash.Hash32 { + return d.readerDigest } -func (d *decoderStreamWithDigest) Validate(expectedDigest uint32) error { - if d.digest.Sum32() != expectedDigest { +// validate compares the current digest against the expected digest and returns +// an error if they don't match. +func (d *decoderStreamWithDigest) validate(expectedDigest uint32) error { + if d.readerDigest.Sum32() != expectedDigest { return errChecksumMismatch } return nil } -func (d *decoderStreamWithDigest) Capture(bytes []byte) error { +// capture provides a mechanism for manually capturing bytes to add to digest when reader is manipulated +// through atypical means (e.g. reading directly from the backing byte slice of a ByteReader) +func (d *decoderStreamWithDigest) capture(bytes []byte) error { // No-op if not actually capturing at the moment - if d.reader != nil { - if _, err := d.digest.Write(bytes); err != nil { + if d.enabled { + if _, err := d.readerDigest.Write(bytes); err != nil { return err } } return nil } +// setDigestReaderEnabled enables calculating of digest for bytes read. If this is false, behaves like a regular reader. +func (d *decoderStreamWithDigest) setDigestReaderEnabled(enabled bool) { + if !d.enabled && enabled { + d.enabled = true + d.readerDigest.Reset() + } else if d.enabled && !enabled { + d.enabled = false + } +} + +// Returns the decoder stream wrapped by this object func (d *decoderStreamWithDigest) wrappedStream() DecoderStream { return d.reader } diff --git a/src/dbnode/persist/fs/msgpack/stream_with_digest_test.go b/src/dbnode/persist/fs/msgpack/stream_with_digest_test.go index 5037820423..87041a73fb 100644 --- a/src/dbnode/persist/fs/msgpack/stream_with_digest_test.go +++ b/src/dbnode/persist/fs/msgpack/stream_with_digest_test.go @@ -46,7 +46,7 @@ func TestDecoderStreamWithDigestRead(t *testing.T) { n, err := stream.Read(buf[start:end]) require.NoError(t, err) require.Equal(t, chunkLen, n) - require.Equal(t, adler32.Checksum(buf[:end]), stream.Digest().Sum32()) + require.Equal(t, adler32.Checksum(buf[:end]), stream.digest().Sum32()) } } @@ -58,14 +58,14 @@ func TestDecoderStreamWithDigestReadByte(t *testing.T) { n, err := stream.Read(buf[i-1 : i]) require.NoError(t, err) require.Equal(t, 1, n) - require.Equal(t, adler32.Checksum(buf[:i]), stream.Digest().Sum32()) + require.Equal(t, adler32.Checksum(buf[:i]), stream.digest().Sum32()) } } func TestDecoderStreamWithDigestUnreadByte(t *testing.T) { stream := decoderStreamWithDigest{ - reader: bufio.NewReader(bytes.NewReader([]byte(srcString))), - digest: adler32.New(), + reader: bufio.NewReader(bytes.NewReader([]byte(srcString))), + readerDigest: adler32.New(), } b, err := stream.ReadByte() @@ -89,7 +89,7 @@ func TestDecoderStreamWithDigestReset(t *testing.T) { require.NoError(t, err) require.Equal(t, srcString[1], b) - stream.Reset(bufio.NewReader(bytes.NewReader([]byte(srcString)))) + stream.reset(bufio.NewReader(bytes.NewReader([]byte(srcString)))) b, err = stream.ReadByte() require.NoError(t, err) @@ -104,19 +104,19 @@ func TestDecoderStreamWithDigestValidate(t *testing.T) { require.NoError(t, err) require.Equal(t, 5, n) - require.NoError(t, stream.Validate(adler32.Checksum(buf))) - require.Error(t, stream.Validate(adler32.Checksum([]byte("asdf")))) + require.NoError(t, stream.validate(adler32.Checksum(buf))) + require.Error(t, stream.validate(adler32.Checksum([]byte("asdf")))) } func TestDecoderStreamWithDigestCapture(t *testing.T) { stream := newTestDecoderStream() - require.NoError(t, stream.Validate(1)) + require.NoError(t, stream.validate(1)) bytes := []byte("manual capture") - require.NoError(t, stream.Capture(bytes)) + require.NoError(t, stream.capture(bytes)) - require.Equal(t, adler32.Checksum(bytes), stream.Digest().Sum32()) + require.Equal(t, adler32.Checksum(bytes), stream.digest().Sum32()) } func TestDecoderStreamWithDigestReadUnreadRead(t *testing.T) { @@ -129,7 +129,7 @@ func TestDecoderStreamWithDigestReadUnreadRead(t *testing.T) { require.NoError(t, err) buf[0] = b1 end++ - require.Equal(t, adler32.Checksum(buf[:end]), stream.Digest().Sum32()) + require.Equal(t, adler32.Checksum(buf[:end]), stream.digest().Sum32()) err = stream.UnreadByte() end-- @@ -139,13 +139,13 @@ func TestDecoderStreamWithDigestReadUnreadRead(t *testing.T) { require.NoError(t, err) end++ require.Equal(t, b1, b2) - require.Equal(t, adler32.Checksum(buf[:end]), stream.Digest().Sum32()) + require.Equal(t, adler32.Checksum(buf[:end]), stream.digest().Sum32()) n, err := stream.Read(buf[end : end+4]) require.NoError(t, err) require.Equal(t, 4, n) end += n - require.Equal(t, adler32.Checksum(buf[:end]), stream.Digest().Sum32()) + require.Equal(t, adler32.Checksum(buf[:end]), stream.digest().Sum32()) err = stream.UnreadByte() end-- @@ -155,9 +155,38 @@ func TestDecoderStreamWithDigestReadUnreadRead(t *testing.T) { require.NoError(t, err) require.Equal(t, 4, n) end += n - require.Equal(t, adler32.Checksum(buf[:end]), stream.Digest().Sum32()) + require.Equal(t, adler32.Checksum(buf[:end]), stream.digest().Sum32()) } -func newTestDecoderStream() DecoderStreamWithDigest { - return newDecoderStreamWithDigest(bufio.NewReader(bytes.NewReader([]byte(srcString)))) +func TestDecoderStreamWithDigestSetEnabled(t *testing.T) { + stream := newTestDecoderStream() + + // Disable digest calculation + stream.setDigestReaderEnabled(false) + + buf := make([]byte, 5) + _, err := stream.Read(buf) + require.NoError(t, err) + require.Equal(t, stream.digest().Sum32(), uint32(1)) + + _, err = stream.ReadByte() + require.NoError(t, err) + require.Equal(t, stream.digest().Sum32(), uint32(1)) + + // Enable digest calculation + stream.setDigestReaderEnabled(true) + + _, err = stream.Read(buf) + require.NoError(t, err) + require.Equal(t, stream.digest().Sum32(), adler32.Checksum([]byte(srcString[6:11]))) + + _, err = stream.ReadByte() + require.NoError(t, err) + require.Equal(t, stream.digest().Sum32(), adler32.Checksum([]byte(srcString[6:12]))) +} + +func newTestDecoderStream() *decoderStreamWithDigest { + d := newDecoderStreamWithDigest(bufio.NewReader(bytes.NewReader([]byte(srcString)))) + d.setDigestReaderEnabled(true) + return d } From 03a610f86cb77fe36685fd3f2665b13d33e81ce6 Mon Sep 17 00:00:00 2001 From: Nate Broyles Date: Wed, 22 Jul 2020 11:10:55 -0400 Subject: [PATCH 7/8] Always validate index entry checksum if available; other PR feedback --- src/dbnode/persist/fs/index_lookup_prop_test.go | 2 +- src/dbnode/persist/fs/msgpack/decoder.go | 10 +++++----- src/dbnode/persist/fs/msgpack/decoder_test.go | 16 +++++----------- src/dbnode/persist/fs/msgpack/roundtrip_test.go | 14 +++++++------- .../persist/fs/msgpack/stream_with_digest.go | 17 ++++++++++------- src/dbnode/persist/fs/read.go | 4 +--- src/dbnode/persist/fs/seek.go | 3 +-- 7 files changed, 30 insertions(+), 36 deletions(-) diff --git a/src/dbnode/persist/fs/index_lookup_prop_test.go b/src/dbnode/persist/fs/index_lookup_prop_test.go index e6d23c7249..8daabd66bd 100644 --- a/src/dbnode/persist/fs/index_lookup_prop_test.go +++ b/src/dbnode/persist/fs/index_lookup_prop_test.go @@ -271,7 +271,7 @@ func readIndexFileOffsets(shardDirPath string, numEntries int, start time.Time) summariesOffsets := map[string]int64{} for read := 0; read < numEntries; read++ { offset := int64(len(buf)) - (decoderStream.Remaining()) - entry, err := decoder.DecodeIndexEntry(nil, true) + entry, err := decoder.DecodeIndexEntry(nil) if err != nil { return nil, fmt.Errorf("err decoding index entry: %v", err) } diff --git a/src/dbnode/persist/fs/msgpack/decoder.go b/src/dbnode/persist/fs/msgpack/decoder.go index d61d683584..3cad323a20 100644 --- a/src/dbnode/persist/fs/msgpack/decoder.go +++ b/src/dbnode/persist/fs/msgpack/decoder.go @@ -56,7 +56,7 @@ type Decoder struct { // that also implements ByteStream. byteReader ByteStream // Wraps original reader with reader that can calculate digest. Digest calculation must be enabled, - // otherwise it defaults to off + // otherwise it defaults to off. readerWithDigest *decoderStreamWithDigest dec *msgpack.Decoder err error @@ -117,13 +117,13 @@ func (dec *Decoder) DecodeIndexInfo() (schema.IndexInfo, error) { } // DecodeIndexEntry decodes index entry -func (dec *Decoder) DecodeIndexEntry(bytesPool pool.BytesPool, validate bool) (schema.IndexEntry, error) { +func (dec *Decoder) DecodeIndexEntry(bytesPool pool.BytesPool) (schema.IndexEntry, error) { if dec.err != nil { return emptyIndexEntry, dec.err } dec.readerWithDigest.setDigestReaderEnabled(true) _, numFieldsToSkip := dec.decodeRootObject(indexEntryVersion, indexEntryType) - indexEntry := dec.decodeIndexEntry(bytesPool, validate) + indexEntry := dec.decodeIndexEntry(bytesPool) dec.readerWithDigest.setDigestReaderEnabled(false) dec.skip(numFieldsToSkip) if dec.err != nil { @@ -356,7 +356,7 @@ func (dec *Decoder) decodeIndexBloomFilterInfo() schema.IndexBloomFilterInfo { return indexBloomFilterInfo } -func (dec *Decoder) decodeIndexEntry(bytesPool pool.BytesPool, validate bool) schema.IndexEntry { +func (dec *Decoder) decodeIndexEntry(bytesPool pool.BytesPool) schema.IndexEntry { var opts checkNumFieldsOptions switch dec.legacy.decodeLegacyIndexEntryVersion { case legacyEncodingIndexEntryVersionV1: @@ -427,7 +427,7 @@ func (dec *Decoder) decodeIndexEntry(bytesPool pool.BytesPool, validate bool) sc // Decode checksum field originally added in V3 expectedChecksum := uint32(dec.decodeVarint()) - if validate && expectedChecksum != actualChecksum { + if expectedChecksum != actualChecksum { dec.err = errorIndexEntryChecksumMismatch } diff --git a/src/dbnode/persist/fs/msgpack/decoder_test.go b/src/dbnode/persist/fs/msgpack/decoder_test.go index 31cf2cdd63..d36f0b120b 100644 --- a/src/dbnode/persist/fs/msgpack/decoder_test.go +++ b/src/dbnode/persist/fs/msgpack/decoder_test.go @@ -62,7 +62,7 @@ func TestDecodeNewerVersionThanExpected(t *testing.T) { // Verify decoding index entry results in an error require.NoError(t, enc.EncodeIndexEntry(testIndexEntry)) dec.Reset(NewByteDecoderStream(enc.Bytes())) - _, err = dec.DecodeIndexEntry(nil, true) + _, err = dec.DecodeIndexEntry(nil) require.Error(t, err) // Verify decoding log info results in an error @@ -147,7 +147,7 @@ func TestDecodeIndexEntryMoreFieldsThanExpected(t *testing.T) { // Verify we can successfully skip unnecessary fields dec.Reset(NewByteDecoderStream(enc.Bytes())) - res, err := dec.DecodeIndexEntry(nil, true) + res, err := dec.DecodeIndexEntry(nil) require.NoError(t, err) require.Equal(t, testIndexEntry, res) @@ -232,7 +232,7 @@ func TestDecodeBytesNoAlloc(t *testing.T) { require.NoError(t, enc.EncodeIndexEntry(testIndexEntry)) data := enc.Bytes() dec.Reset(NewByteDecoderStream(data)) - res, err := dec.DecodeIndexEntry(nil, true) + res, err := dec.DecodeIndexEntry(nil) require.NoError(t, err) require.Equal(t, []byte("testIndexEntry"), res.ID) @@ -252,7 +252,7 @@ func TestDecodeBytesAllocNew(t *testing.T) { require.NoError(t, enc.EncodeIndexEntry(testIndexEntry)) data := enc.Bytes() dec.Reset(NewByteDecoderStream(data)) - res, err := dec.DecodeIndexEntry(nil, true) + res, err := dec.DecodeIndexEntry(nil) require.NoError(t, err) require.Equal(t, []byte("testIndexEntry"), res.ID) @@ -274,13 +274,7 @@ func TestDecodeIndexEntryInvalidChecksum(t *testing.T) { enc.buf.Truncate(len(enc.Bytes()) - 5) // 5 bytes = 1 byte for integer code + 4 bytes for checksum require.NoError(t, enc.enc.EncodeInt64(1234)) - // validate set to true dec.Reset(NewByteDecoderStream(enc.Bytes())) - _, err := dec.DecodeIndexEntry(nil, true) + _, err := dec.DecodeIndexEntry(nil) require.Error(t, err) - - // validate set to false - dec.Reset(NewByteDecoderStream(enc.Bytes())) - _, err = dec.DecodeIndexEntry(nil, false) - require.NoError(t, err) } diff --git a/src/dbnode/persist/fs/msgpack/roundtrip_test.go b/src/dbnode/persist/fs/msgpack/roundtrip_test.go index d5d5fbe516..0561a95e34 100644 --- a/src/dbnode/persist/fs/msgpack/roundtrip_test.go +++ b/src/dbnode/persist/fs/msgpack/roundtrip_test.go @@ -378,7 +378,7 @@ func TestIndexEntryRoundtrip(t *testing.T) { ) require.NoError(t, enc.EncodeIndexEntry(testIndexEntry)) dec.Reset(NewByteDecoderStream(enc.Bytes())) - res, err := dec.DecodeIndexEntry(nil, true) + res, err := dec.DecodeIndexEntry(nil) require.NoError(t, err) require.Equal(t, testIndexEntry, res) } @@ -393,7 +393,7 @@ func TestIndexEntryRoundtripWithBytesPool(t *testing.T) { require.NoError(t, enc.EncodeIndexEntry(testIndexEntry)) dec.Reset(NewByteDecoderStream(enc.Bytes())) - res, err := dec.DecodeIndexEntry(pool, true) + res, err := dec.DecodeIndexEntry(pool) require.NoError(t, err) require.Equal(t, testIndexEntry, res) } @@ -421,7 +421,7 @@ func TestIndexEntryRoundTripBackwardsCompatibilityV1(t *testing.T) { enc.EncodeIndexEntry(testIndexEntry) dec.Reset(NewByteDecoderStream(enc.Bytes())) - res, err := dec.DecodeIndexEntry(nil, true) + res, err := dec.DecodeIndexEntry(nil) require.NoError(t, err) require.Equal(t, testIndexEntry, res) } @@ -449,7 +449,7 @@ func TestIndexEntryRoundTripForwardsCompatibilityV1(t *testing.T) { }() dec.Reset(NewByteDecoderStream(enc.Bytes())) - res, err := dec.DecodeIndexEntry(nil, true) + res, err := dec.DecodeIndexEntry(nil) require.NoError(t, err) require.Equal(t, testIndexEntry, res) } @@ -469,7 +469,7 @@ func TestIndexEntryRoundTripBackwardsCompatibilityV2(t *testing.T) { enc.EncodeIndexEntry(testIndexEntry) dec.Reset(NewByteDecoderStream(enc.Bytes())) - res, err := dec.DecodeIndexEntry(nil, true) + res, err := dec.DecodeIndexEntry(nil) require.NoError(t, err) require.Equal(t, testIndexEntry, res) } @@ -488,7 +488,7 @@ func TestIndexEntryRoundTripForwardsCompatibilityV2(t *testing.T) { enc.EncodeIndexEntry(testIndexEntry) dec.Reset(NewByteDecoderStream(enc.Bytes())) - res, err := dec.DecodeIndexEntry(nil, true) + res, err := dec.DecodeIndexEntry(nil) require.NoError(t, err) require.Equal(t, testIndexEntry, res) } @@ -594,7 +594,7 @@ func TestMultiTypeRoundtripStress(t *testing.T) { case 0: res, err = dec.DecodeIndexInfo() case 1: - res, err = dec.DecodeIndexEntry(nil, true) + res, err = dec.DecodeIndexEntry(nil) case 2: res, err = dec.DecodeLogInfo() case 3: diff --git a/src/dbnode/persist/fs/msgpack/stream_with_digest.go b/src/dbnode/persist/fs/msgpack/stream_with_digest.go index e6d8f96f51..ff03e2c45d 100644 --- a/src/dbnode/persist/fs/msgpack/stream_with_digest.go +++ b/src/dbnode/persist/fs/msgpack/stream_with_digest.go @@ -35,17 +35,19 @@ var _ DecoderStream = &decoderStreamWithDigest{} // decoderStreamWithDigest calculates the digest as it processes a decoder stream. type decoderStreamWithDigest struct { - reader DecoderStream - readerDigest hash.Hash32 - unreadByte bool - enabled bool + reader DecoderStream + readerDigest hash.Hash32 + unreadByte bool + enabled bool + singleByteBuf []byte } // newDecoderStreamWithDigest returns a new decoderStreamWithDigest func newDecoderStreamWithDigest(reader DecoderStream) *decoderStreamWithDigest { return &decoderStreamWithDigest{ - reader: reader, - readerDigest: adler32.New(), + reader: reader, + readerDigest: adler32.New(), + singleByteBuf: make([]byte, 1), } } @@ -80,7 +82,8 @@ func (d *decoderStreamWithDigest) ReadByte() (byte, error) { if d.unreadByte { d.unreadByte = false } else if d.enabled { - if _, err := d.readerDigest.Write([]byte{b}); err != nil { + d.singleByteBuf[0] = b + if _, err := d.readerDigest.Write(d.singleByteBuf); err != nil { return b, err } } diff --git a/src/dbnode/persist/fs/read.go b/src/dbnode/persist/fs/read.go index b034cb7e7a..9805f06170 100644 --- a/src/dbnode/persist/fs/read.go +++ b/src/dbnode/persist/fs/read.go @@ -99,7 +99,6 @@ type reader struct { shard uint32 volume int open bool - versionChecker *schema.VersionChecker } // NewReader returns a new reader and expects all files to exist. Will read the @@ -326,14 +325,13 @@ func (r *reader) readInfo(size int) error { r.entriesRead = 0 r.metadataRead = 0 r.bloomFilterInfo = info.BloomFilter - r.versionChecker = schema.NewVersionChecker(int(info.MajorVersion), int(info.MinorVersion)) return nil } func (r *reader) readIndexAndSortByOffsetAsc() error { r.decoder.Reset(r.indexDecoderStream) for i := 0; i < r.entries; i++ { - entry, err := r.decoder.DecodeIndexEntry(nil, r.versionChecker.IndexEntryValidationEnabled()) + entry, err := r.decoder.DecodeIndexEntry(nil) if err != nil { return err } diff --git a/src/dbnode/persist/fs/seek.go b/src/dbnode/persist/fs/seek.go index 8cd337a0ae..8bd04e8bfd 100644 --- a/src/dbnode/persist/fs/seek.go +++ b/src/dbnode/persist/fs/seek.go @@ -405,8 +405,7 @@ func (s *seeker) SeekIndexEntry( // this is a tight loop (scanning linearly through the index file) we want to use a // very cheap pool until we find what we're looking for, and then we can perform a single // copy into checked.Bytes from the more expensive pool. - entry, err := resources.xmsgpackDecoder.DecodeIndexEntry( - resources.decodeIndexEntryBytesPool, s.versionChecker.IndexEntryValidationEnabled()) + entry, err := resources.xmsgpackDecoder.DecodeIndexEntry(resources.decodeIndexEntryBytesPool) if err == io.EOF { // We reached the end of the file without finding it. return IndexEntry{}, errSeekIDNotFound From 41493100dc0286cc49cda824af0340169e3564cb Mon Sep 17 00:00:00 2001 From: Nate Broyles Date: Wed, 22 Jul 2020 11:30:11 -0400 Subject: [PATCH 8/8] Make VersionChecker not a pointer --- src/dbnode/persist/fs/seek.go | 2 +- src/dbnode/persist/schema/version_checker.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/dbnode/persist/fs/seek.go b/src/dbnode/persist/fs/seek.go index 8bd04e8bfd..24e2e5f62c 100644 --- a/src/dbnode/persist/fs/seek.go +++ b/src/dbnode/persist/fs/seek.go @@ -67,7 +67,7 @@ type seeker struct { // instead of time.Time to avoid keeping an extra pointer around. start xtime.UnixNano blockSize time.Duration - versionChecker *schema.VersionChecker + versionChecker schema.VersionChecker dataFd *os.File indexFd *os.File diff --git a/src/dbnode/persist/schema/version_checker.go b/src/dbnode/persist/schema/version_checker.go index 7ea71cc55f..ac14edec07 100644 --- a/src/dbnode/persist/schema/version_checker.go +++ b/src/dbnode/persist/schema/version_checker.go @@ -28,8 +28,8 @@ type VersionChecker struct { } // NewVersionChecker creates a new VersionChecker -func NewVersionChecker(majorVersion int, minorVersion int) *VersionChecker { - return &VersionChecker{ +func NewVersionChecker(majorVersion int, minorVersion int) VersionChecker { + return VersionChecker{ majorVersion: majorVersion, minorVersion: minorVersion, }