Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "[dbnode] Improve m3tsz decoding performance (#3358)" #3403

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/dbnode/encoding/encoding_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

51 changes: 29 additions & 22 deletions src/dbnode/encoding/istream.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,12 @@ import (
// IStream encapsulates a readable stream.
type IStream struct {
r xio.Reader64
err error // error encountered
current uint64 // current uint64 we are working off of
index int // current index within data slice
remaining uint8 // bits remaining in current to be read
}

var (
_ io.ByteReader = (*IStream)(nil)
_ io.Reader = (*IStream)(nil)
)

// NewIStream creates a new IStream
func NewIStream(reader64 xio.Reader64) *IStream {
return &IStream{r: reader64}
Expand Down Expand Up @@ -71,30 +67,23 @@ func (is *IStream) ReadBit() (Bit, error) {

// ReadBits reads the next Bits.
func (is *IStream) ReadBits(numBits uint8) (uint64, error) {
res := is.current >> (64 - numBits)
remaining := is.remaining
if numBits <= remaining {
if is.err != nil {
return 0, is.err
}
if numBits <= is.remaining {
// Have enough bits buffered.
is.current <<= numBits
is.remaining -= numBits
return res, nil
return is.consumeBuffer(numBits), nil
}

res := readBitsInWord(is.current, numBits)
// Not enough bits buffered, read next word from the stream.
bitsNeeded := numBits - remaining

current, n, err := is.r.Read64()
if err != nil {
bitsNeeded := numBits - is.remaining
if err := is.readWordFromStream(); err != nil {
return 0, err
}
n *= 8
if n < bitsNeeded {
if is.remaining < bitsNeeded {
return 0, io.EOF
}

is.current = current << bitsNeeded
is.remaining = n - bitsNeeded
return res | current>>(64-bitsNeeded), nil
return res | is.consumeBuffer(bitsNeeded), nil
}

// PeekBits looks at the next Bits, but doesn't move the pos.
Expand Down Expand Up @@ -124,8 +113,26 @@ func readBitsInWord(w uint64, numBits uint8) uint64 {
return w >> (64 - numBits)
}

// consumeBuffer consumes numBits in is.current.
func (is *IStream) consumeBuffer(numBits uint8) uint64 {
res := readBitsInWord(is.current, numBits)
is.current <<= numBits
is.remaining -= numBits
return res
}

func (is *IStream) readWordFromStream() error {
current, bytes, err := is.r.Read64()
is.current = current
is.remaining = 8 * bytes
is.err = err

return err
}

// Reset resets the IStream.
func (is *IStream) Reset(reader xio.Reader64) {
is.err = nil
is.current = 0
is.remaining = 0
is.index = 0
Expand Down
1 change: 1 addition & 0 deletions src/dbnode/encoding/istream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,4 +183,5 @@ func TestIStreamReset(t *testing.T) {
require.Equal(t, uint64(0), is.current)
require.Equal(t, uint8(0), is.remaining)
require.Equal(t, 0, is.index)
require.NoError(t, is.err)
}
22 changes: 10 additions & 12 deletions src/dbnode/encoding/m3tsz/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,8 @@ var (

// encoder is an M3TSZ encoder that can encode a stream of data in M3TSZ format.
type encoder struct {
os encoding.OStream
opts encoding.Options
markerEncodingScheme *encoding.MarkerEncodingScheme
os encoding.OStream
opts encoding.Options

// internal bookkeeping
tsEncoderState TimestampEncoder
Expand Down Expand Up @@ -76,12 +75,11 @@ func NewEncoder(
// `Reset` method is called.
initAllocIfEmpty := opts.EncoderPool() == nil
return &encoder{
os: encoding.NewOStream(bytes, initAllocIfEmpty, opts.BytesPool()),
opts: opts,
markerEncodingScheme: opts.MarkerEncodingScheme(),
tsEncoderState: NewTimestampEncoder(start, opts.DefaultTimeUnit(), opts),
closed: false,
intOptimized: intOptimized,
os: encoding.NewOStream(bytes, initAllocIfEmpty, opts.BytesPool()),
opts: opts,
tsEncoderState: NewTimestampEncoder(start, opts.DefaultTimeUnit(), opts),
closed: false,
intOptimized: intOptimized,
}
}

Expand Down Expand Up @@ -336,7 +334,7 @@ func (enc *encoder) Len() int {
var (
lastIdx = len(raw) - 1
lastByte = raw[lastIdx]
scheme = enc.markerEncodingScheme
scheme = enc.opts.MarkerEncodingScheme()
tail = scheme.Tail(lastByte, pos)
)
tail.IncRef()
Expand Down Expand Up @@ -408,7 +406,7 @@ func (enc *encoder) segmentZeroCopy(ctx context.Context) ts.Segment {
ctx.RegisterCloser(buffer.DelayFinalizer())

// Take a shared ref to a known good tail.
scheme := enc.markerEncodingScheme
scheme := enc.opts.MarkerEncodingScheme()
tail := scheme.Tail(lastByte, pos)

// NB(r): Finalize the head bytes whether this is by ref or copy. If by
Expand Down Expand Up @@ -436,7 +434,7 @@ func (enc *encoder) segmentTakeOwnership() ts.Segment {
head.DecRef()

// Take a shared ref to a known good tail.
scheme := enc.markerEncodingScheme
scheme := enc.opts.MarkerEncodingScheme()
tail := scheme.Tail(lastByte, pos)

// NB(r): Finalize the head bytes whether this is by ref or copy. If by
Expand Down
2 changes: 1 addition & 1 deletion src/dbnode/encoding/m3tsz/encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func TestWriteDeltaOfDeltaTimeUnitChanged(t *testing.T) {
}
for _, input := range inputs {
stream := encoding.NewOStream(nil, false, nil)
tsEncoder := NewTimestampEncoder(testStartTime, xtime.Nanosecond, encoding.NewOptions())
tsEncoder := NewTimestampEncoder(testStartTime, xtime.Nanosecond, nil)
tsEncoder.writeDeltaOfDeltaTimeUnitChanged(stream, 0, input.delta)
b, p := stream.RawBytes()
require.Equal(t, input.expectedBytes, b)
Expand Down
69 changes: 31 additions & 38 deletions src/dbnode/encoding/m3tsz/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
package m3tsz

import (
"errors"
"math"

"github.com/m3db/m3/src/dbnode/encoding"
Expand All @@ -31,8 +30,6 @@ import (
xtime "github.com/m3db/m3/src/x/time"
)

var errClosed = errors.New("iterator is closed")

// DefaultReaderIteratorAllocFn returns a function for allocating NewReaderIterator.
func DefaultReaderIteratorAllocFn(
opts encoding.Options,
Expand Down Expand Up @@ -88,15 +85,19 @@ func (it *readerIterator) Next() bool {
return false
}

if !first {
it.readNextValue()
} else {
it.readFirstValue()
}
it.readValue(first)

return it.hasNext()
}

func (it *readerIterator) readValue(first bool) {
if first {
it.readFirstValue()
} else {
it.readNextValue()
}
}

func (it *readerIterator) readFirstValue() {
if !it.intOptimized {
if err := it.floatIter.readFullFloat(it.is); err != nil {
Expand Down Expand Up @@ -149,18 +150,9 @@ func (it *readerIterator) readNextValue() {
if err := it.floatIter.readNextFloat(it.is); err != nil {
it.err = err
}
return
}

// inlined readIntValDiff()
bits := it.readBits(it.sig + 1)
sign := -1.0
if (bits >> it.sig) == opcodeNegative {
sign = 1.0
// clear the opcode bit
bits ^= uint64(1 << it.sig)
} else {
it.readIntValDiff()
}
it.intVal += sign * float64(bits)
}

func (it *readerIterator) readIntSigMult() {
Expand All @@ -181,38 +173,40 @@ func (it *readerIterator) readIntSigMult() {
}

func (it *readerIterator) readIntValDiff() {
// read both sign bit and digits in one read
bits := it.readBits(it.sig + 1)
sign := -1.0
if (bits >> it.sig) == opcodeNegative {
if it.readBits(1) == opcodeNegative {
sign = 1.0
// clear the opcode bit
bits ^= uint64(1 << it.sig)
}
it.intVal += sign * float64(bits)

it.intVal += sign * float64(it.readBits(it.sig))
}

func (it *readerIterator) readBits(numBits uint8) (res uint64) {
func (it *readerIterator) readBits(numBits uint8) uint64 {
if !it.hasNext() {
return 0
}
var res uint64
res, it.err = it.is.ReadBits(numBits)
return
return res
}

// Current returns the value as well as the annotation associated with the current datapoint.
// Users should not hold on to the returned Annotation object as it may get invalidated when
// the iterator calls Next().
func (it *readerIterator) Current() (ts.Datapoint, xtime.Unit, ts.Annotation) {
dp := ts.Datapoint{
Timestamp: it.tsIterator.PrevTime.ToTime(),
TimestampNanos: it.tsIterator.PrevTime,
}

if !it.intOptimized || it.isFloat {
dp.Value = math.Float64frombits(it.floatIter.PrevFloatBits)
} else {
dp.Value = convertFromIntFloat(it.intVal, it.mult)
return ts.Datapoint{
Timestamp: it.tsIterator.PrevTime.ToTime(),
TimestampNanos: it.tsIterator.PrevTime,
Value: math.Float64frombits(it.floatIter.PrevFloatBits),
}, it.tsIterator.TimeUnit, it.tsIterator.PrevAnt
}

return dp, it.tsIterator.TimeUnit, it.tsIterator.PrevAnt
return ts.Datapoint{
Timestamp: it.tsIterator.PrevTime.ToTime(),
TimestampNanos: it.tsIterator.PrevTime,
Value: convertFromIntFloat(it.intVal, it.mult),
}, it.tsIterator.TimeUnit, it.tsIterator.PrevAnt
}

// Err returns the error encountered
Expand All @@ -233,7 +227,7 @@ func (it *readerIterator) isClosed() bool {
}

func (it *readerIterator) hasNext() bool {
return !it.hasError() && !it.isDone()
return !it.hasError() && !it.isDone() && !it.isClosed()
}

// Reset resets the ReadIterator for reuse.
Expand All @@ -255,7 +249,6 @@ func (it *readerIterator) Close() {
}

it.closed = true
it.err = errClosed
pool := it.opts.ReaderIteratorPool()
if pool != nil {
pool.Put(it)
Expand Down
7 changes: 1 addition & 6 deletions src/dbnode/encoding/m3tsz/iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,6 @@ func TestReaderIteratorReadNextTimestamp(t *testing.T) {

it.TimeUnit = input.timeUnit
it.PrevTimeDelta = input.previousTimeDelta
tes, _ := it.timeEncodingSchemes.SchemeForUnit(it.TimeUnit)
it.timeEncodingScheme = tes

err := it.readNextTimestamp(stream)
require.NoError(t, err)
Expand All @@ -76,10 +74,7 @@ func TestReaderIteratorReadNextTimestamp(t *testing.T) {

stream := encoding.NewIStream(xio.NewBytesReader64([]byte{0x1}))
it := NewTimestampIterator(encoding.NewOptions(), false)
err := it.readFirstTimestamp(stream)
require.Error(t, err)

err = it.readNextTimestamp(stream)
err := it.readNextTimestamp(stream)
require.Error(t, err)

err = it.readNextTimestamp(stream)
Expand Down
Loading