Skip to content

Commit

Permalink
Revert "[dbnode] Validate individual index entries on decode instead …
Browse files Browse the repository at this point in the history
…of entire file on open (#2468)"

This reverts commit bba274f.
  • Loading branch information
robskillington committed Jul 24, 2020
1 parent 82a9d32 commit c945492
Show file tree
Hide file tree
Showing 9 changed files with 11 additions and 550 deletions.
32 changes: 5 additions & 27 deletions src/dbnode/persist/fs/msgpack/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,14 @@ var (

errorUnableToDetermineNumFieldsToSkip = errors.New("unable to determine num fields to skip")
errorCalledDecodeBytesWithoutByteStreamDecoder = errors.New("called decodeBytes with out byte stream decoder")
errorIndexEntryChecksumMismatch = errors.New("decode index entry encountered checksum mismatch")
)

// Decoder decodes persisted msgpack-encoded data
type Decoder struct {
reader DecoderStream
// Will only be set if the Decoder is Reset() with a DecoderStream
// that also implements ByteStream.
byteReader ByteStream
// Wraps original reader with reader that can calculate digest. Digest calculation must be enabled,
// otherwise it defaults to off.
readerWithDigest *decoderStreamWithDigest
byteReader ByteStream
dec *msgpack.Decoder
err error
allocDecodedBytes bool
Expand All @@ -80,7 +76,6 @@ func newDecoder(legacy legacyEncodingOptions, opts DecodingOptions) *Decoder {
reader: reader,
dec: msgpack.NewDecoder(reader),
legacy: legacy,
readerWithDigest: newDecoderStreamWithDigest(nil),
}
}

Expand All @@ -96,8 +91,7 @@ func (dec *Decoder) Reset(stream DecoderStream) {
dec.byteReader = nil
}

dec.readerWithDigest.reset(dec.reader)
dec.dec.Reset(dec.readerWithDigest)
dec.dec.Reset(dec.reader)
dec.err = nil
}

Expand All @@ -121,10 +115,8 @@ func (dec *Decoder) DecodeIndexEntry(bytesPool pool.BytesPool) (schema.IndexEntr
if dec.err != nil {
return emptyIndexEntry, dec.err
}
dec.readerWithDigest.setDigestReaderEnabled(true)
_, numFieldsToSkip := dec.decodeRootObject(indexEntryVersion, indexEntryType)
indexEntry := dec.decodeIndexEntry(bytesPool)
dec.readerWithDigest.setDigestReaderEnabled(false)
dec.skip(numFieldsToSkip)
if dec.err != nil {
return emptyIndexEntry, dec.err
Expand Down Expand Up @@ -414,22 +406,13 @@ func (dec *Decoder) decodeIndexEntry(bytesPool pool.BytesPool) schema.IndexEntry
return indexEntry
}

// NB(nate): Any new fields should be parsed here.

// Intentionally skip any extra fields here as we've stipulated that from V3 onward, IndexEntryChecksum will be the
// final field on index entries
dec.skip(numFieldsToSkip)

// Retrieve actual checksum value here. Attempting to retrieve after decoding the upcoming expected checksum field
// would include value in actual checksum calculation which would cause a mismatch
actualChecksum := dec.readerWithDigest.digest().Sum32()

// Decode checksum field originally added in V3
expectedChecksum := uint32(dec.decodeVarint())

if expectedChecksum != actualChecksum {
dec.err = errorIndexEntryChecksumMismatch
}
// TODO(nate): actually use the checksum value for index entry validation - #2629
_ = dec.decodeVarint()

return indexEntry
}
Expand Down Expand Up @@ -680,11 +663,6 @@ func (dec *Decoder) decodeBytes() ([]byte, int, int) {
return nil, -1, -1
}
value = backingBytes[currPos:targetPos]
if err := dec.readerWithDigest.capture(value); err != nil {
dec.err = err
return nil, -1, -1
}

return value, currPos, bytesLen
}

Expand All @@ -702,7 +680,7 @@ func (dec *Decoder) decodeBytesWithPool(bytesPool pool.BytesPool) []byte {
}

bytes := bytesPool.Get(bytesLen)[:bytesLen]
n, err := io.ReadFull(dec.readerWithDigest, bytes)
n, err := io.ReadFull(dec.reader, bytes)
if err != nil {
dec.err = err
bytesPool.Put(bytes)
Expand Down
16 changes: 0 additions & 16 deletions src/dbnode/persist/fs/msgpack/decoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,19 +262,3 @@ func TestDecodeBytesAllocNew(t *testing.T) {
}
require.Equal(t, []byte("testIndexEntry"), res.ID)
}

func TestDecodeIndexEntryInvalidChecksum(t *testing.T) {
var (
enc = NewEncoder()
dec = NewDecoder(nil)
)
require.NoError(t, enc.EncodeIndexEntry(testIndexEntry))

// Update to invalid checksum
enc.buf.Truncate(len(enc.Bytes()) - 5) // 5 bytes = 1 byte for integer code + 4 bytes for checksum
require.NoError(t, enc.enc.EncodeInt64(1234))

dec.Reset(NewByteDecoderStream(enc.Bytes()))
_, err := dec.DecodeIndexEntry(nil)
require.Error(t, err)
}
146 changes: 0 additions & 146 deletions src/dbnode/persist/fs/msgpack/stream_with_digest.go

This file was deleted.

Loading

0 comments on commit c945492

Please sign in to comment.