Skip to content

Commit

Permalink
[dbnode] Decode perf time ops (#2176)
Browse files Browse the repository at this point in the history
Decode per time optimizations includes avoiding map access for time encoding schemes and using nanos int64 time operations instead of time struct
  • Loading branch information
rallen090 authored Mar 4, 2020
1 parent dd53ddb commit 987db51
Show file tree
Hide file tree
Showing 36 changed files with 252 additions and 202 deletions.
4 changes: 2 additions & 2 deletions src/cmd/services/m3comparator/main/series_iterator_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,8 @@ func buildSeriesIterator(
ID: id,
Namespace: ident.StringID("ns"),
Tags: tagIter,
StartInclusive: opts.start,
EndExclusive: end,
StartInclusive: xtime.ToUnixNano(opts.start),
EndExclusive: xtime.ToUnixNano(end),
Replicas: []encoding.MultiReaderIterator{
multiReader,
},
Expand Down
5 changes: 3 additions & 2 deletions src/dbnode/client/fetch_tagged_results_accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/m3db/m3/src/dbnode/topology"
xerrors "github.com/m3db/m3/src/x/errors"
"github.com/m3db/m3/src/x/ident"
xtime "github.com/m3db/m3/src/x/time"
)

type fetchTaggedResultAccumulatorOpts struct {
Expand Down Expand Up @@ -294,8 +295,8 @@ func (accum *fetchTaggedResultAccumulator) sliceResponsesAsSeriesIter(
ID: pools.ID().BinaryID(tsID),
Namespace: pools.ID().BinaryID(nsID),
Tags: decoder,
StartInclusive: accum.startTime,
EndExclusive: accum.endTime,
StartInclusive: xtime.ToUnixNano(accum.startTime),
EndExclusive: xtime.ToUnixNano(accum.endTime),
Replicas: iters,
})

Expand Down
4 changes: 2 additions & 2 deletions src/dbnode/client/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1591,8 +1591,8 @@ func (s *session) fetchIDsAttempt(
iter.Reset(encoding.SeriesIteratorOptions{
ID: seriesID,
Namespace: namespaceID,
StartInclusive: startInclusive,
EndExclusive: endExclusive,
StartInclusive: xtime.ToUnixNano(startInclusive),
EndExclusive: xtime.ToUnixNano(endExclusive),
Replicas: itersToInclude,
})
iters.SetAt(idx, iter)
Expand Down
5 changes: 3 additions & 2 deletions src/dbnode/client/session_fetch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,8 +532,9 @@ func fulfillFetchBatchOps(
}
for _, value := range f.values {
dp := ts.Datapoint{
Timestamp: value.t,
Value: value.value,
Timestamp: value.t,
TimestampNanos: xtime.ToUnixNano(value.t),
Value: value.value,
}
encoder.Encode(dp, value.unit, value.annotation)
}
Expand Down
4 changes: 2 additions & 2 deletions src/dbnode/client/session_proto_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,8 @@ func TestProtoSeriesIteratorRoundtrip(t *testing.T) {
seriesIter.Reset(encoding.SeriesIteratorOptions{
ID: ident.StringID("test_series_id"),
Namespace: testNamespace,
StartInclusive: data[0].t,
EndExclusive: start.Add(4 * time.Second),
StartInclusive: xtime.ToUnixNano(data[0].t),
EndExclusive: xtime.ToUnixNano(start.Add(4 * time.Second)),
Replicas: []encoding.MultiReaderIterator{multiIter},
})

Expand Down
2 changes: 1 addition & 1 deletion src/dbnode/encoding/encoding_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions src/dbnode/encoding/iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (it *testIterator) Current() (ts.Datapoint, xtime.Unit, ts.Annotation) {
idx = 0
}
v := it.values[idx]
dp := ts.Datapoint{Timestamp: v.t, Value: v.value}
dp := ts.Datapoint{Timestamp: v.t, TimestampNanos: xtime.ToUnixNano(v.t), Value: v.value}
return dp, v.unit, ts.Annotation(v.annotation)
}

Expand Down Expand Up @@ -128,7 +128,7 @@ func (it *testMultiIterator) Current() (ts.Datapoint, xtime.Unit, ts.Annotation)
idx = 0
}
v := it.values[idx]
dp := ts.Datapoint{Timestamp: v.t, Value: v.value}
dp := ts.Datapoint{Timestamp: v.t, TimestampNanos: xtime.ToUnixNano(v.t), Value: v.value}
return dp, v.unit, ts.Annotation(v.annotation)
}

Expand Down
41 changes: 19 additions & 22 deletions src/dbnode/encoding/iterators.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,29 +21,26 @@
package encoding

import (
"math"
"sort"
"time"

"github.com/m3db/m3/src/dbnode/ts"
xtime "github.com/m3db/m3/src/x/time"
)

