diff --git a/src/dbnode/encoding/encoding_mock.go b/src/dbnode/encoding/encoding_mock.go index 201a89c74c..efac523172 100644 --- a/src/dbnode/encoding/encoding_mock.go +++ b/src/dbnode/encoding/encoding_mock.go @@ -1,7 +1,7 @@ // Code generated by MockGen. DO NOT EDIT. // Source: github.com/m3db/m3/src/dbnode/encoding/types.go -// Copyright (c) 2020 Uber Technologies, Inc. +// Copyright (c) 2021 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal @@ -296,7 +296,7 @@ func (mr *MockOptionsMockRecorder) TimeEncodingSchemes() *gomock.Call { } // SetMarkerEncodingScheme mocks base method -func (m *MockOptions) SetMarkerEncodingScheme(value MarkerEncodingScheme) Options { +func (m *MockOptions) SetMarkerEncodingScheme(value *MarkerEncodingScheme) Options { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "SetMarkerEncodingScheme", value) ret0, _ := ret[0].(Options) @@ -310,10 +310,10 @@ func (mr *MockOptionsMockRecorder) SetMarkerEncodingScheme(value interface{}) *g } // MarkerEncodingScheme mocks base method -func (m *MockOptions) MarkerEncodingScheme() MarkerEncodingScheme { +func (m *MockOptions) MarkerEncodingScheme() *MarkerEncodingScheme { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "MarkerEncodingScheme") - ret0, _ := ret[0].(MarkerEncodingScheme) + ret0, _ := ret[0].(*MarkerEncodingScheme) return ret0 } diff --git a/src/dbnode/encoding/istream.go b/src/dbnode/encoding/istream.go index 1b2355ca9f..c24590dbd6 100644 --- a/src/dbnode/encoding/istream.go +++ b/src/dbnode/encoding/istream.go @@ -29,12 +29,16 @@ import ( // IStream encapsulates a readable stream. type IStream struct { r xio.Reader64 - err error // error encountered current uint64 // current uint64 we are working off of index int // current index within data slice remaining uint8 // bits remaining in current to be read } +var ( + _ io.ByteReader = (*IStream)(nil) + _ io.Reader = (*IStream)(nil) +) + // NewIStream creates a new IStream func NewIStream(reader64 xio.Reader64) *IStream { return &IStream{r: reader64} @@ -67,23 +71,30 @@ func (is *IStream) ReadBit() (Bit, error) { // ReadBits reads the next Bits. func (is *IStream) ReadBits(numBits uint8) (uint64, error) { - if is.err != nil { - return 0, is.err - } - if numBits <= is.remaining { + res := is.current >> (64 - numBits) + remaining := is.remaining + if numBits <= remaining { // Have enough bits buffered. - return is.consumeBuffer(numBits), nil + is.current <<= numBits + is.remaining -= numBits + return res, nil } - res := readBitsInWord(is.current, numBits) + // Not enough bits buffered, read next word from the stream. - bitsNeeded := numBits - is.remaining - if err := is.readWordFromStream(); err != nil { + bitsNeeded := numBits - remaining + + current, n, err := is.r.Read64() + if err != nil { return 0, err } - if is.remaining < bitsNeeded { + n *= 8 + if n < bitsNeeded { return 0, io.EOF } - return res | is.consumeBuffer(bitsNeeded), nil + + is.current = current << bitsNeeded + is.remaining = n - bitsNeeded + return res | current>>(64-bitsNeeded), nil } // PeekBits looks at the next Bits, but doesn't move the pos. @@ -113,26 +124,8 @@ func readBitsInWord(w uint64, numBits uint8) uint64 { return w >> (64 - numBits) } -// consumeBuffer consumes numBits in is.current. -func (is *IStream) consumeBuffer(numBits uint8) uint64 { - res := readBitsInWord(is.current, numBits) - is.current <<= numBits - is.remaining -= numBits - return res -} - -func (is *IStream) readWordFromStream() error { - current, bytes, err := is.r.Read64() - is.current = current - is.remaining = 8 * bytes - is.err = err - - return err -} - // Reset resets the IStream. func (is *IStream) Reset(reader xio.Reader64) { - is.err = nil is.current = 0 is.remaining = 0 is.index = 0 diff --git a/src/dbnode/encoding/istream_test.go b/src/dbnode/encoding/istream_test.go index eddc4dd30d..245ebaf990 100644 --- a/src/dbnode/encoding/istream_test.go +++ b/src/dbnode/encoding/istream_test.go @@ -183,5 +183,4 @@ func TestIStreamReset(t *testing.T) { require.Equal(t, uint64(0), is.current) require.Equal(t, uint8(0), is.remaining) require.Equal(t, 0, is.index) - require.NoError(t, is.err) } diff --git a/src/dbnode/encoding/m3tsz/encoder.go b/src/dbnode/encoding/m3tsz/encoder.go index 080c29bfc1..ac9d9f74d8 100644 --- a/src/dbnode/encoding/m3tsz/encoder.go +++ b/src/dbnode/encoding/m3tsz/encoder.go @@ -41,8 +41,9 @@ var ( // encoder is an M3TSZ encoder that can encode a stream of data in M3TSZ format. type encoder struct { - os encoding.OStream - opts encoding.Options + os encoding.OStream + opts encoding.Options + markerEncodingScheme *encoding.MarkerEncodingScheme // internal bookkeeping tsEncoderState TimestampEncoder @@ -75,11 +76,12 @@ func NewEncoder( // `Reset` method is called. initAllocIfEmpty := opts.EncoderPool() == nil return &encoder{ - os: encoding.NewOStream(bytes, initAllocIfEmpty, opts.BytesPool()), - opts: opts, - tsEncoderState: NewTimestampEncoder(start, opts.DefaultTimeUnit(), opts), - closed: false, - intOptimized: intOptimized, + os: encoding.NewOStream(bytes, initAllocIfEmpty, opts.BytesPool()), + opts: opts, + markerEncodingScheme: opts.MarkerEncodingScheme(), + tsEncoderState: NewTimestampEncoder(start, opts.DefaultTimeUnit(), opts), + closed: false, + intOptimized: intOptimized, } } @@ -334,7 +336,7 @@ func (enc *encoder) Len() int { var ( lastIdx = len(raw) - 1 lastByte = raw[lastIdx] - scheme = enc.opts.MarkerEncodingScheme() + scheme = enc.markerEncodingScheme tail = scheme.Tail(lastByte, pos) ) tail.IncRef() @@ -406,7 +408,7 @@ func (enc *encoder) segmentZeroCopy(ctx context.Context) ts.Segment { ctx.RegisterCloser(buffer.DelayFinalizer()) // Take a shared ref to a known good tail. - scheme := enc.opts.MarkerEncodingScheme() + scheme := enc.markerEncodingScheme tail := scheme.Tail(lastByte, pos) // NB(r): Finalize the head bytes whether this is by ref or copy. If by @@ -434,7 +436,7 @@ func (enc *encoder) segmentTakeOwnership() ts.Segment { head.DecRef() // Take a shared ref to a known good tail. - scheme := enc.opts.MarkerEncodingScheme() + scheme := enc.markerEncodingScheme tail := scheme.Tail(lastByte, pos) // NB(r): Finalize the head bytes whether this is by ref or copy. If by diff --git a/src/dbnode/encoding/m3tsz/encoder_test.go b/src/dbnode/encoding/m3tsz/encoder_test.go index 388855aecc..503dae6d07 100644 --- a/src/dbnode/encoding/m3tsz/encoder_test.go +++ b/src/dbnode/encoding/m3tsz/encoder_test.go @@ -92,7 +92,7 @@ func TestWriteDeltaOfDeltaTimeUnitChanged(t *testing.T) { } for _, input := range inputs { stream := encoding.NewOStream(nil, false, nil) - tsEncoder := NewTimestampEncoder(testStartTime, xtime.Nanosecond, nil) + tsEncoder := NewTimestampEncoder(testStartTime, xtime.Nanosecond, encoding.NewOptions()) tsEncoder.writeDeltaOfDeltaTimeUnitChanged(stream, 0, input.delta) b, p := stream.RawBytes() require.Equal(t, input.expectedBytes, b) diff --git a/src/dbnode/encoding/m3tsz/iterator.go b/src/dbnode/encoding/m3tsz/iterator.go index 22a1be963a..f95d2ac379 100644 --- a/src/dbnode/encoding/m3tsz/iterator.go +++ b/src/dbnode/encoding/m3tsz/iterator.go @@ -21,6 +21,7 @@ package m3tsz import ( + "errors" "math" "github.com/m3db/m3/src/dbnode/encoding" @@ -30,6 +31,8 @@ import ( xtime "github.com/m3db/m3/src/x/time" ) +var errClosed = errors.New("iterator is closed") + // DefaultReaderIteratorAllocFn returns a function for allocating NewReaderIterator. func DefaultReaderIteratorAllocFn( opts encoding.Options, @@ -85,17 +88,13 @@ func (it *readerIterator) Next() bool { return false } - it.readValue(first) - - return it.hasNext() -} - -func (it *readerIterator) readValue(first bool) { - if first { - it.readFirstValue() - } else { + if !first { it.readNextValue() + } else { + it.readFirstValue() } + + return it.hasNext() } func (it *readerIterator) readFirstValue() { @@ -150,9 +149,18 @@ func (it *readerIterator) readNextValue() { if err := it.floatIter.readNextFloat(it.is); err != nil { it.err = err } - } else { - it.readIntValDiff() + return } + + // inlined readIntValDiff() + bits := it.readBits(it.sig + 1) + sign := -1.0 + if (bits >> it.sig) == opcodeNegative { + sign = 1.0 + // clear the opcode bit + bits ^= uint64(1 << it.sig) + } + it.intVal += sign * float64(bits) } func (it *readerIterator) readIntSigMult() { @@ -173,40 +181,38 @@ func (it *readerIterator) readIntSigMult() { } func (it *readerIterator) readIntValDiff() { + // read both sign bit and digits in one read + bits := it.readBits(it.sig + 1) sign := -1.0 - if it.readBits(1) == opcodeNegative { + if (bits >> it.sig) == opcodeNegative { sign = 1.0 + // clear the opcode bit + bits ^= uint64(1 << it.sig) } - - it.intVal += sign * float64(it.readBits(it.sig)) + it.intVal += sign * float64(bits) } -func (it *readerIterator) readBits(numBits uint8) uint64 { - if !it.hasNext() { - return 0 - } - var res uint64 +func (it *readerIterator) readBits(numBits uint8) (res uint64) { res, it.err = it.is.ReadBits(numBits) - return res + return } // Current returns the value as well as the annotation associated with the current datapoint. // Users should not hold on to the returned Annotation object as it may get invalidated when // the iterator calls Next(). func (it *readerIterator) Current() (ts.Datapoint, xtime.Unit, ts.Annotation) { + dp := ts.Datapoint{ + Timestamp: it.tsIterator.PrevTime.ToTime(), + TimestampNanos: it.tsIterator.PrevTime, + } + if !it.intOptimized || it.isFloat { - return ts.Datapoint{ - Timestamp: it.tsIterator.PrevTime.ToTime(), - TimestampNanos: it.tsIterator.PrevTime, - Value: math.Float64frombits(it.floatIter.PrevFloatBits), - }, it.tsIterator.TimeUnit, it.tsIterator.PrevAnt + dp.Value = math.Float64frombits(it.floatIter.PrevFloatBits) + } else { + dp.Value = convertFromIntFloat(it.intVal, it.mult) } - return ts.Datapoint{ - Timestamp: it.tsIterator.PrevTime.ToTime(), - TimestampNanos: it.tsIterator.PrevTime, - Value: convertFromIntFloat(it.intVal, it.mult), - }, it.tsIterator.TimeUnit, it.tsIterator.PrevAnt + return dp, it.tsIterator.TimeUnit, it.tsIterator.PrevAnt } // Err returns the error encountered @@ -227,7 +233,7 @@ func (it *readerIterator) isClosed() bool { } func (it *readerIterator) hasNext() bool { - return !it.hasError() && !it.isDone() && !it.isClosed() + return !it.hasError() && !it.isDone() } // Reset resets the ReadIterator for reuse. @@ -249,6 +255,7 @@ func (it *readerIterator) Close() { } it.closed = true + it.err = errClosed pool := it.opts.ReaderIteratorPool() if pool != nil { pool.Put(it) diff --git a/src/dbnode/encoding/m3tsz/iterator_test.go b/src/dbnode/encoding/m3tsz/iterator_test.go index d2a28dbc0d..b561d727b2 100644 --- a/src/dbnode/encoding/m3tsz/iterator_test.go +++ b/src/dbnode/encoding/m3tsz/iterator_test.go @@ -67,6 +67,8 @@ func TestReaderIteratorReadNextTimestamp(t *testing.T) { it.TimeUnit = input.timeUnit it.PrevTimeDelta = input.previousTimeDelta + tes, _ := it.timeEncodingSchemes.SchemeForUnit(it.TimeUnit) + it.timeEncodingScheme = tes err := it.readNextTimestamp(stream) require.NoError(t, err) @@ -75,7 +77,10 @@ func TestReaderIteratorReadNextTimestamp(t *testing.T) { stream := encoding.NewIStream(xio.NewBytesReader64([]byte{0x1})) it := NewTimestampIterator(encoding.NewOptions(), false) - err := it.readNextTimestamp(stream) + err := it.readFirstTimestamp(stream) + require.Error(t, err) + + err = it.readNextTimestamp(stream) require.Error(t, err) err = it.readNextTimestamp(stream) diff --git a/src/dbnode/encoding/m3tsz/timestamp_encoder.go b/src/dbnode/encoding/m3tsz/timestamp_encoder.go index 34228b06be..86e13be7e8 100644 --- a/src/dbnode/encoding/m3tsz/timestamp_encoder.go +++ b/src/dbnode/encoding/m3tsz/timestamp_encoder.go @@ -39,10 +39,11 @@ type TimestampEncoder struct { PrevTimeDelta time.Duration PrevAnnotationChecksum uint64 - Options encoding.Options - TimeUnit xtime.Unit + markerEncodingScheme *encoding.MarkerEncodingScheme + timeEncodingSchemes encoding.TimeEncodingSchemes + // Used to keep track of time unit changes that occur directly via the WriteTimeUnit() // API as opposed to indirectly via the WriteTime() API. timeUnitEncodedManually bool @@ -58,8 +59,9 @@ func NewTimestampEncoder( return TimestampEncoder{ PrevTime: start, TimeUnit: initialTimeUnit(xtime.ToUnixNano(start), timeUnit), - Options: opts, PrevAnnotationChecksum: emptyAnnotationChecksum, + markerEncodingScheme: opts.MarkerEncodingScheme(), + timeEncodingSchemes: opts.TimeEncodingSchemes(), } } @@ -126,7 +128,7 @@ func (enc *TimestampEncoder) maybeWriteTimeUnitChange(stream encoding.OStream, t return false } - scheme := enc.Options.MarkerEncodingScheme() + scheme := enc.markerEncodingScheme encoding.WriteSpecialMarker(stream, scheme, scheme.TimeUnit()) enc.WriteTimeUnit(stream, timeUnit) return true @@ -158,7 +160,7 @@ func (enc *TimestampEncoder) writeAnnotation(stream encoding.OStream, ant ts.Ann return } - scheme := enc.Options.MarkerEncodingScheme() + scheme := enc.markerEncodingScheme encoding.WriteSpecialMarker(stream, scheme, scheme.Annotation()) var buf [binary.MaxVarintLen32]byte @@ -197,9 +199,9 @@ func (enc *TimestampEncoder) writeDeltaOfDeltaTimeUnitUnchanged( } } - tes, exists := enc.Options.TimeEncodingSchemes().SchemeForUnit(timeUnit) + tes, exists := enc.timeEncodingSchemes.SchemeForUnit(timeUnit) if !exists { - return fmt.Errorf("time encoding scheme for time unit %v doesn't exist", timeUnit) + return errNoTimeSchemaForUnit } if deltaOfDelta == 0 { diff --git a/src/dbnode/encoding/m3tsz/timestamp_iterator.go b/src/dbnode/encoding/m3tsz/timestamp_iterator.go index 587e03e274..de6dbe0b70 100644 --- a/src/dbnode/encoding/m3tsz/timestamp_iterator.go +++ b/src/dbnode/encoding/m3tsz/timestamp_iterator.go @@ -22,7 +22,7 @@ package m3tsz import ( "encoding/binary" - "fmt" + "errors" "time" "github.com/m3db/m3/src/dbnode/encoding" @@ -30,6 +30,12 @@ import ( xtime "github.com/m3db/m3/src/x/time" ) +var ( + errNoTimeSchemaForUnit = errors.New("time encoding scheme doesn't exist for unit") + errUnexpectedAnnotationLength = errors.New("expected annotation length to be >= 0") + errAnnotationTooFewBytes = errors.New("expected to read annotation bytes, but got end of stream") +) + // TimestampIterator encapsulates all the state required for iterating over // delta-of-delta compressed timestamps. type TimestampIterator struct { @@ -38,11 +44,12 @@ type TimestampIterator struct { PrevAnt ts.Annotation prevAntBytes [ts.OptimizedAnnotationLen]byte - TimeUnit xtime.Unit - - Opts encoding.Options + TimeUnit xtime.Unit + defaultTimeUnit xtime.Unit - markerEncodingScheme encoding.MarkerEncodingScheme + markerEncodingScheme *encoding.MarkerEncodingScheme + timeEncodingSchemes encoding.TimeEncodingSchemes + timeEncodingScheme *encoding.TimeEncodingScheme TimeUnitChanged bool Done bool @@ -60,11 +67,12 @@ type TimestampIterator struct { func NewTimestampIterator(opts encoding.Options, skipMarkers bool) TimestampIterator { mes := opts.MarkerEncodingScheme() return TimestampIterator{ - Opts: opts, + defaultTimeUnit: opts.DefaultTimeUnit(), SkipMarkers: skipMarkers, numValueBits: uint8(mes.NumValueBits()), numBits: uint8(mes.NumOpcodeBits() + mes.NumValueBits()), markerEncodingScheme: mes, + timeEncodingSchemes: opts.TimeEncodingSchemes(), } } @@ -74,14 +82,22 @@ func (it *TimestampIterator) ReadTimestamp(stream *encoding.IStream) (bool, bool var ( first = false + dod time.Duration err error ) - if it.PrevTime == 0 { + + if it.PrevTime != 0 { + // inlined readNextTimestamp + dod, err = it.readMarkerOrDeltaOfDelta(stream) + if err == nil { + it.PrevTimeDelta += dod + it.PrevTime += xtime.UnixNano(it.PrevTimeDelta) + } + } else { first = true err = it.readFirstTimestamp(stream) - } else { - err = it.readNextTimestamp(stream) } + if err != nil { return false, false, err } @@ -100,7 +116,7 @@ func (it *TimestampIterator) ReadTimestamp(stream *encoding.IStream) (bool, bool // accordingly. It is exposed as a public method so that callers can control // the encoding / decoding of the time unit on their own if they choose. func (it *TimestampIterator) ReadTimeUnit(stream *encoding.IStream) error { - tuBits, err := stream.ReadByte() + tuBits, err := stream.ReadBits(8) if err != nil { return err } @@ -108,6 +124,10 @@ func (it *TimestampIterator) ReadTimeUnit(stream *encoding.IStream) error { tu := xtime.Unit(tuBits) if tu.IsValid() && tu != it.TimeUnit { it.TimeUnitChanged = true + tes, ok := it.timeEncodingSchemes.SchemeForUnit(tu) + if ok { + it.timeEncodingScheme = tes + } } it.TimeUnit = tu @@ -123,7 +143,12 @@ func (it *TimestampIterator) readFirstTimestamp(stream *encoding.IStream) error // NB(xichen): first time stamp is always normalized to nanoseconds. nt := xtime.UnixNano(ntBits) if it.TimeUnit == xtime.None { - it.TimeUnit = initialTimeUnit(nt, it.Opts.DefaultTimeUnit()) + it.TimeUnit = initialTimeUnit(nt, it.defaultTimeUnit) + } + + tes, ok := it.timeEncodingSchemes.SchemeForUnit(it.TimeUnit) + if ok { + it.timeEncodingScheme = tes } err = it.readNextTimestamp(stream) @@ -148,30 +173,36 @@ func (it *TimestampIterator) readNextTimestamp(stream *encoding.IStream) error { // nolint: gocyclo func (it *TimestampIterator) tryReadMarker(stream *encoding.IStream) (time.Duration, bool, error) { - opcodeAndValue, success := it.tryPeekBits(stream, it.numBits) - if !success { + var ( + numBits = it.numBits + numValueBits = it.numValueBits + opcodeAndValue, err = stream.PeekBits(numBits) + ) + + if err != nil { return 0, false, nil } - opcode := opcodeAndValue >> it.numValueBits + opcode := opcodeAndValue >> numValueBits if opcode != it.markerEncodingScheme.Opcode() { return 0, false, nil } var ( - valueMask = (1 << it.numValueBits) - 1 - markerValue = int64(opcodeAndValue & uint64(valueMask)) + valueMask = (1 << numValueBits) - 1 + markerValue = encoding.Marker(opcodeAndValue & uint64(valueMask)) ) - switch encoding.Marker(markerValue) { + + switch markerValue { case it.markerEncodingScheme.EndOfStream(): - _, err := stream.ReadBits(it.numBits) + _, err := stream.ReadBits(numBits) if err != nil { return 0, false, err } it.Done = true return 0, true, nil case it.markerEncodingScheme.Annotation(): - _, err := stream.ReadBits(it.numBits) + _, err := stream.ReadBits(numBits) if err != nil { return 0, false, err } @@ -185,7 +216,7 @@ func (it *TimestampIterator) tryReadMarker(stream *encoding.IStream) (time.Durat } return markerOrDOD, true, nil case it.markerEncodingScheme.TimeUnit(): - _, err := stream.ReadBits(it.numBits) + _, err := stream.ReadBits(numBits) if err != nil { return 0, false, err } @@ -208,46 +239,29 @@ func (it *TimestampIterator) readMarkerOrDeltaOfDelta( ) (time.Duration, error) { if !it.SkipMarkers { dod, success, err := it.tryReadMarker(stream) - if err != nil { - return 0, err - } - if it.Done { - return 0, nil - } - - if success { - return dod, nil + if success || err != nil || it.Done { + return dod, err } } - tes, exists := it.Opts.TimeEncodingSchemes().SchemeForUnit(it.TimeUnit) - if !exists { - return 0, fmt.Errorf("time encoding scheme for time unit %v doesn't exist", it.TimeUnit) - } - - return it.readDeltaOfDelta(stream, tes) + return it.readDeltaOfDelta(stream) } func (it *TimestampIterator) readDeltaOfDelta( stream *encoding.IStream, - tes encoding.TimeEncodingScheme, ) (time.Duration, error) { if it.TimeUnitChanged { - // NB(xichen): if the time unit has changed, always read 64 bits as normalized - // dod in nanoseconds. - dodBits, err := stream.ReadBits(64) - if err != nil { - return 0, err - } - - dod := encoding.SignExtend(dodBits, 64) - return time.Duration(dod), nil + return it.readFullTimestamp(stream) + } else if it.timeEncodingScheme == nil { + return 0, errNoTimeSchemaForUnit } cb, err := stream.ReadBits(1) if err != nil { return 0, err } + + tes := it.timeEncodingScheme if cb == tes.ZeroBucket().Opcode() { return 0, nil } @@ -290,6 +304,26 @@ func (it *TimestampIterator) readDeltaOfDelta( return xtime.FromNormalizedDuration(dod, timeUnit), nil } +func (it *TimestampIterator) readFullTimestamp( + stream *encoding.IStream, +) (time.Duration, error) { + tes, exists := it.timeEncodingSchemes.SchemeForUnit(it.TimeUnit) + if !exists { + return 0, errNoTimeSchemaForUnit + } + it.timeEncodingScheme = tes + // NB(xichen): if the time unit has changed, always read 64 bits as normalized + // dod in nanoseconds. + dodBits, err := stream.ReadBits(64) + if err != nil { + return 0, err + } + + dod := encoding.SignExtend(dodBits, 64) + + return time.Duration(dod), nil +} + func (it *TimestampIterator) readAnnotation(stream *encoding.IStream) error { antLen, err := it.readVarint(stream) if err != nil { @@ -299,7 +333,7 @@ func (it *TimestampIterator) readAnnotation(stream *encoding.IStream) error { // NB: we add 1 here to offset the 1 we subtracted during encoding. antLen = antLen + 1 if antLen <= 0 { - return fmt.Errorf("unexpected annotation length %d", antLen) + return errUnexpectedAnnotationLength } var buf []byte @@ -314,9 +348,7 @@ func (it *TimestampIterator) readAnnotation(stream *encoding.IStream) error { return err } if n != antLen { - return fmt.Errorf( - "expected to read %d annotation bytes but read: %d", - antLen, n) + return errAnnotationTooFewBytes } it.PrevAnt = buf @@ -327,11 +359,3 @@ func (it *TimestampIterator) readVarint(stream *encoding.IStream) (int, error) { res, err := binary.ReadVarint(stream) return int(res), err } - -func (it *TimestampIterator) tryPeekBits(stream *encoding.IStream, numBits uint8) (uint64, bool) { - res, err := stream.PeekBits(numBits) - if err != nil { - return 0, false - } - return res, true -} diff --git a/src/dbnode/encoding/options.go b/src/dbnode/encoding/options.go index 1016be3d53..2168188327 100644 --- a/src/dbnode/encoding/options.go +++ b/src/dbnode/encoding/options.go @@ -42,7 +42,7 @@ var ( type options struct { defaultTimeUnit xtime.Unit timeEncodingSchemes TimeEncodingSchemes - markerEncodingScheme MarkerEncodingScheme + markerEncodingScheme *MarkerEncodingScheme encoderPool EncoderPool readerIteratorPool ReaderIteratorPool bytesPool pool.CheckedBytesPool @@ -56,7 +56,7 @@ type options struct { func newOptions() Options { return &options{ defaultTimeUnit: defaultDefaultTimeUnit, - timeEncodingSchemes: newTimeEncodingSchemes(defaultTimeEncodingSchemes), + timeEncodingSchemes: NewTimeEncodingSchemes(defaultTimeEncodingSchemes), markerEncodingScheme: defaultMarkerEncodingScheme, byteFieldDictLRUSize: defaultByteFieldDictLRUSize, iStreamReaderSizeM3TSZ: defaultIStreamReaderSizeM3TSZ, @@ -81,7 +81,7 @@ func (o *options) DefaultTimeUnit() xtime.Unit { func (o *options) SetTimeEncodingSchemes(value map[xtime.Unit]TimeEncodingScheme) Options { opts := *o - opts.timeEncodingSchemes = newTimeEncodingSchemes(value) + opts.timeEncodingSchemes = NewTimeEncodingSchemes(value) return &opts } @@ -89,13 +89,13 @@ func (o *options) TimeEncodingSchemes() TimeEncodingSchemes { return o.timeEncodingSchemes } -func (o *options) SetMarkerEncodingScheme(value MarkerEncodingScheme) Options { +func (o *options) SetMarkerEncodingScheme(value *MarkerEncodingScheme) Options { opts := *o opts.markerEncodingScheme = value return &opts } -func (o *options) MarkerEncodingScheme() MarkerEncodingScheme { +func (o *options) MarkerEncodingScheme() *MarkerEncodingScheme { return o.markerEncodingScheme } diff --git a/src/dbnode/encoding/ostream.go b/src/dbnode/encoding/ostream.go index 3baeb4f2de..df84422466 100644 --- a/src/dbnode/encoding/ostream.go +++ b/src/dbnode/encoding/ostream.go @@ -188,15 +188,34 @@ func (os *ostream) WriteBits(v uint64, numBits int) { } v <<= uint(64 - numBits) + + for numBits >= 32 { + os.WriteByte(byte(v >> 56)) + os.WriteByte(byte(v >> 48)) + os.WriteByte(byte(v >> 40)) + os.WriteByte(byte(v >> 32)) + + v <<= 32 + numBits -= 32 + } + for numBits >= 8 { os.WriteByte(byte(v >> 56)) v <<= 8 numBits -= 8 } + remainder := byte(v >> 56) for numBits > 0 { - os.WriteBit(Bit((v >> 63) & 1)) - v <<= 1 + val := remainder & 0x80 + // inlined WriteBit + if os.hasUnusedBits() { + os.fillUnused(val) + os.pos++ + } else { + os.grow(val, 1) + } + remainder <<= 1 numBits-- } } diff --git a/src/dbnode/encoding/scheme.go b/src/dbnode/encoding/scheme.go index 0c8327aea8..c1dc096952 100644 --- a/src/dbnode/encoding/scheme.go +++ b/src/dbnode/encoding/scheme.go @@ -39,20 +39,20 @@ const ( var ( // default time encoding schemes - defaultZeroBucket = newTimeBucket(0x0, 1, 0) + defaultZeroBucket = NewTimeBucket(0x0, 1, 0) defaultNumValueBitsForBuckets = []int{7, 9, 12} // TODO(xichen): set more reasonable defaults once we have more knowledge // of the use cases for time units other than seconds. defaultTimeEncodingSchemes = map[xtime.Unit]TimeEncodingScheme{ - xtime.Second: newTimeEncodingScheme(defaultNumValueBitsForBuckets, 32), - xtime.Millisecond: newTimeEncodingScheme(defaultNumValueBitsForBuckets, 32), - xtime.Microsecond: newTimeEncodingScheme(defaultNumValueBitsForBuckets, 64), - xtime.Nanosecond: newTimeEncodingScheme(defaultNumValueBitsForBuckets, 64), + xtime.Second: NewTimeEncodingScheme(defaultNumValueBitsForBuckets, 32), + xtime.Millisecond: NewTimeEncodingScheme(defaultNumValueBitsForBuckets, 32), + xtime.Microsecond: NewTimeEncodingScheme(defaultNumValueBitsForBuckets, 64), + xtime.Nanosecond: NewTimeEncodingScheme(defaultNumValueBitsForBuckets, 64), } // default marker encoding scheme - defaultMarkerEncodingScheme = newMarkerEncodingScheme( + defaultMarkerEncodingScheme = NewMarkerEncodingScheme( defaultMarkerOpcode, defaultNumMarkerOpcodeBits, defaultNumMarkerValueBits, @@ -63,25 +63,7 @@ var ( ) // TimeBucket represents a bucket for encoding time values. -type TimeBucket interface { - - // Opcode is the opcode prefix used to encode all time values in this range. - Opcode() uint64 - - // NumOpcodeBits is the number of bits used to write the opcode. - NumOpcodeBits() int - - // Min is the minimum time value accepted in this range. - Min() int64 - - // Max is the maximum time value accepted in this range. - Max() int64 - - // NumValueBits is the number of bits used to write the time value. - NumValueBits() int -} - -type timeBucket struct { +type TimeBucket struct { min int64 max int64 opcode uint64 @@ -89,9 +71,9 @@ type timeBucket struct { numValueBits int } -// newTimeBucket creates a new time bucket. -func newTimeBucket(opcode uint64, numOpcodeBits, numValueBits int) TimeBucket { - return &timeBucket{ +// NewTimeBucket creates a new time bucket. +func NewTimeBucket(opcode uint64, numOpcodeBits, numValueBits int) TimeBucket { + return TimeBucket{ opcode: opcode, numOpcodeBits: numOpcodeBits, numValueBits: numValueBits, @@ -100,34 +82,31 @@ func newTimeBucket(opcode uint64, numOpcodeBits, numValueBits int) TimeBucket { } } -func (tb *timeBucket) Opcode() uint64 { return tb.opcode } -func (tb *timeBucket) NumOpcodeBits() int { return tb.numOpcodeBits } -func (tb *timeBucket) Min() int64 { return tb.min } -func (tb *timeBucket) Max() int64 { return tb.max } -func (tb *timeBucket) NumValueBits() int { return tb.numValueBits } +// Opcode is the opcode prefix used to encode all time values in this range. +func (tb *TimeBucket) Opcode() uint64 { return tb.opcode } -// TimeEncodingScheme captures information related to time encoding. -type TimeEncodingScheme interface { +// NumOpcodeBits is the number of bits used to write the opcode. +func (tb *TimeBucket) NumOpcodeBits() int { return tb.numOpcodeBits } - // ZeroBucket is time bucket for encoding zero time values. - ZeroBucket() TimeBucket +// Min is the minimum time value accepted in this range. +func (tb *TimeBucket) Min() int64 { return tb.min } - // Buckets are the ordered time buckets used to encode non-zero, non-default time values. - Buckets() []TimeBucket +// Max is the maximum time value accepted in this range. +func (tb *TimeBucket) Max() int64 { return tb.max } - // DefaultBucket is the time bucket for catching all other time values not included in the regular buckets. - DefaultBucket() TimeBucket -} +// NumValueBits is the number of bits used to write the time value. +func (tb *TimeBucket) NumValueBits() int { return tb.numValueBits } -type timeEncodingScheme struct { +// TimeEncodingScheme captures information related to time encoding. +type TimeEncodingScheme struct { zeroBucket TimeBucket buckets []TimeBucket defaultBucket TimeBucket } -// newTimeEncodingSchemes converts the unit-to-scheme mapping +// NewTimeEncodingSchemes converts the unit-to-scheme mapping // to the underlying TimeEncodingSchemes used for lookups. -func newTimeEncodingSchemes(schemes map[xtime.Unit]TimeEncodingScheme) TimeEncodingSchemes { +func NewTimeEncodingSchemes(schemes map[xtime.Unit]TimeEncodingScheme) TimeEncodingSchemes { encodingSchemes := make(TimeEncodingSchemes, xtime.UnitCount()) for k, v := range schemes { if !k.IsValid() { @@ -140,10 +119,10 @@ func newTimeEncodingSchemes(schemes map[xtime.Unit]TimeEncodingScheme) TimeEncod return encodingSchemes } -// newTimeEncodingScheme creates a new time encoding scheme. +// NewTimeEncodingScheme creates a new time encoding scheme. // NB(xichen): numValueBitsForBuckets should be ordered by value // in ascending order (smallest value first). -func newTimeEncodingScheme(numValueBitsForBuckets []int, numValueBitsForDefault int) TimeEncodingScheme { +func NewTimeEncodingScheme(numValueBitsForBuckets []int, numValueBitsForDefault int) TimeEncodingScheme { numBuckets := len(numValueBitsForBuckets) buckets := make([]TimeBucket, 0, numBuckets) numOpcodeBits := 1 @@ -151,71 +130,45 @@ func newTimeEncodingScheme(numValueBitsForBuckets []int, numValueBitsForDefault i := 0 for i < numBuckets { opcode = uint64(1<= len(s) { return nil, false } - - scheme := s[u] - if scheme == nil { - return nil, false - } - - return s[u], true + return &s[u], true } // Marker represents the markers. type Marker byte // MarkerEncodingScheme captures the information related to marker encoding. -type MarkerEncodingScheme interface { - - // Opcode returns the marker opcode. - Opcode() uint64 - - // NumOpcodeBits returns the number of bits used for the opcode. - NumOpcodeBits() int - - // NumValueBits returns the number of bits used for the marker value. - NumValueBits() int - - // EndOfStream returns the end of stream marker. - EndOfStream() Marker - - // Annotation returns the annotation marker. - Annotation() Marker - - // TimeUnit returns the time unit marker. - TimeUnit() Marker - - // Tail will return the tail portion of a stream including the relevant bits - // in the last byte along with the end of stream marker. - Tail(streamLastByte byte, streamCurrentPosition int) checked.Bytes -} - -type markerEncodingScheme struct { +type MarkerEncodingScheme struct { opcode uint64 numOpcodeBits int numValueBits int @@ -225,15 +178,16 @@ type markerEncodingScheme struct { tails [256][8]checked.Bytes } -func newMarkerEncodingScheme( +// NewMarkerEncodingScheme returns new marker encoding. +func NewMarkerEncodingScheme( opcode uint64, numOpcodeBits int, numValueBits int, endOfStream Marker, annotation Marker, timeUnit Marker, -) MarkerEncodingScheme { - scheme := &markerEncodingScheme{ +) *MarkerEncodingScheme { + scheme := &MarkerEncodingScheme{ opcode: opcode, numOpcodeBits: numOpcodeBits, numValueBits: numValueBits, @@ -260,15 +214,29 @@ func newMarkerEncodingScheme( // WriteSpecialMarker writes the marker that marks the start of a special symbol, // e.g., the eos marker, the annotation marker, or the time unit marker. -func WriteSpecialMarker(os OStream, scheme MarkerEncodingScheme, marker Marker) { +func WriteSpecialMarker(os OStream, scheme *MarkerEncodingScheme, marker Marker) { os.WriteBits(scheme.Opcode(), scheme.NumOpcodeBits()) os.WriteBits(uint64(marker), scheme.NumValueBits()) } -func (mes *markerEncodingScheme) Opcode() uint64 { return mes.opcode } -func (mes *markerEncodingScheme) NumOpcodeBits() int { return mes.numOpcodeBits } -func (mes *markerEncodingScheme) NumValueBits() int { return mes.numValueBits } -func (mes *markerEncodingScheme) EndOfStream() Marker { return mes.endOfStream } -func (mes *markerEncodingScheme) Annotation() Marker { return mes.annotation } -func (mes *markerEncodingScheme) TimeUnit() Marker { return mes.timeUnit } -func (mes *markerEncodingScheme) Tail(b byte, pos int) checked.Bytes { return mes.tails[int(b)][pos-1] } +// Opcode returns the marker opcode. +func (mes *MarkerEncodingScheme) Opcode() uint64 { return mes.opcode } + +// NumOpcodeBits returns the number of bits used for the opcode. +func (mes *MarkerEncodingScheme) NumOpcodeBits() int { return mes.numOpcodeBits } + +// NumValueBits returns the number of bits used for the marker value. +func (mes *MarkerEncodingScheme) NumValueBits() int { return mes.numValueBits } + +// EndOfStream returns the end of stream marker. +func (mes *MarkerEncodingScheme) EndOfStream() Marker { return mes.endOfStream } + +// Annotation returns the annotation marker. +func (mes *MarkerEncodingScheme) Annotation() Marker { return mes.annotation } + +// TimeUnit returns the time unit marker. +func (mes *MarkerEncodingScheme) TimeUnit() Marker { return mes.timeUnit } + +// Tail will return the tail portion of a stream including the relevant bits +// in the last byte along with the end of stream marker. +func (mes *MarkerEncodingScheme) Tail(b byte, pos int) checked.Bytes { return mes.tails[int(b)][pos-1] } diff --git a/src/dbnode/encoding/types.go b/src/dbnode/encoding/types.go index 567d908016..15978539d9 100644 --- a/src/dbnode/encoding/types.go +++ b/src/dbnode/encoding/types.go @@ -105,10 +105,10 @@ type Options interface { TimeEncodingSchemes() TimeEncodingSchemes // SetMarkerEncodingScheme sets the marker encoding scheme. - SetMarkerEncodingScheme(value MarkerEncodingScheme) Options + SetMarkerEncodingScheme(value *MarkerEncodingScheme) Options // MarkerEncodingScheme returns the marker encoding scheme. - MarkerEncodingScheme() MarkerEncodingScheme + MarkerEncodingScheme() *MarkerEncodingScheme // SetEncoderPool sets the encoder pool. SetEncoderPool(value EncoderPool) Options