Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dbnode] Validate individual index entries on decode instead of entire file on open #2468

Merged
merged 8 commits into from
Jul 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 27 additions & 5 deletions src/dbnode/persist/fs/msgpack/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,18 @@ var (

errorUnableToDetermineNumFieldsToSkip = errors.New("unable to determine num fields to skip")
errorCalledDecodeBytesWithoutByteStreamDecoder = errors.New("called decodeBytes with out byte stream decoder")
errorIndexEntryChecksumMismatch = errors.New("decode index entry encountered checksum mismatch")
)

// Decoder decodes persisted msgpack-encoded data
type Decoder struct {
reader DecoderStream
// Will only be set if the Decoder is Reset() with a DecoderStream
// that also implements ByteStream.
byteReader ByteStream
byteReader ByteStream
// Wraps original reader with reader that can calculate digest. Digest calculation must be enabled,
// otherwise it defaults to off.
readerWithDigest *decoderStreamWithDigest
dec *msgpack.Decoder
err error
allocDecodedBytes bool
Expand All @@ -76,6 +80,7 @@ func newDecoder(legacy legacyEncodingOptions, opts DecodingOptions) *Decoder {
reader: reader,
dec: msgpack.NewDecoder(reader),
legacy: legacy,
readerWithDigest: newDecoderStreamWithDigest(nil),
}
}

Expand All @@ -91,7 +96,8 @@ func (dec *Decoder) Reset(stream DecoderStream) {
dec.byteReader = nil
}

dec.dec.Reset(dec.reader)
dec.readerWithDigest.reset(dec.reader)
dec.dec.Reset(dec.readerWithDigest)
dec.err = nil
}

Expand All @@ -115,8 +121,10 @@ func (dec *Decoder) DecodeIndexEntry(bytesPool pool.BytesPool) (schema.IndexEntr
if dec.err != nil {
return emptyIndexEntry, dec.err
}
dec.readerWithDigest.setDigestReaderEnabled(true)
_, numFieldsToSkip := dec.decodeRootObject(indexEntryVersion, indexEntryType)
indexEntry := dec.decodeIndexEntry(bytesPool)
dec.readerWithDigest.setDigestReaderEnabled(false)
dec.skip(numFieldsToSkip)
if dec.err != nil {
return emptyIndexEntry, dec.err
Expand Down Expand Up @@ -406,13 +414,22 @@ func (dec *Decoder) decodeIndexEntry(bytesPool pool.BytesPool) schema.IndexEntry
return indexEntry
}

// NB(nate): Any new fields should be parsed here.

// Intentionally skip any extra fields here as we've stipulated that from V3 onward, IndexEntryChecksum will be the
// final field on index entries
dec.skip(numFieldsToSkip)

// Retrieve actual checksum value here. Attempting to retrieve after decoding the upcoming expected checksum field
// would include value in actual checksum calculation which would cause a mismatch
actualChecksum := dec.readerWithDigest.digest().Sum32()

// Decode checksum field originally added in V3
// TODO(nate): actually use the checksum value for index entry validation - #2629
_ = dec.decodeVarint()
expectedChecksum := uint32(dec.decodeVarint())

if expectedChecksum != actualChecksum {
dec.err = errorIndexEntryChecksumMismatch
}

return indexEntry
}
Expand Down Expand Up @@ -663,6 +680,11 @@ func (dec *Decoder) decodeBytes() ([]byte, int, int) {
return nil, -1, -1
}
value = backingBytes[currPos:targetPos]
if err := dec.readerWithDigest.capture(value); err != nil {
dec.err = err
return nil, -1, -1
}

return value, currPos, bytesLen
}

Expand All @@ -680,7 +702,7 @@ func (dec *Decoder) decodeBytesWithPool(bytesPool pool.BytesPool) []byte {
}

bytes := bytesPool.Get(bytesLen)[:bytesLen]
n, err := io.ReadFull(dec.reader, bytes)
n, err := io.ReadFull(dec.readerWithDigest, bytes)
if err != nil {
dec.err = err
bytesPool.Put(bytes)
Expand Down
16 changes: 16 additions & 0 deletions src/dbnode/persist/fs/msgpack/decoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,3 +262,19 @@ func TestDecodeBytesAllocNew(t *testing.T) {
}
require.Equal(t, []byte("testIndexEntry"), res.ID)
}

// TestDecodeIndexEntryInvalidChecksum verifies that DecodeIndexEntry returns an
// error when the stored checksum does not match the digest computed over the
// decoded entry bytes.
func TestDecodeIndexEntryInvalidChecksum(t *testing.T) {
	var (
		enc = NewEncoder()
		dec = NewDecoder(nil)
	)
	require.NoError(t, enc.EncodeIndexEntry(testIndexEntry))

	// Update to invalid checksum
	enc.buf.Truncate(len(enc.Bytes()) - 5) // 5 bytes = 1 byte for integer code + 4 bytes for checksum
	// Re-encode a bogus checksum value in place of the truncated one.
	require.NoError(t, enc.enc.EncodeInt64(1234))

	dec.Reset(NewByteDecoderStream(enc.Bytes()))
	_, err := dec.DecodeIndexEntry(nil)
	require.Error(t, err)
}
146 changes: 146 additions & 0 deletions src/dbnode/persist/fs/msgpack/stream_with_digest.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package msgpack
robskillington marked this conversation as resolved.
Show resolved Hide resolved

