Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[query] Correctness checker #1993

Merged
merged 33 commits into from
Oct 28, 2019
Merged
Changes from 1 commit
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
242a821
[query] Comparator
arnikola Oct 10, 2019
93a336f
Updating docker files
arnikola Oct 10, 2019
c28c83a
Missed adding comparator.go
arnikola Oct 11, 2019
483c60a
WIP
arnikola Oct 16, 2019
91dbde0
Fix docker composition
arnikola Oct 16, 2019
9cfb7c8
Generating series
arnikola Oct 16, 2019
4106a4f
Added grafana to comparator
arnikola Oct 16, 2019
23d8c6b
WIP
arnikola Oct 17, 2019
b12e598
Merge branch 'master' into arnikola/comparator
arnikola Oct 17, 2019
f667c12
!!
arnikola Oct 17, 2019
9a80a8b
Fix this thing
arnikola Oct 18, 2019
a650ea8
Cleaning up code, aligning offsets.
arnikola Oct 18, 2019
b5bd97e
Tests passing
arnikola Oct 18, 2019
8fafe9c
Cleanup
arnikola Oct 18, 2019
554279b
Merge branch 'master' into arnikola/comparator
arnikola Oct 18, 2019
5674e5b
Fixing offset stuff
arnikola Oct 20, 2019
bf6cf4e
Adding comparator tests, making grafana local only
arnikola Oct 21, 2019
962c78a
Fix some tests, generated files should now be gitignored.
arnikola Oct 21, 2019
315689e
Fix query parsing
arnikola Oct 21, 2019
13ad1c8
sed awk etc
arnikola Oct 21, 2019
28ea7ea
Merge branch 'master' into arnikola/comparator
arnikola Oct 22, 2019
88f3e42
Fix some scripting, add more query types
arnikola Oct 23, 2019
9ad7e83
Merge branch 'arnikola/comparator' of github.com:m3db/m3 into arnikol…
arnikola Oct 23, 2019
75a5037
Fixing CI tests
arnikola Oct 23, 2019
da32a42
PR response
arnikola Oct 25, 2019
3461b3c
Fixes
arnikola Oct 25, 2019
f447464
Merge branch 'master' into arnikola/comparator
arnikola Oct 25, 2019
516c369
Merge branch 'master' into arnikola/comparator
arnikola Oct 25, 2019
0e665ba
PR response
arnikola Oct 27, 2019
2aa7798
PR response
arnikola Oct 27, 2019
1a170a9
Merge branch 'master' into arnikola/comparator
arnikola Oct 28, 2019
cfe6479
Increase tolerance to remove build breaks
arnikola Oct 28, 2019
c59ffdc
Merge branch 'arnikola/comparator' of github.com:m3db/m3 into arnikol…
arnikola Oct 28, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Cleaning up code, aligning offsets.
  • Loading branch information
arnikola committed Oct 18, 2019
commit a650ea86817810ef1f03ac2b1857294716e90f42
2 changes: 0 additions & 2 deletions scripts/comparator/run.sh
Original file line number Diff line number Diff line change
@@ -5,8 +5,6 @@ set -xe
REVISION=$(git rev-parse HEAD)
COMPOSE_FILE=$GOPATH/src/github.com/m3db/m3/scripts/comparator/docker-compose.yml
export REVISION
docker-compose -f ${COMPOSE_FILE} stop m3comparator
docker-compose -f ${COMPOSE_FILE} stop m3query

echo "Run m3query, m3comparator, and prometheus containers"
docker-compose -f ${COMPOSE_FILE} up -d m3comparator
100 changes: 77 additions & 23 deletions src/cmd/services/m3comparator/main/querier.go
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@
package main

