Skip to content

Commit

Permalink
[query] Increase perf for temporal functions (#2049)
Browse files Browse the repository at this point in the history
  • Loading branch information
arnikola authored Dec 10, 2019
1 parent 7e54ccf commit 8cbe5a8
Show file tree
Hide file tree
Showing 76 changed files with 2,621 additions and 2,468 deletions.
4 changes: 4 additions & 0 deletions .fossa.yml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ analyze:
path: src/cmd/services/m3query/main
options:
allow-unresolved: true
- name: github.com/m3db/m3/src/cmd/services/m3comparator/main
type: go
target: github.com/m3db/m3/src/cmd/services/m3comparator/main
path: src/cmd/services/m3comparator/main
- name: github.com/m3db/m3/src/cmd/tools/carbon_load/main
type: go
target: github.com/m3db/m3/src/cmd/tools/carbon_load/main
Expand Down
4 changes: 2 additions & 2 deletions scripts/comparator/docker-setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ function setup_docker {
echo "Run m3query, m3comparator, and prometheus containers"
docker-compose -f ${COMPOSE_FILE} up -d --build --renew-anon-volumes m3comparator
docker-compose -f ${COMPOSE_FILE} up -d --build --renew-anon-volumes prometheus
docker-compose -f ${COMPOSE_FILE} up -d --build --renew-anon-volumes m3query

docker-compose -f ${COMPOSE_FILE} up -d --build --renew-anon-volumes m3query
CI=$1
if [[ "$CI" != "true" ]]
then
Expand Down
1 change: 1 addition & 0 deletions scripts/comparator/m3query.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@ metrics:

tagOptions:
idScheme: quoted

2 changes: 1 addition & 1 deletion src/aggregator/aggregation/quantile/cm/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type SamplePool interface {
// Init initializes the pool.
Init()

// Get returns a sample from the pool.
// Get gets a sample from the pool.
Get() *Sample

// Put returns a sample to the pool.
Expand Down
17 changes: 17 additions & 0 deletions src/cmd/services/m3comparator/main/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"fmt"
"math"
"math/rand"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -154,6 +155,22 @@ func (q *querier) FetchCompressed(
}
}

break
} else if "gen" == string(matcher.Name) {
cStr := string(matcher.Value)
count, err := strconv.Atoi(cStr)
if err != nil {
return m3.SeriesFetchResult{}, noop, err
}

actualGens = make([]seriesGen, count)
for i := 0; i < count; i++ {
actualGens[i] = seriesGen{
res: time.Second * 15,
name: fmt.Sprintf("foo_%d", i),
}
}

break
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/query/api/v1/handler/prometheus/native/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,9 +212,9 @@ func parseInstantaneousParams(
if fetchOpts.Step == 0 {
fetchOpts.Step = time.Second
}

r.Form.Set(startParam, nowTimeValue)
r.Form.Set(endParam, nowTimeValue)

params, err := parseParams(r, engineOpts, timeoutOpts,
fetchOpts, instrumentOpts)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion src/query/api/v1/handler/prometheus/native/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func TestParseBlockType(t *testing.T) {
instrument.NewOptions()))

r = httptest.NewRequest(http.MethodGet, "/foo?block-type=1", nil)
assert.Equal(t, models.TypeMultiBlock, parseBlockType(r,
assert.Equal(t, models.TypeSingleBlock, parseBlockType(r,
instrument.NewOptions()))

r = httptest.NewRequest(http.MethodGet, "/foo?block-type=2", nil)
Expand Down
20 changes: 10 additions & 10 deletions src/query/api/v1/handler/prometheus/native/read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ import (
"github.com/stretchr/testify/require"
)

func TestPromReadHandler_Read(t *testing.T) {
testPromReadHandler_Read(t, block.NewResultMetadata(), "")
testPromReadHandler_Read(t, buildWarningMeta("foo", "bar"), "foo_bar")
testPromReadHandler_Read(t, block.ResultMetadata{Exhaustive: false},
func TestPromReadHandlerRead(t *testing.T) {
testPromReadHandlerRead(t, block.NewResultMetadata(), "")
testPromReadHandlerRead(t, buildWarningMeta("foo", "bar"), "foo_bar")
testPromReadHandlerRead(t, block.ResultMetadata{Exhaustive: false},
handler.LimitHeaderSeriesLimitApplied)
}

func testPromReadHandler_Read(
func testPromReadHandlerRead(
t *testing.T,
resultMeta block.ResultMetadata,
ex string,
Expand Down Expand Up @@ -100,14 +100,14 @@ type M3QLResp []struct {
StepSizeMs int `json:"step_size_ms"`
}

func TestPromReadHandlerRead(t *testing.T) {
testPromReadHandlerRead(t, block.NewResultMetadata(), "")
testPromReadHandlerRead(t, buildWarningMeta("foo", "bar"), "foo_bar")
testPromReadHandlerRead(t, block.ResultMetadata{Exhaustive: false},
func TestM3PromReadHandlerRead(t *testing.T) {
testM3PromReadHandlerRead(t, block.NewResultMetadata(), "")
testM3PromReadHandlerRead(t, buildWarningMeta("foo", "bar"), "foo_bar")
testM3PromReadHandlerRead(t, block.ResultMetadata{Exhaustive: false},
handler.LimitHeaderSeriesLimitApplied)
}

func testPromReadHandlerRead(
func testM3PromReadHandlerRead(
t *testing.T,
resultMeta block.ResultMetadata,
ex string,
Expand Down
77 changes: 62 additions & 15 deletions src/query/api/v1/handler/prometheus/remote/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package remote

import (
"bytes"
"context"
"net/http"
"sync"
Expand Down Expand Up @@ -178,8 +179,8 @@ func (h *PromReadHandler) parseRequest(
}

type readResult struct {
result []*prompb.QueryResult
meta block.ResultMetadata
result []*prompb.QueryResult
}

func (h *PromReadHandler) read(
Expand All @@ -190,18 +191,18 @@ func (h *PromReadHandler) read(
fetchOpts *storage.FetchOptions,
) (readResult, error) {
var (
queryCount = len(r.Queries)
promResults = make([]*prompb.QueryResult, queryCount)
cancelFuncs = make([]context.CancelFunc, queryCount)
queryOpts = &executor.QueryOptions{
queryCount = len(r.Queries)
cancelFuncs = make([]context.CancelFunc, queryCount)
queryResults = make([]*prompb.QueryResult, queryCount)
meta = block.NewResultMetadata()
queryOpts = &executor.QueryOptions{
QueryContextOptions: models.QueryContextOptions{
LimitMaxTimeseries: fetchOpts.Limit,
}}

wg sync.WaitGroup
mu sync.Mutex
multiErr xerrors.MultiError
meta = block.NewResultMetadata()
)

wg.Add(queryCount)
Expand All @@ -221,23 +222,20 @@ func (h *PromReadHandler) read(

// Detect clients closing connections
handler.CloseWatcher(ctx, cancel, w, h.instrumentOpts)
result, err := h.engine.Execute(ctx, query, queryOpts, fetchOpts)
result, err := h.engine.ExecuteProm(ctx, query, queryOpts, fetchOpts)
if err != nil {
mu.Lock()
multiErr = multiErr.Add(err)
mu.Unlock()
return
}

result.PromResult.Timeseries = filterResults(
result.PromResult.GetTimeseries(), fetchOpts)
mu.Lock()
queryResults[i] = result.PromResult
meta = meta.CombineMetadata(result.Metadata)
mu.Unlock()
result.SeriesList = prometheus.FilterSeriesByOptions(
result.SeriesList,
fetchOpts,
)
promRes := storage.FetchResultToPromResult(result, h.keepEmpty)
promResults[i] = promRes
}()
}

Expand All @@ -247,8 +245,57 @@ func (h *PromReadHandler) read(
}

if err := multiErr.FinalError(); err != nil {
return readResult{nil, meta}, err
return readResult{result: nil, meta: meta}, err
}

return readResult{result: queryResults, meta: meta}, nil
}

// filterResults removes series tags based on options.
func filterResults(
series []*prompb.TimeSeries,
opts *storage.FetchOptions,
) []*prompb.TimeSeries {
if opts == nil {
return series
}

keys := opts.RestrictQueryOptions.GetRestrictByTag().GetFilterByNames()
if len(keys) == 0 {
return series
}

for i, s := range series {
series[i].Labels = filterLabels(s.Labels, keys)
}

return series
}

func filterLabels(
labels []prompb.Label,
filtering [][]byte,
) []prompb.Label {
if len(filtering) == 0 {
return labels
}

filtered := labels[:0]
for _, l := range labels {
skip := false
for _, f := range filtering {
if bytes.Equal(l.GetName(), f) {
skip = true
break
}
}

if skip {
continue
}

filtered = append(filtered, l)
}

return readResult{promResults, meta}, nil
return filtered
}
102 changes: 78 additions & 24 deletions src/query/api/v1/handler/prometheus/remote/read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import (
"github.com/m3db/m3/src/query/storage"
"github.com/m3db/m3/src/query/test"
"github.com/m3db/m3/src/query/test/m3"
"github.com/m3db/m3/src/query/ts"
xclock "github.com/m3db/m3/src/x/clock"
"github.com/m3db/m3/src/x/instrument"

Expand Down Expand Up @@ -244,24 +243,14 @@ func TestMultipleRead(t *testing.T) {
now := time.Now()
promNow := storage.TimeToPromTimestamp(now)

vals := ts.NewMockValues(ctrl)
vals.EXPECT().Len().Return(1).AnyTimes()
dp := ts.Datapoints{{Timestamp: now, Value: 1}}
vals.EXPECT().Datapoints().Return(dp).AnyTimes()

tags := models.NewTags(1, models.NewTagOptions()).
AddTag(models.Tag{Name: []byte("a"), Value: []byte("b")})

valsTwo := ts.NewMockValues(ctrl)
valsTwo.EXPECT().Len().Return(1).AnyTimes()
dpTwo := ts.Datapoints{{Timestamp: now, Value: 2}}
valsTwo.EXPECT().Datapoints().Return(dpTwo).AnyTimes()
tagsTwo := models.NewTags(1, models.NewTagOptions()).
AddTag(models.Tag{Name: []byte("c"), Value: []byte("d")})

r := &storage.FetchResult{
SeriesList: ts.SeriesList{
ts.NewSeries([]byte("a"), vals, tags),
r := storage.PromResult{
PromResult: &prompb.QueryResult{
Timeseries: []*prompb.TimeSeries{
&prompb.TimeSeries{
Samples: []prompb.Sample{{Value: 1, Timestamp: promNow}},
Labels: []prompb.Label{{Name: []byte("a"), Value: []byte("b")}},
},
},
},
Metadata: block.ResultMetadata{
Exhaustive: true,
Expand All @@ -270,9 +259,14 @@ func TestMultipleRead(t *testing.T) {
},
}

rTwo := &storage.FetchResult{
SeriesList: ts.SeriesList{
ts.NewSeries([]byte("c"), valsTwo, tagsTwo),
rTwo := storage.PromResult{
PromResult: &prompb.QueryResult{
Timeseries: []*prompb.TimeSeries{
&prompb.TimeSeries{
Samples: []prompb.Sample{{Value: 2, Timestamp: promNow}},
Labels: []prompb.Label{{Name: []byte("c"), Value: []byte("d")}},
},
},
},
Metadata: block.ResultMetadata{
Exhaustive: false,
Expand All @@ -295,9 +289,11 @@ func TestMultipleRead(t *testing.T) {

engine := executor.NewMockEngine(ctrl)
engine.EXPECT().
Execute(gomock.Any(), q, gomock.Any(), gomock.Any()).Return(r, nil)
ExecuteProm(gomock.Any(), q, gomock.Any(), gomock.Any()).
Return(r, nil)
engine.EXPECT().
Execute(gomock.Any(), qTwo, gomock.Any(), gomock.Any()).Return(rTwo, nil)
ExecuteProm(gomock.Any(), qTwo, gomock.Any(), gomock.Any()).
Return(rTwo, nil)

h := NewPromReadHandler(engine, nil, nil, true, instrument.NewOptions()).(*PromReadHandler)
res, err := h.read(context.TODO(), nil, req, 0, storage.NewFetchOptions())
Expand Down Expand Up @@ -325,3 +321,61 @@ func TestMultipleRead(t *testing.T) {
require.Equal(t, 1, len(meta.Warnings))
assert.Equal(t, "foo_bar", meta.Warnings[0].Header())
}

func TestReadWithOptions(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

now := time.Now()
promNow := storage.TimeToPromTimestamp(now)

r := storage.PromResult{
PromResult: &prompb.QueryResult{
Timeseries: []*prompb.TimeSeries{
&prompb.TimeSeries{
Samples: []prompb.Sample{{Value: 1, Timestamp: promNow}},
Labels: []prompb.Label{
{Name: []byte("a"), Value: []byte("b")},
{Name: []byte("remove"), Value: []byte("c")},
},
},
},
},
Metadata: block.NewResultMetadata(),
}

req := &prompb.ReadRequest{
Queries: []*prompb.Query{{StartTimestampMs: 10}},
}

q, err := storage.PromReadQueryToM3(req.Queries[0])
require.NoError(t, err)

engine := executor.NewMockEngine(ctrl)
engine.EXPECT().
ExecuteProm(gomock.Any(), q, gomock.Any(), gomock.Any()).
Return(r, nil)

opts := storage.NewFetchOptions()
opts.RestrictQueryOptions = &storage.RestrictQueryOptions{
RestrictByTag: &storage.RestrictByTag{
Strip: [][]byte{[]byte("remove")},
},
}

h := NewPromReadHandler(engine, nil, nil, true,
instrument.NewOptions()).(*PromReadHandler)
res, err := h.read(context.TODO(), nil, req, 0, opts)
require.NoError(t, err)
expected := &prompb.QueryResult{
Timeseries: []*prompb.TimeSeries{
&prompb.TimeSeries{
Labels: []prompb.Label{{Name: []byte("a"), Value: []byte("b")}},
Samples: []prompb.Sample{{Timestamp: promNow, Value: 1}},
},
},
}

result := res.result
assert.Equal(t, expected.Timeseries[0], result[0].Timeseries[0])
}
Loading

0 comments on commit 8cbe5a8

Please sign in to comment.