-
Notifications
You must be signed in to change notification settings - Fork 455
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[dbnode] Add IndexEntryChecksum field to entries in index file #2455
Changes from all commits
2135346
c8a9012
e2254ee
b0d48eb
502c1ac
22a4107
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -350,12 +350,25 @@ func (dec *Decoder) decodeIndexBloomFilterInfo() schema.IndexBloomFilterInfo { | |
|
||
func (dec *Decoder) decodeIndexEntry(bytesPool pool.BytesPool) schema.IndexEntry { | ||
var opts checkNumFieldsOptions | ||
if dec.legacy.decodeLegacyV1IndexEntry { | ||
switch dec.legacy.decodeLegacyIndexEntryVersion { | ||
case legacyEncodingIndexEntryVersionV1: | ||
// V1 had 5 fields. | ||
opts.override = true | ||
opts.numExpectedMinFields = 5 | ||
opts.numExpectedCurrFields = 5 | ||
case legacyEncodingIndexEntryVersionV2: | ||
// V2 had 6 fields. | ||
opts.override = true | ||
opts.numExpectedMinFields = 5 | ||
opts.numExpectedCurrFields = 6 | ||
case legacyEncodingIndexEntryVersionCurrent: | ||
// V3 is current version, no overrides needed | ||
break | ||
default: | ||
dec.err = fmt.Errorf("invalid legacyEncodingIndexEntryVersion provided: %v", dec.legacy.decodeLegacyIndexEntryVersion) | ||
return emptyIndexEntry | ||
} | ||
|
||
numFieldsToSkip, actual, ok := dec.checkNumFieldsFor(indexEntryType, opts) | ||
if !ok { | ||
return emptyIndexEntry | ||
|
@@ -372,20 +385,35 @@ func (dec *Decoder) decodeIndexEntry(bytesPool pool.BytesPool) schema.IndexEntry | |
|
||
indexEntry.Size = dec.decodeVarint() | ||
indexEntry.Offset = dec.decodeVarint() | ||
indexEntry.Checksum = dec.decodeVarint() | ||
indexEntry.DataChecksum = dec.decodeVarint() | ||
|
||
if dec.legacy.decodeLegacyV1IndexEntry || actual < 6 { | ||
// At this point, if its a V1 file, we've decoded all the available fields. | ||
if dec.legacy.decodeLegacyIndexEntryVersion == legacyEncodingIndexEntryVersionV1 || actual < 6 { | ||
dec.skip(numFieldsToSkip) | ||
return indexEntry | ||
} | ||
|
||
// Decode fields added in V2 | ||
if bytesPool == nil { | ||
indexEntry.EncodedTags, _, _ = dec.decodeBytes() | ||
} else { | ||
indexEntry.EncodedTags = dec.decodeBytesWithPool(bytesPool) | ||
} | ||
|
||
// At this point, if its a V2 file, we've decoded all the available fields. | ||
if dec.legacy.decodeLegacyIndexEntryVersion == legacyEncodingIndexEntryVersionV2 || actual < 7 { | ||
dec.skip(numFieldsToSkip) | ||
return indexEntry | ||
} | ||
|
||
// Intentionally skip any extra fields here as we've stipulated that from V3 onward, IndexEntryChecksum will be the | ||
// final field on index entries | ||
dec.skip(numFieldsToSkip) | ||
|
||
// Decode checksum field originally added in V3 | ||
// TODO(nate): actually use the checksum value for index entry validation - #2629 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Resolved in #2468 |
||
_ = dec.decodeVarint() | ||
|
||
return indexEntry | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,6 +23,7 @@ package msgpack | |
import ( | ||
"bytes" | ||
|
||
"github.com/m3db/m3/src/dbnode/digest" | ||
"github.com/m3db/m3/src/dbnode/persist/schema" | ||
|
||
"gopkg.in/vmihailenco/msgpack.v2" | ||
|
@@ -64,20 +65,29 @@ const ( | |
legacyEncodingIndexVersionV5 | ||
) | ||
|
||
type legacyEncodingIndexEntryVersion int | ||
|
||
const ( | ||
legacyEncodingIndexEntryVersionCurrent = legacyEncodingIndexEntryVersionV3 | ||
legacyEncodingIndexEntryVersionV1 legacyEncodingIndexEntryVersion = iota | ||
legacyEncodingIndexEntryVersionV2 | ||
legacyEncodingIndexEntryVersionV3 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yay no bools for this anymore 👍 |
||
) | ||
|
||
type legacyEncodingOptions struct { | ||
encodeLegacyIndexInfoVersion legacyEncodingIndexInfoVersion | ||
decodeLegacyIndexInfoVersion legacyEncodingIndexInfoVersion | ||
|
||
encodeLegacyV1IndexEntry bool | ||
decodeLegacyV1IndexEntry bool | ||
encodeLegacyIndexEntryVersion legacyEncodingIndexEntryVersion | ||
decodeLegacyIndexEntryVersion legacyEncodingIndexEntryVersion | ||
} | ||
|
||
var defaultlegacyEncodingOptions = legacyEncodingOptions{ | ||
encodeLegacyIndexInfoVersion: legacyEncodingIndexVersionCurrent, | ||
decodeLegacyIndexInfoVersion: legacyEncodingIndexVersionCurrent, | ||
|
||
encodeLegacyV1IndexEntry: false, | ||
decodeLegacyV1IndexEntry: false, | ||
encodeLegacyIndexEntryVersion: legacyEncodingIndexEntryVersionCurrent, | ||
decodeLegacyIndexEntryVersion: legacyEncodingIndexEntryVersionCurrent, | ||
} | ||
|
||
// NewEncoder creates a new encoder. | ||
|
@@ -100,7 +110,8 @@ func newEncoder(legacy legacyEncodingOptions) *Encoder { | |
enc.encodeBytesFn = enc.encodeBytes | ||
enc.encodeArrayLenFn = enc.encodeArrayLen | ||
|
||
// Used primarily for testing. | ||
// Used primarily for testing however legitimate production uses exist (e.g. addition of IndexEntryChecksum in | ||
// IndexEntryV3) | ||
enc.legacy = legacy | ||
|
||
return enc | ||
|
@@ -141,11 +152,19 @@ func (enc *Encoder) EncodeIndexEntry(entry schema.IndexEntry) error { | |
if enc.err != nil { | ||
return enc.err | ||
} | ||
|
||
// There's no guarantee EncodeIndexEntry is called with an empty buffer so ensure | ||
// only checksumming the bits we care about. | ||
checksumStart := enc.buf.Len() | ||
|
||
enc.encodeRootObject(indexEntryVersion, indexEntryType) | ||
if enc.legacy.encodeLegacyV1IndexEntry { | ||
switch enc.legacy.encodeLegacyIndexEntryVersion { | ||
case legacyEncodingIndexEntryVersionV1: | ||
enc.encodeIndexEntryV1(entry) | ||
} else { | ||
case legacyEncodingIndexEntryVersionV2: | ||
enc.encodeIndexEntryV2(entry) | ||
default: | ||
enc.encodeIndexEntryV3(entry, checksumStart) | ||
} | ||
return enc.err | ||
} | ||
|
@@ -283,17 +302,30 @@ func (enc *Encoder) encodeIndexEntryV1(entry schema.IndexEntry) { | |
enc.encodeBytesFn(entry.ID) | ||
enc.encodeVarintFn(entry.Size) | ||
enc.encodeVarintFn(entry.Offset) | ||
enc.encodeVarintFn(entry.Checksum) | ||
enc.encodeVarintFn(entry.DataChecksum) | ||
} | ||
|
||
func (enc *Encoder) encodeIndexEntryV2(entry schema.IndexEntry) { | ||
enc.encodeArrayLenFn(6) // V2 had 6 fields. | ||
enc.encodeVarintFn(entry.Index) | ||
enc.encodeBytesFn(entry.ID) | ||
enc.encodeVarintFn(entry.Size) | ||
enc.encodeVarintFn(entry.Offset) | ||
enc.encodeVarintFn(entry.DataChecksum) | ||
enc.encodeBytesFn(entry.EncodedTags) | ||
} | ||
|
||
func (enc *Encoder) encodeIndexEntryV3(entry schema.IndexEntry, checksumStart int) { | ||
enc.encodeNumObjectFieldsForFn(indexEntryType) | ||
enc.encodeVarintFn(entry.Index) | ||
enc.encodeBytesFn(entry.ID) | ||
enc.encodeVarintFn(entry.Size) | ||
enc.encodeVarintFn(entry.Offset) | ||
enc.encodeVarintFn(entry.Checksum) | ||
enc.encodeVarintFn(entry.DataChecksum) | ||
enc.encodeBytesFn(entry.EncodedTags) | ||
|
||
checksum := digest.Checksum(enc.Bytes()[checksumStart:]) | ||
enc.encodeVarintFn(int64(checksum)) | ||
} | ||
|
||
func (enc *Encoder) encodeIndexSummary(summary schema.IndexSummary) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -40,19 +40,33 @@ func testCapturingEncoder(t *testing.T) (*Encoder, *[]interface{}) { | |
encoder := NewEncoder() | ||
|
||
var result []interface{} | ||
actualEncodeVarintFn := encoder.encodeVarintFn | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Capturing encoder previously captured result but didn't write through to buffer. Since the buffer is needed to calculate the checksum, ensure original method is called as well |
||
encoder.encodeVarintFn = func(value int64) { | ||
actualEncodeVarintFn(value) | ||
result = append(result, value) | ||
} | ||
|
||
actualEncodeVarUintFn := encoder.encodeVarUintFn | ||
encoder.encodeVarUintFn = func(value uint64) { | ||
actualEncodeVarUintFn(value) | ||
result = append(result, value) | ||
} | ||
|
||
actualEncodeFloat64Fn := encoder.encodeFloat64Fn | ||
encoder.encodeFloat64Fn = func(value float64) { | ||
actualEncodeFloat64Fn(value) | ||
result = append(result, value) | ||
} | ||
|
||
actualEncodeBytesFn := encoder.encodeBytesFn | ||
encoder.encodeBytesFn = func(value []byte) { | ||
actualEncodeBytesFn(value) | ||
result = append(result, value) | ||
} | ||
|
||
actualEncodeArrayLenFn := encoder.encodeArrayLenFn | ||
encoder.encodeArrayLenFn = func(value int) { | ||
actualEncodeArrayLenFn(value) | ||
result = append(result, value) | ||
} | ||
|
||
|
@@ -98,8 +112,9 @@ func testExpectedResultForIndexEntry(t *testing.T, indexEntry schema.IndexEntry) | |
indexEntry.ID, | ||
indexEntry.Size, | ||
indexEntry.Offset, | ||
indexEntry.Checksum, | ||
indexEntry.DataChecksum, | ||
indexEntry.EncodedTags, | ||
testIndexEntryChecksum, // Checksum auto-added to the end of the index entry | ||
} | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: may want to error out on unknown case
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(and I guess add a
case legacyEncodingIndexEntryVersionV3: break
noop)There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I may be misunderstanding the flow here if that's not a valid case though
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will update this flow to make more clear