var (
// time is stored as an int64 plus an int32 nanosecond value, but if you
// use max int64 for the seconds component only then integer overflow
// will occur when performing comparisons like time.Before() and they
// will not work correctly.
timeMax = time.Unix(1<<63-62135596801, 999999999)
// UnixNano is an int64, so the max time is the max of that type.
timeMaxNanos = xtime.UnixNano(math.MaxInt64)
)

// iterators is a collection of iterators, and allows for reading in order values
// from the underlying iterators that are separately in order themselves.
type iterators struct {
values []Iterator
earliest []Iterator
earliestAt time.Time
filterStart time.Time
filterEnd time.Time
earliestAt xtime.UnixNano
filterStart xtime.UnixNano
filterEnd xtime.UnixNano
filtering bool
equalTimesStrategy IterateEqualTimestampStrategy

Expand Down Expand Up @@ -106,7 +103,7 @@ func (i *iterators) current() (ts.Datapoint, xtime.Unit, ts.Annotation) {
return i.earliest[numIters-1].Current()
}

func (i *iterators) at() time.Time {
func (i *iterators) at() xtime.UnixNano {
return i.earliestAt
}

Expand All @@ -121,26 +118,26 @@ func (i *iterators) push(iter Iterator) bool {

func (i *iterators) tryAddEarliest(iter Iterator) {
dp, _, _ := iter.Current()
if dp.Timestamp.Equal(i.earliestAt) {
if dp.TimestampNanos == i.earliestAt {
// Push equal earliest
i.earliest = append(i.earliest, iter)
} else if dp.Timestamp.Before(i.earliestAt) {
} else if dp.TimestampNanos < i.earliestAt {
// Reset earliest and push new iter
i.earliest = append(i.earliest[:0], iter)
i.earliestAt = dp.Timestamp
i.earliestAt = dp.TimestampNanos
}
}

func (i *iterators) moveIteratorToFilterNext(iter Iterator) bool {
next := true
for next {
dp, _, _ := iter.Current()
if dp.Timestamp.Before(i.filterStart) {
if dp.TimestampNanos < i.filterStart {
// Filter out any before start
next = iter.Next()
continue
}
if !dp.Timestamp.Before(i.filterEnd) {
if dp.TimestampNanos >= i.filterEnd {
// Filter out completely if after end
next = false
break
Expand Down Expand Up @@ -201,15 +198,15 @@ func (i *iterators) moveToValidNext() (bool, error) {
}

// Force first to be new earliest, evaluate rest
i.earliestAt = timeMax
i.earliestAt = timeMaxNanos
for _, iter := range i.values {
i.tryAddEarliest(iter)
}

// Apply filter to new earliest if necessary
if i.filtering {
inFilter := i.earliestAt.Before(i.filterEnd) &&
!i.earliestAt.Before(i.filterStart)
inFilter := i.earliestAt < i.filterEnd &&
i.earliestAt >= i.filterStart
if !inFilter {
return i.moveToValidNext()
}
Expand All @@ -218,8 +215,8 @@ func (i *iterators) moveToValidNext() (bool, error) {
return i.validateNext(true, prevAt)
}

func (i *iterators) validateNext(next bool, prevAt time.Time) (bool, error) {
if i.earliestAt.Before(prevAt) {
func (i *iterators) validateNext(next bool, prevAt xtime.UnixNano) (bool, error) {
if i.earliestAt < prevAt {
// Out of order datapoint
i.reset()
return false, errOutOfOrderIterator
Expand All @@ -237,10 +234,10 @@ func (i *iterators) reset() {
i.earliest[idx] = nil
}
i.earliest = i.earliest[:0]
i.earliestAt = timeMax
i.earliestAt = timeMaxNanos
}

func (i *iterators) setFilter(start, end time.Time) {
func (i *iterators) setFilter(start, end xtime.UnixNano) {
i.filtering = true
i.filterStart = start
i.filterEnd = end
Expand Down
2 changes: 1 addition & 1 deletion src/dbnode/encoding/m3tsz/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ func (enc *encoder) Reset(start time.Time, capacity int, schema namespace.Schema
func (enc *encoder) reset(start time.Time, bytes checked.Bytes) {
enc.os.Reset(bytes)

timeUnit := initialTimeUnit(start, enc.opts.DefaultTimeUnit())
timeUnit := initialTimeUnit(xtime.ToUnixNano(start), enc.opts.DefaultTimeUnit())
enc.tsEncoderState = NewTimestampEncoder(start, timeUnit, enc.opts)

enc.floatEnc = FloatEncoderAndIterator{}
Expand Down
66 changes: 33 additions & 33 deletions src/dbnode/encoding/m3tsz/encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,13 +211,13 @@ func TestEncodeNoAnnotation(t *testing.T) {

startTime := time.Unix(1427162462, 0)
inputs := []ts.Datapoint{
{startTime, 12},
{startTime.Add(time.Second * 60), 12},
{startTime.Add(time.Second * 120), 24},
{startTime.Add(-time.Second * 76), 24},
{startTime.Add(-time.Second * 16), 24},
{startTime.Add(time.Second * 2092), 15},
{startTime.Add(time.Second * 4200), 12},
{Timestamp: startTime, Value: 12},
{Timestamp: startTime.Add(time.Second * 60), Value: 12},
{Timestamp: startTime.Add(time.Second * 120), Value: 24},
{Timestamp: startTime.Add(-time.Second * 76), Value: 24},
{Timestamp: startTime.Add(-time.Second * 16), Value: 24},
{Timestamp: startTime.Add(time.Second * 2092), Value: 15},
{Timestamp: startTime.Add(time.Second * 4200), Value: 12},
}
for _, input := range inputs {
encoder.Encode(input, xtime.Second, nil)
Expand Down Expand Up @@ -254,13 +254,13 @@ func TestEncodeWithAnnotation(t *testing.T) {
dp ts.Datapoint
ant ts.Annotation
}{
{ts.Datapoint{startTime, 12}, []byte{0xa}},
{ts.Datapoint{startTime.Add(time.Second * 60), 12}, []byte{0xa}},
{ts.Datapoint{startTime.Add(time.Second * 120), 24}, nil},
{ts.Datapoint{startTime.Add(-time.Second * 76), 24}, nil},
{ts.Datapoint{startTime.Add(-time.Second * 16), 24}, []byte{0x1, 0x2}},
{ts.Datapoint{startTime.Add(time.Second * 2092), 15}, nil},
{ts.Datapoint{startTime.Add(time.Second * 4200), 12}, nil},
{ts.Datapoint{Timestamp: startTime, Value: 12}, []byte{0xa}},
{ts.Datapoint{Timestamp: startTime.Add(time.Second * 60), Value: 12}, []byte{0xa}},
{ts.Datapoint{Timestamp: startTime.Add(time.Second * 120), Value: 24}, nil},
{ts.Datapoint{Timestamp: startTime.Add(-time.Second * 76), Value: 24}, nil},
{ts.Datapoint{Timestamp: startTime.Add(-time.Second * 16), Value: 24}, []byte{0x1, 0x2}},
{ts.Datapoint{Timestamp: startTime.Add(time.Second * 2092), Value: 15}, nil},
{ts.Datapoint{Timestamp: startTime.Add(time.Second * 4200), Value: 12}, nil},
}

for _, input := range inputs {
Expand Down Expand Up @@ -298,15 +298,15 @@ func TestEncodeWithTimeUnit(t *testing.T) {
dp ts.Datapoint
tu xtime.Unit
}{
{ts.Datapoint{startTime, 12}, xtime.Second},
{ts.Datapoint{startTime.Add(time.Second * 60), 12}, xtime.Second},
{ts.Datapoint{startTime.Add(time.Second * 120), 24}, xtime.Second},
{ts.Datapoint{startTime.Add(-time.Second * 76), 24}, xtime.Second},
{ts.Datapoint{startTime.Add(-time.Second * 16), 24}, xtime.Second},
{ts.Datapoint{startTime.Add(-time.Nanosecond * 15500000000), 15}, xtime.Nanosecond},
{ts.Datapoint{startTime.Add(-time.Millisecond * 1400), 12}, xtime.Millisecond},
{ts.Datapoint{startTime.Add(-time.Second * 10), 12}, xtime.Second},
{ts.Datapoint{startTime.Add(time.Second * 10), 12}, xtime.Second},
{ts.Datapoint{Timestamp: startTime, Value: 12}, xtime.Second},
{ts.Datapoint{Timestamp: startTime.Add(time.Second * 60), Value: 12}, xtime.Second},
{ts.Datapoint{Timestamp: startTime.Add(time.Second * 120), Value: 24}, xtime.Second},
{ts.Datapoint{Timestamp: startTime.Add(-time.Second * 76), Value: 24}, xtime.Second},
{ts.Datapoint{Timestamp: startTime.Add(-time.Second * 16), Value: 24}, xtime.Second},
{ts.Datapoint{Timestamp: startTime.Add(-time.Nanosecond * 15500000000), Value: 15}, xtime.Nanosecond},
{ts.Datapoint{Timestamp: startTime.Add(-time.Millisecond * 1400), Value: 12}, xtime.Millisecond},
{ts.Datapoint{Timestamp: startTime.Add(-time.Second * 10), Value: 12}, xtime.Second},
{ts.Datapoint{Timestamp: startTime.Add(time.Second * 10), Value: 12}, xtime.Second},
}

for _, input := range inputs {
Expand Down Expand Up @@ -337,13 +337,13 @@ func TestEncodeWithAnnotationAndTimeUnit(t *testing.T) {
ant ts.Annotation
tu xtime.Unit
}{
{ts.Datapoint{startTime, 12}, []byte{0xa}, xtime.Second},
{ts.Datapoint{startTime.Add(time.Second * 60), 12}, nil, xtime.Second},
{ts.Datapoint{startTime.Add(time.Second * 120), 24}, nil, xtime.Second},
{ts.Datapoint{startTime.Add(-time.Second * 76), 24}, []byte{0x1, 0x2}, xtime.Second},
{ts.Datapoint{startTime.Add(-time.Second * 16), 24}, nil, xtime.Millisecond},
{ts.Datapoint{startTime.Add(-time.Millisecond * 15500), 15}, []byte{0x3, 0x4, 0x5}, xtime.Millisecond},
{ts.Datapoint{startTime.Add(-time.Millisecond * 14000), 12}, nil, xtime.Second},
{ts.Datapoint{Timestamp: startTime, Value: 12}, []byte{0xa}, xtime.Second},
{ts.Datapoint{Timestamp: startTime.Add(time.Second * 60), Value: 12}, nil, xtime.Second},
{ts.Datapoint{Timestamp: startTime.Add(time.Second * 120), Value: 24}, nil, xtime.Second},
{ts.Datapoint{Timestamp: startTime.Add(-time.Second * 76), Value: 24}, []byte{0x1, 0x2}, xtime.Second},
{ts.Datapoint{Timestamp: startTime.Add(-time.Second * 16), Value: 24}, nil, xtime.Millisecond},
{ts.Datapoint{Timestamp: startTime.Add(-time.Millisecond * 15500), Value: 15}, []byte{0x3, 0x4, 0x5}, xtime.Millisecond},
{ts.Datapoint{Timestamp: startTime.Add(-time.Millisecond * 14000), Value: 12}, nil, xtime.Second},
}

for _, input := range inputs {
Expand Down Expand Up @@ -374,7 +374,7 @@ func TestInitTimeUnit(t *testing.T) {
{time.Unix(1, 1000), xtime.Unit(9), xtime.None},
}
for _, input := range inputs {
require.Equal(t, input.expected, initialTimeUnit(input.start, input.tu))
require.Equal(t, input.expected, initialTimeUnit(xtime.ToUnixNano(input.start), input.tu))
}
}

Expand All @@ -389,7 +389,7 @@ func TestEncoderResets(t *testing.T) {
_, ok := enc.Stream(ctx)
require.False(t, ok)

enc.Encode(ts.Datapoint{testStartTime, 12}, xtime.Second, nil)
enc.Encode(ts.Datapoint{Timestamp: testStartTime, Value: 12}, xtime.Second, nil)
require.True(t, enc.os.Len() > 0)

now := time.Now()
Expand All @@ -400,7 +400,7 @@ func TestEncoderResets(t *testing.T) {
b, _ := enc.os.Rawbytes()
require.Equal(t, []byte{}, b)

enc.Encode(ts.Datapoint{now, 13}, xtime.Second, nil)
enc.Encode(ts.Datapoint{Timestamp: now, Value: 13}, xtime.Second, nil)
require.True(t, enc.os.Len() > 0)

enc.DiscardReset(now, 0, nil)
Expand Down
10 changes: 6 additions & 4 deletions src/dbnode/encoding/m3tsz/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,14 +183,16 @@ func (it *readerIterator) readBits(numBits int) uint64 {
func (it *readerIterator) Current() (ts.Datapoint, xtime.Unit, ts.Annotation) {
if !it.intOptimized || it.isFloat {
return ts.Datapoint{
Timestamp: it.tsIterator.PrevTime,
Value: math.Float64frombits(it.floatIter.PrevFloatBits),
Timestamp: it.tsIterator.PrevTime.ToTime(),
TimestampNanos: it.tsIterator.PrevTime,
Value: math.Float64frombits(it.floatIter.PrevFloatBits),
}, it.tsIterator.TimeUnit, it.tsIterator.PrevAnt
}

return ts.Datapoint{
Timestamp: it.tsIterator.PrevTime,
Value: convertFromIntFloat(it.intVal, it.mult),
Timestamp: it.tsIterator.PrevTime.ToTime(),
TimestampNanos: it.tsIterator.PrevTime,
Value: convertFromIntFloat(it.intVal, it.mult),
}, it.tsIterator.TimeUnit, it.tsIterator.PrevAnt
}

Expand Down
Loading

0 comments on commit 987db51

Please sign in to comment.