import (
"errors"
"hash"
"hash/adler32"
)

var (
	// errChecksumMismatch returned when the calculated checksum doesn't match the stored checksum
	errChecksumMismatch = errors.New("calculated checksum doesn't match stored checksum")
)

// Compile-time check that decoderStreamWithDigest satisfies DecoderStream.
var _ DecoderStream = &decoderStreamWithDigest{}

// decoderStreamWithDigest calculates the digest as it processes a decoder stream.
type decoderStreamWithDigest struct {
	reader        DecoderStream // underlying stream being wrapped
	readerDigest  hash.Hash32   // running digest of bytes read while enabled
	unreadByte    bool          // set when the last byte read was unread; it was already digested
	enabled       bool          // when false, reads pass through without updating the digest
	singleByteBuf []byte        // scratch buffer so ReadByte can digest without allocating
}

// newDecoderStreamWithDigest returns a new decoderStreamWithDigest wrapping the
// provided reader. Digest calculation starts disabled; callers enable it via
// setDigestReaderEnabled.
func newDecoderStreamWithDigest(reader DecoderStream) *decoderStreamWithDigest {
	d := &decoderStreamWithDigest{
		reader:       reader,
		readerDigest: adler32.New(),
	}
	d.singleByteBuf = make([]byte, 1)
	return d
}

// Read reads from the wrapped stream into p and, when digest calculation is
// enabled, folds the newly read bytes into the running digest. A byte that was
// previously unread is skipped since it was already digested on first read.
func (d *decoderStreamWithDigest) Read(p []byte) (int, error) {
	n, err := d.reader.Read(p)
	switch {
	case err != nil:
		return n, err
	case n <= 0:
		return n, nil
	}

	offset := 0
	if d.unreadByte {
		// First byte is a re-read of an unread byte: already in the digest.
		d.unreadByte = false
		offset = 1
	}
	if d.enabled {
		if _, werr := d.readerDigest.Write(p[offset:n]); werr != nil {
			return 0, werr
		}
	}
	return n, nil
}

// ReadByte reads a single byte from the wrapped stream and, when digest
// calculation is enabled, folds it into the running digest. A byte that was
// previously unread is not digested a second time.
func (d *decoderStreamWithDigest) ReadByte() (byte, error) {
	b, err := d.reader.ReadByte()
	if err != nil {
		return 0, err
	}

	switch {
	case d.unreadByte:
		// Re-reading a byte that is already part of the digest.
		d.unreadByte = false
	case d.enabled:
		d.singleByteBuf[0] = b
		if _, werr := d.readerDigest.Write(d.singleByteBuf); werr != nil {
			return b, werr
		}
	}
	return b, nil
}

func (d *decoderStreamWithDigest) UnreadByte() error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious, what are the UnreadByte and Capture APIs used for?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think UnreadByte is used by Peek-like operations, but I'm not entirely sure. None of the index entry decoding code actually uses it, but I had to implement it so that decoderStreamWithDigest adhered to the DecoderStream interface.

Capture is used so that you can arbitrarily add bytes to the digest. There's a special case in index entry decoding where we pull the desired bytes directly from the backing byte array and then use Seek to advance the reader. When this is done, Capture should be used to update the digest. Here's where that's being done: https://github.com/m3db/m3/pull/2468/files#diff-a4220f1ade552cd435cedf33723b0f81R684

err := d.reader.UnreadByte()
if err == nil {
d.unreadByte = true
}
return err
}

// reset resets the stream for use with a new reader, clearing the running
// digest and any state carried over from the previous stream.
func (d *decoderStreamWithDigest) reset(stream DecoderStream) {
	d.reader = stream
	d.readerDigest.Reset()
	// Clear the pending-unread flag: a stale value left over from a prior
	// stream would cause the first byte read from the new stream to be
	// silently skipped from the digest, corrupting checksum validation.
	d.unreadByte = false
}

// digest returns the running digest accumulated over bytes read while digest
// calculation was enabled (plus any bytes manually added via capture).
func (d *decoderStreamWithDigest) digest() hash.Hash32 {
	return d.readerDigest
}

// validate compares the current digest against the expected digest and
// returns errChecksumMismatch if they differ.
func (d *decoderStreamWithDigest) validate(expectedDigest uint32) error {
	if actual := d.readerDigest.Sum32(); actual != expectedDigest {
		return errChecksumMismatch
	}
	return nil
}

// capture provides a mechanism for manually adding bytes to the digest when
// the reader is manipulated through atypical means (e.g. reading directly from
// the backing byte slice of a ByteReader). No-op while digest calculation is
// disabled.
func (d *decoderStreamWithDigest) capture(bytes []byte) error {
	if !d.enabled {
		// Not capturing at the moment.
		return nil
	}
	_, err := d.readerDigest.Write(bytes)
	return err
}

// setDigestReaderEnabled toggles digest calculation for bytes read. Enabling
// resets the digest so it covers only bytes read from this point forward;
// while disabled, the stream behaves like a plain reader.
func (d *decoderStreamWithDigest) setDigestReaderEnabled(enabled bool) {
	if d.enabled == enabled {
		return
	}
	d.enabled = enabled
	if enabled {
		d.readerDigest.Reset()
	}
}

// wrappedStream returns the decoder stream wrapped by this object.
func (d *decoderStreamWithDigest) wrappedStream() DecoderStream {
	return d.reader
}
Loading