import (
"bytes"
"context"
"fmt"
"math/rand"
@@ -40,21 +41,25 @@ var _ m3.Querier = (*querier)(nil)
type querier struct {
encoderPool encoding.EncoderPool
iteratorPools encoding.IteratorPools
sync.Once
sync.Mutex
}

func noop() error { return nil }

func buildDatapoints(
seed int64,
// seriesBlock is one block's worth of datapoints for a single generated series.
type seriesBlock []ts.Datapoint

// tagMap maps tag names to tag values for a generated series.
type tagMap map[string]string

// series is a generated timeseries: its per-block datapoints plus its tags.
type series struct {
blocks []seriesBlock
tags tagMap
}

func generateSeriesBlock(
start time.Time,
blockSize time.Duration,
resolution time.Duration,
) []ts.Datapoint {
fmt.Println("Seeding with", seed)
rand.Seed(seed)
) seriesBlock {
numPoints := int(blockSize / resolution)
dps := make([]ts.Datapoint, 0, numPoints)
dps := make(seriesBlock, 0, numPoints)
for i := 0; i < numPoints; i++ {
dps = append(dps, ts.Datapoint{
Timestamp: start.Add(resolution * time.Duration(i)),
@@ -65,6 +70,26 @@ func buildDatapoints(
return dps
}

// generateSeries builds a series spanning [start, end), split into
// consecutive blocks of blockSize, each populated with datapoints spaced
// at the given resolution, and carrying the provided tags.
func generateSeries(
	start time.Time,
	end time.Time,
	blockSize time.Duration,
	resolution time.Duration,
	tags tagMap,
) series {
	// Number of whole blocks that fit in the requested window.
	count := int(float64(end.Sub(start)) / float64(blockSize))
	generated := make([]seriesBlock, 0, count)
	blockStart := start
	for n := 0; n < count; n++ {
		generated = append(generated,
			generateSeriesBlock(blockStart, blockSize, resolution))
		blockStart = blockStart.Add(blockSize)
	}

	return series{
		blocks: generated,
		tags:   tags,
	}
}

func (q *querier) buildOptions(
start time.Time,
blockSize time.Duration,
@@ -77,38 +102,67 @@ func (q *querier) buildOptions(
}
}

// seriesGen describes a series to generate: the metric name and the
// resolution at which its datapoints are spaced.
type seriesGen struct {
name string
res time.Duration
}

// FetchCompressed fetches timeseries data based on a query.
func (q *querier) FetchCompressed(
ctx context.Context,
query *storage.FetchQuery,
options *storage.FetchOptions,
) (m3.SeriesFetchResult, m3.Cleanup, error) {
var (
blockSize = time.Hour * 12
resolution = time.Minute

start = time.Now().Add(time.Hour * 24 * -7).Truncate(blockSize)
opts = q.buildOptions(start, blockSize)
tags = map[string]string{"__name__": "quail"}

dp = buildDatapoints(start.Unix(), start, blockSize, resolution)
dps = [][]ts.Datapoint{dp}
blockSize = time.Hour * 12
start = query.Start.Truncate(blockSize)
end = query.End.Truncate(blockSize).Add(blockSize)
opts = q.buildOptions(start, blockSize)

gens = []seriesGen{
seriesGen{"foo", time.Second * 15},
seriesGen{"bar", time.Minute * 5},
seriesGen{"quail", time.Minute},
}
)

q.Do(func() {
for i, p := range dp {
if i < 36 {
fmt.Println(i, "Added datapoint", p)
q.Lock()
rand.Seed(start.Unix())
arnikola marked this conversation as resolved.
Show resolved Hide resolved
for _, matcher := range query.TagMatchers {
// filter if name, otherwise return all.
if bytes.Equal([]byte("__name__"), matcher.Name) {
arnikola marked this conversation as resolved.
Show resolved Hide resolved
value := string(matcher.Value)
for _, gen := range gens {
if value == gen.name {
gens = []seriesGen{gen}
break
}
}

break
}
})
}

seriesList := make([]series, 0, len(gens))
for _, gen := range gens {
tagMap := map[string]string{
"__name__": gen.name,
"foobar": "qux",
"name": gen.name,
}

seriesList = append(
seriesList,
generateSeries(start, end, blockSize, gen.res, tagMap),
)
}

iter, err := buildIterator(dps, tags, opts)
q.Unlock()
iters, err := buildSeriesIterators(seriesList, opts)
if err != nil {
return m3.SeriesFetchResult{}, noop, err
}

iters := encoding.NewSeriesIterators([]encoding.SeriesIterator{iter}, nil)
cleanup := func() error {
iters.Close()
return nil
122 changes: 76 additions & 46 deletions src/cmd/services/m3comparator/main/series_iterator_builder.go
Original file line number Diff line number Diff line change
@@ -28,7 +28,6 @@ import (
"github.com/m3db/m3/src/dbnode/encoding"
"github.com/m3db/m3/src/dbnode/encoding/m3tsz"
"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/ts"
"github.com/m3db/m3/src/dbnode/x/xio"
"github.com/m3db/m3/src/x/ident"
xtime "github.com/m3db/m3/src/x/time"
@@ -48,50 +47,68 @@ var iterAlloc = func(r io.Reader, _ namespace.SchemaDescr) encoding.ReaderIterat
return m3tsz.NewReaderIterator(r, m3tsz.DefaultIntOptimizationEnabled, encoding.NewOptions())
}

func buildIterator(
dps [][]ts.Datapoint,
tagMap map[string]string,
func buildBlockReader(
block seriesBlock,
start time.Time,
opts iteratorOptions,
) (encoding.SeriesIterator, error) {
var (
encoders = make([]encoding.Encoder, 0, len(dps))
readers = make([][]xio.BlockReader, 0, len(dps))
) ([]xio.BlockReader, error) {
encoder := opts.encoderPool.Get()
encoder.Reset(start, len(block), nil)
for _, dp := range block {
err := encoder.Encode(dp, xtime.Second, nil)
if err != nil {
encoder.Close()
return nil, err
}
}

segment := encoder.Discard()
encoder.Close()
arnikola marked this conversation as resolved.
Show resolved Hide resolved
return []xio.BlockReader{
xio.BlockReader{
SegmentReader: xio.NewSegmentReader(segment),
Start: start,
BlockSize: opts.blockSize,
},
}, nil
}

func buildTagIteratorAndID(tagMap tagMap) (ident.TagIterator, ident.ID) {
var (
tags = ident.Tags{}
sb strings.Builder
arnikola marked this conversation as resolved.
Show resolved Hide resolved
)

for range dps {
encoders = append(encoders, opts.encoderPool.Get())
for name, value := range tagMap {
arnikola marked this conversation as resolved.
Show resolved Hide resolved
sb.WriteString(name)
sb.WriteRune(sep)
sb.WriteString(value)
sb.WriteRune(tagSep)
tags.Append(ident.StringTag(name, value))
}

defer func() {
for _, e := range encoders {
if e != nil {
e.Close()
}
}
}()

// Build a merged BlockReader
for i, datapoints := range dps {
encoder := encoders[i]
encoder.Reset(opts.start, len(datapoints), nil)
if len(datapoints) > 0 {
for _, dp := range datapoints {
err := encoder.Encode(dp, xtime.Second, nil)
if err != nil {
return nil, err
}
}

segment := encoder.Discard()
readers = append(readers, []xio.BlockReader{{
SegmentReader: xio.NewSegmentReader(segment),
Start: opts.start,
BlockSize: opts.blockSize,
}})
return ident.NewTagsIterator(tags), ident.StringID(sb.String())
}

func buildSeriesIterator(
series series,
opts iteratorOptions,
) (encoding.SeriesIterator, error) {
var (
blocks = series.blocks
tags = series.tags
readers = make([][]xio.BlockReader, 0, len(blocks))
start = opts.start
)

for _, block := range blocks {
seriesBlock, err := buildBlockReader(block, start, opts)
if err != nil {
return nil, err
}

readers = append(readers, seriesBlock)
start = start.Add(opts.blockSize)
}

multiReader := encoding.NewMultiReaderIterator(
@@ -102,19 +119,12 @@ func buildIterator(
sliceOfSlicesIter := xio.NewReaderSliceOfSlicesFromBlockReadersIterator(readers)
multiReader.ResetSliceOfSlices(sliceOfSlicesIter, nil)

for name, value := range tagMap {
sb.WriteString(name)
sb.WriteRune(sep)
sb.WriteString(value)
sb.WriteRune(tagSep)
tags.Append(ident.StringTag(name, value))
}

tagIter, id := buildTagIteratorAndID(tags)
return encoding.NewSeriesIterator(
encoding.SeriesIteratorOptions{
ID: ident.StringID(sb.String()),
ID: id,
Namespace: ident.StringID("ns"),
Tags: ident.NewTagsIterator(tags),
Tags: tagIter,
StartInclusive: opts.start,
EndExclusive: opts.start.Add(opts.blockSize),
Replicas: []encoding.MultiReaderIterator{
@@ -123,3 +133,23 @@ func buildIterator(
}, nil),
nil
}

// buildSeriesIterators converts every generated series into a
// SeriesIterator and bundles the results into a pooled SeriesIterators
// collection. It returns the first construction error encountered.
func buildSeriesIterators(
	series []series,
	opts iteratorOptions,
) (encoding.SeriesIterators, error) {
	built := make([]encoding.SeriesIterator, 0, len(series))
	for idx := range series {
		it, err := buildSeriesIterator(series[idx], opts)
		if err != nil {
			return nil, err
		}

		built = append(built, it)
	}

	pool := opts.iteratorPools.MutableSeriesIterators()
	return encoding.NewSeriesIterators(built, pool), nil
}
1 change: 0 additions & 1 deletion src/query/api/v1/handler/prometheus/native/common.go
Original file line number Diff line number Diff line change
@@ -316,7 +316,6 @@ func renderResultsJSON(
// NB(r): Removing the optimization of computing startIdx once just in case our assumptions are wrong,
// we can always add this optimization back later. Without this code I see datapoints more often.
if dp.Timestamp.Before(params.Start) {
fmt.Println("Skipping datapoint", dp.Value, "at", dp.Timestamp.Format("3:04:05PM"), "(((", dp, ")))")
continue
}

8 changes: 6 additions & 2 deletions src/query/api/v1/handler/prometheus/native/read.go
Original file line number Diff line number Diff line change
@@ -184,7 +184,8 @@ func (h *PromReadHandler) ServeHTTPWithEngine(
ctx := context.WithValue(r.Context(), handler.HeaderKey, r.Header)
logger := logging.WithContext(ctx, h.instrumentOpts)

params, rErr := parseParams(r, engine.Options(), h.timeoutOps, fetchOpts, h.instrumentOpts)
params, rErr := parseParams(r, engine.Options(),
h.timeoutOps, fetchOpts, h.instrumentOpts)
if rErr != nil {
h.promReadMetrics.fetchErrorsClient.Inc(1)
return nil, emptyReqParams, &RespError{Err: rErr.Inner(), Code: rErr.Code()}
@@ -207,7 +208,10 @@ func (h *PromReadHandler) ServeHTTPWithEngine(
opentracingext.Error.Set(sp, true)
logger.Error("unable to fetch data", zap.Error(err))
h.promReadMetrics.fetchErrorsServer.Inc(1)
return nil, emptyReqParams, &RespError{Err: err, Code: http.StatusInternalServerError}
return nil, emptyReqParams, &RespError{
Err: err,
Code: http.StatusInternalServerError,
}
}

// TODO: Support multiple result types
Loading