Skip to content

Commit

Permalink
Revert "Revert "[dbnode] Improve m3tsz decoding performance (#3358)" (#3403)"
Browse files Browse the repository at this point in the history

This reverts commit 8cf1168.
  • Loading branch information
vdarulis committed Apr 6, 2021
1 parent 960ae5e commit fe9a368
Show file tree
Hide file tree
Showing 13 changed files with 267 additions and 248 deletions.
8 changes: 4 additions & 4 deletions src/dbnode/encoding/encoding_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

51 changes: 22 additions & 29 deletions src/dbnode/encoding/istream.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,16 @@ import (
// IStream encapsulates a readable stream.
// It pulls words from r via Read64 and hands them out bit by bit: current
// buffers the bits that have not been consumed yet and remaining counts how
// many of those bits are valid.
type IStream struct {
r xio.Reader64 // underlying source of words, fetched with Read64
err error // error returned by the most recent Read64; checked at the top of ReadBits
current uint64 // buffered unread bits; reads take the high-order bits, then shift left
index int // current index within data slice (only zeroed by Reset in the visible code)
remaining uint8 // number of bits in current that are still to be read
}

// Compile-time assertions that *IStream satisfies io.ByteReader and io.Reader.
var (
_ io.ByteReader = (*IStream)(nil)
_ io.Reader = (*IStream)(nil)
)

// NewIStream creates a new IStream
func NewIStream(reader64 xio.Reader64) *IStream {
return &IStream{r: reader64}
Expand Down Expand Up @@ -67,23 +71,30 @@ func (is *IStream) ReadBit() (Bit, error) {

// ReadBits reads the next Bits.
func (is *IStream) ReadBits(numBits uint8) (uint64, error) {
if is.err != nil {
return 0, is.err
}
if numBits <= is.remaining {
res := is.current >> (64 - numBits)
remaining := is.remaining
if numBits <= remaining {
// Have enough bits buffered.
return is.consumeBuffer(numBits), nil
is.current <<= numBits
is.remaining -= numBits
return res, nil
}
res := readBitsInWord(is.current, numBits)

// Not enough bits buffered, read next word from the stream.
bitsNeeded := numBits - is.remaining
if err := is.readWordFromStream(); err != nil {
bitsNeeded := numBits - remaining

current, n, err := is.r.Read64()
if err != nil {
return 0, err
}
if is.remaining < bitsNeeded {
n *= 8
if n < bitsNeeded {
return 0, io.EOF
}
return res | is.consumeBuffer(bitsNeeded), nil

is.current = current << bitsNeeded
is.remaining = n - bitsNeeded
return res | current>>(64-bitsNeeded), nil
}

// PeekBits looks at the next Bits, but doesn't move the pos.
Expand Down Expand Up @@ -113,26 +124,8 @@ func readBitsInWord(w uint64, numBits uint8) uint64 {
return w >> (64 - numBits)
}

// consumeBuffer pops the leading numBits out of the buffered word and returns
// them. The buffered word is shifted left past the consumed bits and the
// remaining-bit counter is reduced by numBits. No bounds check is done here,
// so the caller must ensure numBits does not exceed is.remaining.
func (is *IStream) consumeBuffer(numBits uint8) uint64 {
	// Extract first, while is.current still holds the bits being consumed.
	top := readBitsInWord(is.current, numBits)
	is.remaining, is.current = is.remaining-numBits, is.current<<numBits
	return top
}

// readWordFromStream refills the bit buffer with the next word from the
// underlying reader. Whatever Read64 returns is stored unconditionally: the
// word becomes is.current, the byte count (times 8) becomes is.remaining, and
// the error (nil or not) is recorded in is.err before being returned.
func (is *IStream) readWordFromStream() error {
	word, numBytes, err := is.r.Read64()

	is.err = err
	is.current, is.remaining = word, 8*numBytes

	return err
}

// Reset resets the IStream.
func (is *IStream) Reset(reader xio.Reader64) {
is.err = nil
is.current = 0
is.remaining = 0
is.index = 0
Expand Down
1 change: 0 additions & 1 deletion src/dbnode/encoding/istream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,5 +183,4 @@ func TestIStreamReset(t *testing.T) {
require.Equal(t, uint64(0), is.current)
require.Equal(t, uint8(0), is.remaining)
require.Equal(t, 0, is.index)
require.NoError(t, is.err)
}
22 changes: 12 additions & 10 deletions src/dbnode/encoding/m3tsz/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ var (

// encoder is an M3TSZ encoder that can encode a stream of data in M3TSZ format.
type encoder struct {
os encoding.OStream
opts encoding.Options
os encoding.OStream
opts encoding.Options
markerEncodingScheme *encoding.MarkerEncodingScheme

// internal bookkeeping
tsEncoderState TimestampEncoder
Expand Down Expand Up @@ -75,11 +76,12 @@ func NewEncoder(
// `Reset` method is called.
initAllocIfEmpty := opts.EncoderPool() == nil
return &encoder{
os: encoding.NewOStream(bytes, initAllocIfEmpty, opts.BytesPool()),
opts: opts,
tsEncoderState: NewTimestampEncoder(start, opts.DefaultTimeUnit(), opts),
closed: false,
intOptimized: intOptimized,
os: encoding.NewOStream(bytes, initAllocIfEmpty, opts.BytesPool()),
opts: opts,
markerEncodingScheme: opts.MarkerEncodingScheme(),
tsEncoderState: NewTimestampEncoder(start, opts.DefaultTimeUnit(), opts),
closed: false,
intOptimized: intOptimized,
}
}

Expand Down Expand Up @@ -334,7 +336,7 @@ func (enc *encoder) Len() int {
var (
lastIdx = len(raw) - 1
lastByte = raw[lastIdx]
scheme = enc.opts.MarkerEncodingScheme()
scheme = enc.markerEncodingScheme
tail = scheme.Tail(lastByte, pos)
)
tail.IncRef()
Expand Down Expand Up @@ -406,7 +408,7 @@ func (enc *encoder) segmentZeroCopy(ctx context.Context) ts.Segment {
ctx.RegisterCloser(buffer.DelayFinalizer())

// Take a shared ref to a known good tail.
scheme := enc.opts.MarkerEncodingScheme()
scheme := enc.markerEncodingScheme
tail := scheme.Tail(lastByte, pos)

// NB(r): Finalize the head bytes whether this is by ref or copy. If by
Expand Down Expand Up @@ -434,7 +436,7 @@ func (enc *encoder) segmentTakeOwnership() ts.Segment {
head.DecRef()

// Take a shared ref to a known good tail.
scheme := enc.opts.MarkerEncodingScheme()
scheme := enc.markerEncodingScheme
tail := scheme.Tail(lastByte, pos)

// NB(r): Finalize the head bytes whether this is by ref or copy. If by
Expand Down
2 changes: 1 addition & 1 deletion src/dbnode/encoding/m3tsz/encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func TestWriteDeltaOfDeltaTimeUnitChanged(t *testing.T) {
}
for _, input := range inputs {
stream := encoding.NewOStream(nil, false, nil)
tsEncoder := NewTimestampEncoder(testStartTime, xtime.Nanosecond, nil)
tsEncoder := NewTimestampEncoder(testStartTime, xtime.Nanosecond, encoding.NewOptions())
tsEncoder.writeDeltaOfDeltaTimeUnitChanged(stream, 0, input.delta)
b, p := stream.RawBytes()
require.Equal(t, input.expectedBytes, b)
Expand Down
69 changes: 38 additions & 31 deletions src/dbnode/encoding/m3tsz/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package m3tsz

import (
"errors"
"math"

"github.com/m3db/m3/src/dbnode/encoding"
Expand All @@ -30,6 +31,8 @@ import (
xtime "github.com/m3db/m3/src/x/time"
)

var errClosed = errors.New("iterator is closed")

// DefaultReaderIteratorAllocFn returns a function for allocating NewReaderIterator.
func DefaultReaderIteratorAllocFn(
opts encoding.Options,
Expand Down Expand Up @@ -85,17 +88,13 @@ func (it *readerIterator) Next() bool {
return false
}

it.readValue(first)

return it.hasNext()
}

func (it *readerIterator) readValue(first bool) {
if first {
it.readFirstValue()
} else {
if !first {
it.readNextValue()
} else {
it.readFirstValue()
}

return it.hasNext()
}

func (it *readerIterator) readFirstValue() {
Expand Down Expand Up @@ -150,9 +149,18 @@ func (it *readerIterator) readNextValue() {
if err := it.floatIter.readNextFloat(it.is); err != nil {
it.err = err
}
} else {
it.readIntValDiff()
return
}

// inlined readIntValDiff()
bits := it.readBits(it.sig + 1)
sign := -1.0
if (bits >> it.sig) == opcodeNegative {
sign = 1.0
// clear the opcode bit
bits ^= uint64(1 << it.sig)
}
it.intVal += sign * float64(bits)
}

func (it *readerIterator) readIntSigMult() {
Expand All @@ -173,40 +181,38 @@ func (it *readerIterator) readIntSigMult() {
}

func (it *readerIterator) readIntValDiff() {
// read both sign bit and digits in one read
bits := it.readBits(it.sig + 1)
sign := -1.0
if it.readBits(1) == opcodeNegative {
if (bits >> it.sig) == opcodeNegative {
sign = 1.0
// clear the opcode bit
bits ^= uint64(1 << it.sig)
}

it.intVal += sign * float64(it.readBits(it.sig))
it.intVal += sign * float64(bits)
}

func (it *readerIterator) readBits(numBits uint8) uint64 {
if !it.hasNext() {
return 0
}
var res uint64
func (it *readerIterator) readBits(numBits uint8) (res uint64) {
res, it.err = it.is.ReadBits(numBits)
return res
return
}

// Current returns the value as well as the annotation associated with the current datapoint.
// Users should not hold on to the returned Annotation object as it may get invalidated when
// the iterator calls Next().
func (it *readerIterator) Current() (ts.Datapoint, xtime.Unit, ts.Annotation) {
dp := ts.Datapoint{
Timestamp: it.tsIterator.PrevTime.ToTime(),
TimestampNanos: it.tsIterator.PrevTime,
}

if !it.intOptimized || it.isFloat {
return ts.Datapoint{
Timestamp: it.tsIterator.PrevTime.ToTime(),
TimestampNanos: it.tsIterator.PrevTime,
Value: math.Float64frombits(it.floatIter.PrevFloatBits),
}, it.tsIterator.TimeUnit, it.tsIterator.PrevAnt
dp.Value = math.Float64frombits(it.floatIter.PrevFloatBits)
} else {
dp.Value = convertFromIntFloat(it.intVal, it.mult)
}

return ts.Datapoint{
Timestamp: it.tsIterator.PrevTime.ToTime(),
TimestampNanos: it.tsIterator.PrevTime,
Value: convertFromIntFloat(it.intVal, it.mult),
}, it.tsIterator.TimeUnit, it.tsIterator.PrevAnt
return dp, it.tsIterator.TimeUnit, it.tsIterator.PrevAnt
}

// Err returns the error encountered
Expand All @@ -227,7 +233,7 @@ func (it *readerIterator) isClosed() bool {
}

func (it *readerIterator) hasNext() bool {
return !it.hasError() && !it.isDone() && !it.isClosed()
return !it.hasError() && !it.isDone()
}

// Reset resets the ReadIterator for reuse.
Expand All @@ -249,6 +255,7 @@ func (it *readerIterator) Close() {
}

it.closed = true
it.err = errClosed
pool := it.opts.ReaderIteratorPool()
if pool != nil {
pool.Put(it)
Expand Down
7 changes: 6 additions & 1 deletion src/dbnode/encoding/m3tsz/iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ func TestReaderIteratorReadNextTimestamp(t *testing.T) {

it.TimeUnit = input.timeUnit
it.PrevTimeDelta = input.previousTimeDelta
tes, _ := it.timeEncodingSchemes.SchemeForUnit(it.TimeUnit)
it.timeEncodingScheme = tes

err := it.readNextTimestamp(stream)
require.NoError(t, err)
Expand All @@ -75,7 +77,10 @@ func TestReaderIteratorReadNextTimestamp(t *testing.T) {

stream := encoding.NewIStream(xio.NewBytesReader64([]byte{0x1}))
it := NewTimestampIterator(encoding.NewOptions(), false)
err := it.readNextTimestamp(stream)
err := it.readFirstTimestamp(stream)
require.Error(t, err)

err = it.readNextTimestamp(stream)
require.Error(t, err)

err = it.readNextTimestamp(stream)
Expand Down
Loading

0 comments on commit fe9a368

Please sign in to comment.