diff --git a/.buildkite/pipeline.yml b/.buildkite/pipeline.yml
index 786e4935a1..86808613ed 100644
--- a/.buildkite/pipeline.yml
+++ b/.buildkite/pipeline.yml
@@ -52,6 +52,16 @@ steps:
       gopath-checkout#v1.0.1:
         import: github.com/m3db/m3
     <<: *common
+  - name: "Prometheus compatibility (:docker:)"
+    command: make clean install-vendor docker-compatibility-test
+    parallelism: 1
+    env:
+      CGO_ENABLED: 0
+      GIMME_GO_VERSION: 1.12.x
+    plugins:
+      gopath-checkout#v1.0.1:
+        import: github.com/m3db/m3
+    <<: *common
   - name: "Integration (dbnode Recently Read) %n"
     parallelism: 2
     command: make clean install-vendor test-ci-integration-dbnode cache_policy=recently_read
diff --git a/Makefile b/Makefile
index 56b011c2b7..1349c7c693 100644
--- a/Makefile
+++ b/Makefile
@@ -63,6 +63,7 @@ SERVICES := \
 	m3em_agent \
 	m3nsch_server \
 	m3nsch_client \
+	m3comparator \
 
 SUBDIRS := \
 	x \
@@ -233,6 +234,12 @@ docker-integration-test:
 	@echo "--- Running Docker integration test"
 	./scripts/docker-integration-tests/run.sh
 
+.PHONY: docker-compatibility-test
+docker-compatibility-test:
+	@echo "--- Running Prometheus compatibility test"
+	./scripts/comparator/run.sh
+
 .PHONY: site-build
site-build:
 	@echo "Building site"
diff --git a/scripts/comparator/README.md b/scripts/comparator/README.md
new file mode 100644
index 0000000000..d086bbd1fb
--- /dev/null
+++ b/scripts/comparator/README.md
@@ -0,0 +1,21 @@
+# Query comparator
+
+This docker-compose file sets up the following environment:
+
+1. 1 M3Comparator node that acts as remote gRPC storage, providing randomized data seeded by the incoming query's start time.
+2. 1 M3Query node that connects to the M3Comparator instance, using it as remote storage. Serves queries and remote reads.
+3. 1 Prometheus node with no scrape settings configured, connecting to the M3Query instance as a remote_read endpoint.
+4. (optionally) 1 Grafana node with pre-configured graphs corresponding to the queries run by the test.
+
+## Mechanism
+
+- Queries are generated from `queries.json`, run against both the Prometheus and M3Query instances, and the results are compared.
+
+## Usage
+
+- Use `make docker-compatibility-test` from the base folder to run the comparator tests.
+- Use `CI=FALSE make docker-compatibility-test` from the base folder to run the comparator tests locally; this also brings up a Grafana instance and skips teardown, allowing manual inspection of query differences.
+
+## Grafana
+
+Use Grafana by navigating to `http://localhost:3000` and using `admin` for both the username and password. The dashboard should already be populated and working; it is named `Dashboard <git revision>`.
diff --git a/scripts/comparator/compare.go b/scripts/comparator/compare.go
new file mode 100644
index 0000000000..9a96c6bc35
--- /dev/null
+++ b/scripts/comparator/compare.go
@@ -0,0 +1,194 @@
+// Copyright (c) 2019 Uber Technologies, Inc.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+package main
+
+import (
+	"encoding/json"
+	"flag"
+	"fmt"
+	"io/ioutil"
+	"net/http"
+	"os"
+	"time"
+
+	"github.com/m3db/m3/scripts/comparator/utils"
+	"github.com/m3db/m3/src/query/api/v1/handler/prometheus"
+	xerrors "github.com/m3db/m3/src/x/errors"
+	"github.com/m3db/m3/src/x/instrument"
+
+	"go.uber.org/zap"
+)
+
+func paramError(err string, log *zap.Logger) {
+	log.Error(err)
+	flag.Usage()
+}
+
+func main() {
+	var (
+		iOpts = instrument.NewOptions()
+		log   = iOpts.Logger()
+
+		now = time.Now()
+
+		pQueryFile    = flag.String("input", "", "the query file")
+		pPromAddress  = flag.String("promAddress", "0.0.0.0:9090", "prom address")
+		pQueryAddress = flag.String("queryAddress", "0.0.0.0:7201", "query address")
+
+		pStart = flag.Int64("s", now.Add(time.Hour*-3).Unix(), "start time")
+		pEnd   = flag.Int64("e", now.Unix(), "end time")
+	)
+
+	flag.Parse()
+	var (
+		queryFile    = *pQueryFile
+		promAddress  = *pPromAddress
+		queryAddress = *pQueryAddress
+
+		start = *pStart
+		end   = *pEnd
+	)
+
+	if len(queryFile) == 0 {
+		paramError("No query file found", log)
+		os.Exit(1)
+	}
+
+	if len(promAddress) == 0 {
+		paramError("No prom address found", log)
+		os.Exit(1)
+	}
+
+	if len(queryAddress) == 0 {
+		paramError("No query server address found", log)
+		os.Exit(1)
+	}
+
+	if end < start {
+		paramError(fmt.Sprintf("end (%d) is before start (%d)", end, start), log)
+		os.Exit(1)
+	}
+
+	queries, err := utils.ParseFileToPromQLQueryGroup(queryFile, start, end, log)
+	if err != nil {
+		log.Error("could not parse file to PromQL queries", zap.Error(err))
+		os.Exit(1)
+	}
+
+	var multiErr xerrors.MultiError
+	for _, queryGroup := range queries {
+		if err := runQueryGroup(
+			queryGroup,
+			promAddress,
+			queryAddress,
+			log,
+		); err != nil {
+			multiErr = multiErr.Add(err)
+		}
+	}
+
+	if multiErr.LastError() != nil {
+		log.Error("mismatched queries detected")
+		os.Exit(1)
+	}
+}
+
+func runQueryGroup(
+	queryGroup utils.PromQLQueryGroup,
+	promAddress string,
+	queryAddress string,
+	log *zap.Logger,
+) error {
+	log.Info("running query group", zap.String("group", queryGroup.QueryGroup))
+
+	var multiErr xerrors.MultiError
+	for _, query := range queryGroup.Queries {
+		promURL := fmt.Sprintf("http://%s%s", promAddress, query)
+		queryURL := fmt.Sprintf("http://%s%s", queryAddress, query)
+		if err := runComparison(promURL, queryURL, log); err != nil {
+			multiErr = multiErr.Add(err)
+			log.Error(
+				"mismatched query",
+				zap.String("promURL", promURL),
+				zap.String("queryURL", queryURL),
+			)
+		}
+	}
+
+	return multiErr.FinalError()
+}
+
+func runComparison(
+	promURL string,
+	queryURL string,
+	log *zap.Logger,
+) error {
+	promResult, err := parseResult(promURL, log)
+	if err != nil {
+		log.Error("failed to parse Prometheus result", zap.Error(err))
+		return err
+	}
+
+	queryResult, err := parseResult(queryURL, log)
+	if err != nil {
+		log.Error("failed to parse M3Query result", zap.Error(err))
+		return err
+	}
+
+	_, err = promResult.Matches(queryResult)
+	if err != nil {
+		log.Error("mismatch", zap.Error(err))
+		return err
+	}
+
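+	// Both endpoints returned parseable responses that matched within tolerance.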
+ return nil +} + +func parseResult( + endpoint string, + log *zap.Logger, +) (prometheus.Response, error) { + var result prometheus.Response + response, err := http.Get(endpoint) + if err != nil { + return result, err + } + + if response.StatusCode != http.StatusOK { + return result, fmt.Errorf("response failed with code %s", response.Status) + } + + body := response.Body + defer func() { + body.Close() + }() + + data, err := ioutil.ReadAll(body) + if err != nil { + return result, err + } + + if err = json.Unmarshal(data, &result); err != nil { + return result, err + } + + return result, nil +} diff --git a/scripts/comparator/docker-compose.yml b/scripts/comparator/docker-compose.yml new file mode 100755 index 0000000000..4ad11c9c2d --- /dev/null +++ b/scripts/comparator/docker-compose.yml @@ -0,0 +1,45 @@ +version: "3.5" +services: + m3comparator: + expose: + - "9000" + ports: + - "0.0.0.0:9000:9000" + networks: + - backend + image: "m3comparator:${REVISION}" + m3query: + expose: + - "7201" + - "7203" + ports: + - "0.0.0.0:7201:7201" + - "0.0.0.0:7203:7203" + networks: + - backend + image: "m3query:${REVISION}" + volumes: + - "./m3query.yml:/etc/m3query/m3query.yml" + prometheus: + expose: + - "9090" + ports: + - "0.0.0.0:9090:9090" + networks: + - backend + image: prom/prometheus:latest + volumes: + - "./:/etc/prometheus/" + grafana: + build: + context: ./grafana + dockerfile: grafana.Dockerfile + expose: + - "3000" + ports: + - "0.0.0.0:3000:3000" + networks: + - backend + image: m3grafana:latest +networks: + backend: diff --git a/scripts/comparator/docker-setup.sh b/scripts/comparator/docker-setup.sh new file mode 100755 index 0000000000..f61c9db922 --- /dev/null +++ b/scripts/comparator/docker-setup.sh @@ -0,0 +1,30 @@ +#!/usr/bin/env bash + +set -xe + +REVISION=$(git rev-parse HEAD) +COMPOSE_FILE=$GOPATH/src/github.com/m3db/m3/scripts/comparator/docker-compose.yml +export REVISION + +function setup_docker { + echo "Run m3query, m3comparator, and prometheus containers" + docker-compose -f ${COMPOSE_FILE} up -d --build --renew-anon-volumes m3comparator + docker-compose -f ${COMPOSE_FILE} up -d --build --renew-anon-volumes prometheus + docker-compose -f ${COMPOSE_FILE} up -d --build --renew-anon-volumes m3query + + CI=$1 + if [[ "$CI" != "true" ]] + then + echo "run grafana container" + docker-compose -f ${COMPOSE_FILE} up -d --build --renew-anon-volumes grafana + fi +} + +function teardown_docker { + CI=$1 + # CI fails to stop all containers sometimes + if [[ "$CI" == "true" ]] + then + docker-compose -f ${COMPOSE_FILE} down || echo "unable to shutdown containers" + fi +} diff --git a/scripts/comparator/grafana/dashboard.tmpl b/scripts/comparator/grafana/dashboard.tmpl new file mode 100755 index 0000000000..dc33216a65 --- /dev/null +++ b/scripts/comparator/grafana/dashboard.tmpl @@ -0,0 +1,165 @@ +{ + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": "-- Grafana --", + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "type": "dashboard" + } + ] + }, + "editable": true, + "gnetId": null, + "graphTooltip": 0, + "id": 1, + "links": [], + "panels": [ + {{range $i, $v := .Queries}}{{if $i}},{{end}} + { + "collapsed": true, + "datasource": null, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 0 + }, + "id": {{$v.Index}}, + "panels": [{{range $idx, $q := $v.Queries }}{{with $q}}{{if $idx}},{{end}} + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "-- Mixed --", + 
"fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 9, + "w": 12, + "x": {{if .Left}}0{{else}}12{{end}}, + "y": 0 + }, + "id": {{.Index}}, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "dataLinks": [] + }, + "percentage": false, + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "datasource": "M3Query", + "expr": "{{.Query}}", + "hide": false, + "interval": "{{.Interval}}", + "legendFormat": "m3", + "refId": "A" + }, + { + "datasource": "Prometheus", + "expr": "{{.Query}}", + "hide": false, + "interval": "{{.Interval}}", + "legendFormat": "prom", + "refId": "B" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "{{.Query}} : {{.Interval}}", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }{{end}}{{end}} + ], + "title": "{{$v.QueryGroup}}", + "type": "row" + } + {{end}} + ], + "refresh": false, + "schemaVersion": 20, + "style": "dark", + "tags": ["queries"], + "templating": { + "list": [] + }, + "time": { + "from": "{{.Start}}", + "to": "{{.End}}" + }, + "timepicker": { + "refresh_intervals": [ + "5s", + "10s", + "30s", + "1m", + "5m", + "15m", + "30m", + "1h", + "2h", + "1d" + ] + }, + "timezone": "", + "title": "Dashboard {{.Revision}}", + "uid": "{{.Revision}}", + "version": 1 +} diff --git a/scripts/comparator/grafana/dashboards.yaml b/scripts/comparator/grafana/dashboards.yaml new file mode 100644 index 0000000000..b84572d063 --- /dev/null +++ b/scripts/comparator/grafana/dashboards.yaml @@ -0,0 +1,6 @@ +- name: 'default' + org_id: 1 + folder: '' + type: 'file' + options: + folder: '/tmp/grafana_dashboards' diff --git a/scripts/comparator/grafana/datasource.yaml b/scripts/comparator/grafana/datasource.yaml new file mode 100755 index 0000000000..0d571a18e1 --- /dev/null +++ b/scripts/comparator/grafana/datasource.yaml @@ -0,0 +1,9 @@ +datasources: + - name: Prometheus + type: prometheus + access: proxy + url: http://prometheus:9090 + - name: M3Query + type: prometheus + access: proxy + url: http://m3query:7201 diff --git a/scripts/comparator/grafana/generate_dash.go b/scripts/comparator/grafana/generate_dash.go new file mode 100644 index 0000000000..0079e4709a --- /dev/null +++ b/scripts/comparator/grafana/generate_dash.go @@ -0,0 +1,121 @@ +// Copyright (c) 2019 Uber Technologies, Inc. 
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package main + +import ( + "flag" + "os" + "text/template" + "time" + + "github.com/m3db/m3/scripts/comparator/utils" + "github.com/m3db/m3/src/x/instrument" + + "go.uber.org/zap" +) + +// TemplateData is a collection of template data. +type TemplateData struct { + Revision string + Start string + End string + Queries []utils.GrafanaQueries +} + +func paramError(err string, log *zap.Logger) { + log.Error(err) + flag.Usage() +} + +func main() { + var ( + iOpts = instrument.NewOptions() + log = iOpts.Logger() + + pRevision = flag.String("r", "", "the git revision") + pQueryFile = flag.String("q", "", "the query file") + pTemplate = flag.String("t", "", "the template file") + pOutput = flag.String("o", "", "the output file") + + pStart = flag.Int64("s", time.Now().Unix(), "start") + pEnd = flag.Int64("e", time.Now().Unix(), "end") + ) + + flag.Parse() + var ( + revision = *pRevision + qFile = *pQueryFile + output = *pOutput + templateFile = *pTemplate + ) + + if len(revision) == 0 { + paramError("No revision found", log) + os.Exit(1) + } + + if len(qFile) == 0 { + paramError("No query file found", log) + os.Exit(1) + } + + if len(output) == 0 { + paramError("No output found", log) + os.Exit(1) + } + + if len(templateFile) == 0 { + paramError("No template file found", log) + os.Exit(1) + } + + queries, err := utils.ParseFileToGrafanaQueries(qFile, log) + if err != nil { + log.Error("could not parse file to Grafana queries", zap.Error(err)) + os.Exit(1) + } + + opts := os.O_RDWR | os.O_CREATE | os.O_TRUNC + outputFile, err := os.OpenFile(output, opts, 0777) + if err != nil { + log.Error("could not open output file", zap.Error(err)) + os.Exit(1) + } + + defer outputFile.Close() + + start := time.Unix(*pStart, 0) + end := time.Unix(*pEnd, 0) + + templateData := TemplateData{ + Revision: revision, + Queries: queries, + Start: start.Format(time.RFC3339), + End: end.Format(time.RFC3339), + } + + t := template.Must(template.ParseFiles(templateFile)) + err = t.Execute(outputFile, templateData) + if err != nil { + log.Error("could not write to output file", zap.Error(err)) + os.Exit(1) + } +} diff --git a/scripts/comparator/grafana/grafana.Dockerfile b/scripts/comparator/grafana/grafana.Dockerfile new file mode 100755 index 0000000000..a926ad5c6e --- /dev/null +++ b/scripts/comparator/grafana/grafana.Dockerfile @@ -0,0 +1,7 @@ +FROM grafana/grafana:latest + +COPY ./datasource.yaml 
/etc/grafana/provisioning/datasources/datasource.yaml +COPY ./dashboards.yaml /etc/grafana/provisioning/dashboards/all.yaml + +RUN mkdir -p /tmp/grafana_dashboards +COPY ./dash.json.out /tmp/grafana_dashboards/dashboard.json diff --git a/scripts/comparator/m3comparator.Dockerfile b/scripts/comparator/m3comparator.Dockerfile new file mode 100755 index 0000000000..d71483bb43 --- /dev/null +++ b/scripts/comparator/m3comparator.Dockerfile @@ -0,0 +1,11 @@ +FROM alpine:latest AS builder +LABEL maintainer="The M3DB Authors " + +RUN mkdir -p /bin +RUN mkdir -p /etc/m3comparator +ADD ./m3comparator /bin/ + +EXPOSE 9000/tcp + +ENTRYPOINT [ "/bin/m3comparator" ] +CMD diff --git a/scripts/comparator/m3query.Dockerfile b/scripts/comparator/m3query.Dockerfile new file mode 100755 index 0000000000..081bfc1df0 --- /dev/null +++ b/scripts/comparator/m3query.Dockerfile @@ -0,0 +1,12 @@ +FROM alpine:latest AS builder +LABEL maintainer="The M3DB Authors " + +RUN mkdir -p /bin +RUN mkdir -p /etc/m3query +ADD ./m3query /bin/ +ADD ./m3query-dev-remote.yml /etc/m3query/m3query.yml + +EXPOSE 7201/tcp 7203/tcp + +ENTRYPOINT [ "/bin/m3query" ] +CMD [ "-f", "/etc/m3query/m3query.yml" ] diff --git a/scripts/comparator/m3query.yml b/scripts/comparator/m3query.yml new file mode 100644 index 0000000000..cc5f13fb1e --- /dev/null +++ b/scripts/comparator/m3query.yml @@ -0,0 +1,23 @@ +listenAddress: + type: "config" + value: "0.0.0.0:7201" + +backend: grpc + +rpc: + remotes: + - name: "remote" + remoteListenAddresses: ["m3comparator:9000"] + +metrics: + scope: + prefix: "query" + prometheus: + handlerPath: /metrics + listenAddress: 0.0.0.0:7203 # until https://github.com/m3db/m3/issues/682 is resolved + sanitization: prometheus + samplingRate: 1.0 + extended: none + +tagOptions: + idScheme: quoted diff --git a/scripts/comparator/prometheus.yml b/scripts/comparator/prometheus.yml new file mode 100644 index 0000000000..b7903ea850 --- /dev/null +++ b/scripts/comparator/prometheus.yml @@ -0,0 +1,9 @@ +global: + external_labels: + role: "remote" + scrape_interval: 15s + evaluation_interval: 15s + +remote_read: + - url: http://m3query:7201/api/v1/prom/remote/read + read_recent: true diff --git a/scripts/comparator/queries.json b/scripts/comparator/queries.json new file mode 100644 index 0000000000..4c329bfb3c --- /dev/null +++ b/scripts/comparator/queries.json @@ -0,0 +1,84 @@ +[ + { + "queryGroup":"fetch", + "queries":[ + "quail", + "quail offset 60s" + ], + "steps" : [ + "15s", + "30s", + "1m" + ] + }, + { + "queryGroup":"temporal", + "queries":[ + "rate(quail[1m])", + "irate(quail[5m])", + "delta(quail[123s])", + "idelta(quail[1m] offset 5m)", + "deriv(quail[5m])" + ], + "steps" : [ + "15s", + "30s", + "1m" + ] + }, + { + "queryGroup":"binary", + "queries":[ + "quail*1", + "1-quail", + "quail*quail", + "quail offset 1m / quail" + ], + "steps" : [ + "15s", + "30s", + "1m" + ] + }, + { + "queryGroup":"aggregation", + "queries":[ + "sum({foobar=\"qux\"})", + "sum({foobar=\"qux\"}) - 1", + "sum({foobar=\"qux\"} offset 1m)" + ], + "steps" : [ + "15s", + "30s", + "1m" + ] + }, + { + "queryGroup":"transform", + "queries":[ + "clamp_max(quail, 0.5)", + "clamp_min(quail offset 60s, 0.5)", + "sum({foobar=\"qux\"}) - 1", + "sum({foobar=\"qux\"} offset 1m)" + ], + "steps" : [ + "15s", + "30s", + "1m" + ] + }, + { + "queryGroup":"label", + "queries":[ + "label_replace(quail,\"foo\", \"$1!\", \"name\", \"(.*)\")", + "label_replace(quail offset 1m,\"foo\", \"$1!\", \"name\", \"(.*)\")", + "label_replace(quail,\"foo\", \"$1!\", \"name\", 
\"(.*)\")-100", + "label_join(quail,\"quince\", \"!\", \"foobar\", \"name\")" + ], + "steps" : [ + "15s", + "30s", + "1m" + ] + } +] diff --git a/scripts/comparator/run.sh b/scripts/comparator/run.sh new file mode 100755 index 0000000000..63e0327ae8 --- /dev/null +++ b/scripts/comparator/run.sh @@ -0,0 +1,68 @@ +#!/usr/bin/env bash + +set -ex +export COMPARATOR=$GOPATH/src/github.com/m3db/m3/scripts/comparator +source $COMPARATOR/docker-setup.sh + +export REVISION=$(git rev-parse HEAD) + +CI=${CI:-true} +RUN_ONLY=${RUN_ONLY:-false} + +export QUERY_FILE=$COMPARATOR/queries.json +export GRAFANA_PATH=$COMPARATOR/grafana +export DASHBOARD=$GRAFANA_PATH/dash.json.out + +export END=${END:-$(date +%s)} +export START=${START:-$(( $END - 10800 ))} + +function generate_dash { + TEMPLATE=$GRAFANA_PATH/dashboard.tmpl + + GENERATOR=$GRAFANA_PATH/generate_dash.go + + go run $GENERATOR \ + -r=$REVISION \ + -q=$QUERY_FILE \ + -o=$DASHBOARD \ + -t=$TEMPLATE \ + -s=$START \ + -e=$END +} + +if [[ "$RUN_ONLY" == "false" ]] +then + if [[ ! "$CI" == "true" ]] + then + echo "generating grafana dashboard" + generate_dash + fi + + echo "setting up containers" + $COMPARATOR/setup.sh + + echo "setting up docker" + setup_docker $CI +fi + +comparator=$COMPARATOR/compare.out +go build -o $comparator $COMPARATOR/. +function defer { + rm $comparator + if [[ "$CI" == "true" ]] + then + teardown_docker $CI + else + if [[ "$RUN_ONLY" == "false" ]] + then + rm $DASHBOARD + fi + fi +} + +if [[ "$RUN_ONLY" == "false" ]] +then + trap defer EXIT +fi + +$comparator -input=$QUERY_FILE -s=$START -e=$END diff --git a/scripts/comparator/setup.sh b/scripts/comparator/setup.sh new file mode 100755 index 0000000000..3d589c82e0 --- /dev/null +++ b/scripts/comparator/setup.sh @@ -0,0 +1,45 @@ +#!/usr/bin/env bash + +set -xe + +# expected to be run from root of repository +cd $GOPATH/src/github.com/m3db/m3 + +REVISION=$(git rev-parse HEAD) +CLEAN=${CLEAN:-false} +REBUILD=${REBUILD:-true} +if [[ "$CLEAN" == "true" ]]; then + make clean +fi +mkdir -p ./bin + +# by keeping all the required files in ./bin, it makes the build context +# for docker much smaller +cp ./src/query/config/m3query-dev-remote.yml ./bin + +SERVICES=(m3comparator m3query) +# build images +echo "building docker images" +function build_image { + local svc=$1 + echo "creating image for $svc" + make ${svc}-linux-amd64 + docker build -t "${svc}:${REVISION}" -f ./scripts/comparator/${svc}.Dockerfile ./bin +} + +if [[ "$SERVICE" != "" ]]; then + # optionally build just for a single service + build_image $SERVICE +else + # otherwise build all images + for SVC in ${SERVICES[@]}; do + # only build if image doesn't exist + if [[ "$(docker images -q ${SVC}_integration:${REVISION} 2> /dev/null)" == "" ]]; then + build_image $SVC + else + if [[ "$REBUILD" == "true" ]]; then + build_image $SVC + fi + fi + done +fi diff --git a/scripts/comparator/utils/compare_utilities.go b/scripts/comparator/utils/compare_utilities.go new file mode 100644 index 0000000000..0b597b230a --- /dev/null +++ b/scripts/comparator/utils/compare_utilities.go @@ -0,0 +1,207 @@ +// Copyright (c) 2019 Uber Technologies, Inc. 
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+package utils
+
+import (
+	"encoding/json"
+	"io/ioutil"
+	"net/url"
+	"os"
+	"strconv"
+	"strings"
+
+	"go.uber.org/zap"
+)
+
+// InputQueries is a slice of InputQuery.
+type InputQueries []InputQuery
+
+// InputQuery is the JSON representation of a query to be compared.
+type InputQuery struct {
+	// QueryGroup is the general category for these queries.
+	QueryGroup string `json:"queryGroup"`
+	// Queries is the list of raw queries.
+	Queries []string `json:"queries"`
+	// Steps is the list of step sizes for these queries.
+	Steps []string `json:"steps"`
+}
+
+// PromQLQueryGroup is a list of constructed PromQL query groups.
+type PromQLQueryGroup struct {
+	// QueryGroup is the general category for these queries.
+	QueryGroup string
+	// Queries is a list of PromQL compatible queries.
+	Queries []string
+}
+
+func (q InputQueries) constructPromQL(
+	start int64,
+	end int64,
+) []PromQLQueryGroup {
+	queries := make([]PromQLQueryGroup, 0, len(q))
+	for _, inQuery := range q {
+		queries = append(queries, inQuery.constructPromQL(start, end))
+	}
+
+	return queries
+}
+
+func (q InputQuery) constructPromQL(start int64, end int64) PromQLQueryGroup {
+	queries := make([]string, 0, len(q.Queries)*len(q.Steps))
+	for _, inQuery := range q.Queries {
+		for _, inStep := range q.Steps {
+			values := make(url.Values)
+			values.Add("query", inQuery)
+			values.Add("step", inStep)
+			values.Add("start", strconv.Itoa(int(start)))
+			values.Add("end", strconv.Itoa(int(end)))
+			query := "/api/v1/query_range?" + values.Encode()
+
+			queries = append(queries, query)
+		}
+	}
+
+	return PromQLQueryGroup{
+		QueryGroup: q.QueryGroup,
+		Queries:    queries,
+	}
+}
+
+func parseFileToQueries(
+	fileName string,
+	log *zap.Logger,
+) (InputQueries, error) {
+	file, err := os.Open(fileName)
+	if err != nil {
+		log.Error("could not open file", zap.Error(err))
+		return nil, err
+	}
+
+	defer file.Close()
+	buf, err := ioutil.ReadAll(file)
+	if err != nil {
+		log.Error("could not read file", zap.Error(err))
+		return nil, err
+	}
+
+	queries := make(InputQueries, 0, 10)
+	if err := json.Unmarshal(buf, &queries); err != nil {
+		log.Error("could not unmarshal queries", zap.Error(err))
+		return nil, err
+	}
+
+	return queries, nil
+}
+
+// ParseFileToPromQLQueryGroup parses a JSON queries file
+// into PromQL query groups.
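+// Each raw query is expanded once per step size into a fully-encoded
+// /api/v1/query_range URL carrying query, step, start, and end parameters;
+// for example, a group with two queries and three steps yields six URLs.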
+func ParseFileToPromQLQueryGroup( + fileName string, + start int64, + end int64, + log *zap.Logger, +) ([]PromQLQueryGroup, error) { + queries, err := parseFileToQueries(fileName, log) + if err != nil { + return nil, err + } + + return queries.constructPromQL(start, end), nil +} + +// GrafanaQueries is a list of Grafana dashboard compatible queries. +type GrafanaQueries struct { + // QueryGroup is the general category for these queries. + QueryGroup string + // Queries is a list of Grafana dashboard compatible queries. + Queries []GrafanaQuery + // Index is this query group's index. + Index int +} + +// GrafanaQuery is a Grafana dashboard compatible query. +type GrafanaQuery struct { + // Query is the query. + Query string + // Interval is the step size. + Interval string + // Index is this query's index. + Index int + // Left indicates if this panel is on the left. + Left bool +} + +// constructGrafanaQueries constructs a list of Grafana dashboard compatible +// queries. +func (q InputQueries) constructGrafanaQueries() []GrafanaQueries { + queries := make([]GrafanaQueries, 0, len(q)) + idx := 0 + for _, inQuery := range q { + query, index := inQuery.constructGrafanaQuery(idx) + idx = index + // NB: don't add empty queries if they exist for whatever reason. + if len(query.Queries) > 0 { + queries = append(queries, query) + } + } + + return queries +} + +func (q InputQuery) constructGrafanaQuery(idx int) (GrafanaQueries, int) { + grafanaQueries := GrafanaQueries{ + QueryGroup: q.QueryGroup, + Index: idx, + } + + queries := make([]GrafanaQuery, 0, len(q.Queries)*len(q.Steps)) + left := true + for _, inQuery := range q.Queries { + for _, inStep := range q.Steps { + idx++ + queries = append(queries, GrafanaQuery{ + Query: strings.ReplaceAll(inQuery, `"`, `\"`), + Interval: inStep, + Index: idx, + Left: left, + }) + + left = !left + } + } + + grafanaQueries.Queries = queries + return grafanaQueries, idx + 1 +} + +// ParseFileToGrafanaQueries parses a JSON queries file into Grafana dashboard +// compatible queries. +func ParseFileToGrafanaQueries( + fileName string, + log *zap.Logger, +) ([]GrafanaQueries, error) { + queries, err := parseFileToQueries(fileName, log) + if err != nil { + return nil, err + } + + return queries.constructGrafanaQueries(), nil +} diff --git a/src/cmd/services/m3comparator/main/main.go b/src/cmd/services/m3comparator/main/main.go new file mode 100644 index 0000000000..4f220c1290 --- /dev/null +++ b/src/cmd/services/m3comparator/main/main.go @@ -0,0 +1,77 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package main + +import ( + "net" + "time" + + "github.com/m3db/m3/src/dbnode/encoding" + "github.com/m3db/m3/src/dbnode/encoding/m3tsz" + "github.com/m3db/m3/src/query/models" + "github.com/m3db/m3/src/query/pools" + "github.com/m3db/m3/src/query/tsdb/remote" + "github.com/m3db/m3/src/x/instrument" + "github.com/m3db/m3/src/x/pool" + + "go.uber.org/zap" +) + +func main() { + var ( + iterPools = pools.BuildIteratorPools() + poolWrapper = pools.NewPoolsWrapper(iterPools) + + iOpts = instrument.NewOptions() + logger = iOpts.Logger() + + encoderPoolOpts = pool.NewObjectPoolOptions() + encoderPool = encoding.NewEncoderPool(encoderPoolOpts) + ) + + encoderPool.Init(func() encoding.Encoder { + return m3tsz.NewEncoder(time.Now(), nil, true, encoding.NewOptions()) + }) + + querier := &querier{ + encoderPool: encoderPool, + iteratorPools: iterPools, + } + + server := remote.NewGRPCServer( + querier, + models.QueryContextOptions{}, + poolWrapper, + iOpts, + ) + + addr := "0.0.0.0:9000" + logger.Info("starting remote server", zap.String("address", addr)) + listener, err := net.Listen("tcp", addr) + if err != nil { + logger.Error("listener error", zap.Error(err)) + return + } + + if err := server.Serve(listener); err != nil { + logger.Error("serve error", zap.Error(err)) + } +} diff --git a/src/cmd/services/m3comparator/main/querier.go b/src/cmd/services/m3comparator/main/querier.go new file mode 100644 index 0000000000..45a2ca5469 --- /dev/null +++ b/src/cmd/services/m3comparator/main/querier.go @@ -0,0 +1,223 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
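+
+// The querier implemented in this file acts as the remote gRPC storage
+// backend, serving deterministic pseudo-random series: generation is seeded
+// from the query's start time, so repeated queries over the same range return
+// identical data.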
+ +package main + +import ( + "bytes" + "context" + "fmt" + "math" + "math/rand" + "sync" + "time" + + "github.com/m3db/m3/src/dbnode/encoding" + "github.com/m3db/m3/src/dbnode/ts" + "github.com/m3db/m3/src/query/block" + "github.com/m3db/m3/src/query/models" + "github.com/m3db/m3/src/query/storage" + "github.com/m3db/m3/src/query/storage/m3" +) + +var _ m3.Querier = (*querier)(nil) + +type querier struct { + encoderPool encoding.EncoderPool + iteratorPools encoding.IteratorPools + sync.Mutex +} + +func noop() error { return nil } + +type seriesBlock []ts.Datapoint +type tagMap map[string]string +type series struct { + blocks []seriesBlock + tags tagMap +} + +func generateSeriesBlock( + start time.Time, + blockSize time.Duration, + resolution time.Duration, +) seriesBlock { + numPoints := int(blockSize / resolution) + dps := make(seriesBlock, 0, numPoints) + for i := 0; i < numPoints; i++ { + dp := ts.Datapoint{ + Timestamp: start.Add(resolution * time.Duration(i)), + Value: rand.Float64(), + } + + dps = append(dps, dp) + } + + return dps +} + +func generateSeries( + start time.Time, + end time.Time, + blockSize time.Duration, + resolution time.Duration, + tags tagMap, +) (series, error) { + numBlocks := int(math.Ceil(float64(end.Sub(start)) / float64(blockSize))) + if numBlocks == 0 { + return series{}, fmt.Errorf("comparator querier: no blocks generated") + } + + blocks := make([]seriesBlock, 0, numBlocks) + for i := 0; i < numBlocks; i++ { + blocks = append(blocks, generateSeriesBlock(start, blockSize, resolution)) + start = start.Add(blockSize) + } + + return series{ + blocks: blocks, + tags: tags, + }, nil +} + +func (q *querier) generateOptions( + start time.Time, + blockSize time.Duration, + tagOptions models.TagOptions, +) iteratorOptions { + return iteratorOptions{ + start: start, + blockSize: blockSize, + encoderPool: q.encoderPool, + iteratorPools: q.iteratorPools, + tagOptions: tagOptions, + } +} + +type seriesGen struct { + name string + res time.Duration +} + +// FetchCompressed fetches timeseries data based on a query. +func (q *querier) FetchCompressed( + ctx context.Context, + query *storage.FetchQuery, + options *storage.FetchOptions, +) (m3.SeriesFetchResult, m3.Cleanup, error) { + var ( + // TODO: take from config. + blockSize = time.Hour * 12 + start = query.Start.Truncate(blockSize) + end = query.End.Truncate(blockSize).Add(blockSize) + tagOpts = models.NewTagOptions() + opts = q.generateOptions(start, blockSize, tagOpts) + + // TODO: take from config. + gens = []seriesGen{ + seriesGen{"foo", time.Second}, + seriesGen{"bar", time.Second * 15}, + seriesGen{"quail", time.Minute}, + } + + actualGens []seriesGen + ) + + q.Lock() + defer q.Unlock() + rand.Seed(start.Unix()) + for _, matcher := range query.TagMatchers { + // filter if name, otherwise return all. 
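+		// Only the metric-name matcher is inspected: a value naming a known
+		// generator selects that series alone; otherwise every generator is used.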
+		if bytes.Equal(opts.tagOptions.MetricName(), matcher.Name) {
+			value := string(matcher.Value)
+			for _, gen := range gens {
+				if value == gen.name {
+					actualGens = append(actualGens, gen)
+					break
+				}
+			}
+
+			break
+		}
+	}
+
+	if len(actualGens) == 0 {
+		actualGens = gens
+	}
+
+	seriesList := make([]series, 0, len(actualGens))
+	for _, gen := range actualGens {
+		tagMap := map[string]string{
+			"__name__": gen.name,
+			"foobar":   "qux",
+			"name":     gen.name,
+		}
+
+		series, err := generateSeries(start, end, blockSize, gen.res, tagMap)
+		if err != nil {
+			return m3.SeriesFetchResult{}, noop, err
+		}
+
+		seriesList = append(seriesList, series)
+	}
+
+	iters, err := buildSeriesIterators(seriesList, opts)
+	if err != nil {
+		return m3.SeriesFetchResult{}, noop, err
+	}
+
+	cleanup := func() error {
+		iters.Close()
+		return nil
+	}
+
+	return m3.SeriesFetchResult{
+		SeriesIterators: iters,
+		Metadata:        block.NewResultMetadata(),
+	}, cleanup, nil
+}
+
+// SearchCompressed fetches matching tags based on a query.
+func (q *querier) SearchCompressed(
+	ctx context.Context,
+	query *storage.FetchQuery,
+	options *storage.FetchOptions,
+) (m3.TagResult, m3.Cleanup, error) {
+	return m3.TagResult{}, noop, fmt.Errorf("not implemented")
+}
+
+// CompleteTagsCompressed returns autocompleted tag results.
+func (q *querier) CompleteTagsCompressed(
+	ctx context.Context,
+	query *storage.CompleteTagsQuery,
+	options *storage.FetchOptions,
+) (*storage.CompleteTagsResult, error) {
+	nameOnly := query.CompleteNameOnly
+	// TODO: take from config.
+	return &storage.CompleteTagsResult{
+		CompleteNameOnly: nameOnly,
+		CompletedTags: []storage.CompletedTag{
+			storage.CompletedTag{
+				Name:   []byte("__name__"),
+				Values: [][]byte{[]byte("foo"), []byte("bar"), []byte("quail")},
+			},
+		},
+	}, nil
+}
diff --git a/src/cmd/services/m3comparator/main/series_iterator_builder.go b/src/cmd/services/m3comparator/main/series_iterator_builder.go
new file mode 100644
index 0000000000..9c47f7835b
--- /dev/null
+++ b/src/cmd/services/m3comparator/main/series_iterator_builder.go
@@ -0,0 +1,166 @@
+// Copyright (c) 2019 Uber Technologies, Inc.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
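+
+// This file assembles encoding.SeriesIterators from the generated data: each
+// seriesBlock is encoded into an m3tsz-compressed block reader, and a series'
+// block readers are stitched together behind a MultiReaderIterator.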
+ +package main + +import ( + "io" + "time" + + "github.com/m3db/m3/src/dbnode/encoding" + "github.com/m3db/m3/src/dbnode/encoding/m3tsz" + "github.com/m3db/m3/src/dbnode/namespace" + "github.com/m3db/m3/src/dbnode/x/xio" + "github.com/m3db/m3/src/query/models" + "github.com/m3db/m3/src/x/ident" + xtime "github.com/m3db/m3/src/x/time" +) + +const sep rune = '!' +const tagSep rune = '.' + +type iteratorOptions struct { + blockSize time.Duration + start time.Time + encoderPool encoding.EncoderPool + iteratorPools encoding.IteratorPools + tagOptions models.TagOptions +} + +var iterAlloc = func(r io.Reader, _ namespace.SchemaDescr) encoding.ReaderIterator { + return m3tsz.NewReaderIterator(r, m3tsz.DefaultIntOptimizationEnabled, encoding.NewOptions()) +} + +func buildBlockReader( + block seriesBlock, + start time.Time, + opts iteratorOptions, +) ([]xio.BlockReader, error) { + encoder := opts.encoderPool.Get() + encoder.Reset(start, len(block), nil) + for _, dp := range block { + err := encoder.Encode(dp, xtime.Second, nil) + if err != nil { + encoder.Close() + return nil, err + } + } + + segment := encoder.Discard() + return []xio.BlockReader{ + xio.BlockReader{ + SegmentReader: xio.NewSegmentReader(segment), + Start: start, + BlockSize: opts.blockSize, + }, + }, nil +} + +func buildTagIteratorAndID( + tagMap tagMap, + opts models.TagOptions, +) (ident.TagIterator, ident.ID) { + var ( + tags = ident.Tags{} + modelTags = models.NewTags(len(tagMap), opts) + ) + + for name, value := range tagMap { + modelTags = modelTags.AddOrUpdateTag(models.Tag{ + Name: []byte(name), + Value: []byte(value), + }) + + tags.Append(ident.StringTag(name, value)) + } + + id := string(modelTags.ID()) + return ident.NewTagsIterator(tags), ident.StringID(id) +} + +func buildSeriesIterator( + series series, + opts iteratorOptions, +) (encoding.SeriesIterator, error) { + var ( + blocks = series.blocks + tags = series.tags + readers = make([][]xio.BlockReader, 0, len(blocks)) + start = opts.start + ) + + for _, block := range blocks { + seriesBlock, err := buildBlockReader(block, start, opts) + if err != nil { + return nil, err + } + + readers = append(readers, seriesBlock) + start = start.Add(opts.blockSize) + } + + multiReader := encoding.NewMultiReaderIterator( + iterAlloc, + opts.iteratorPools.MultiReaderIterator(), + ) + + sliceOfSlicesIter := xio.NewReaderSliceOfSlicesFromBlockReadersIterator(readers) + multiReader.ResetSliceOfSlices(sliceOfSlicesIter, nil) + + end := opts.start.Add(opts.blockSize) + if len(blocks) > 0 { + lastBlock := blocks[len(blocks)-1] + end = lastBlock[len(lastBlock)-1].Timestamp + } + + tagIter, id := buildTagIteratorAndID(tags, opts.tagOptions) + return encoding.NewSeriesIterator( + encoding.SeriesIteratorOptions{ + ID: id, + Namespace: ident.StringID("ns"), + Tags: tagIter, + StartInclusive: opts.start, + EndExclusive: end, + Replicas: []encoding.MultiReaderIterator{ + multiReader, + }, + }, nil), + nil +} + +func buildSeriesIterators( + series []series, + opts iteratorOptions, +) (encoding.SeriesIterators, error) { + iters := make([]encoding.SeriesIterator, 0, len(series)) + for _, s := range series { + iter, err := buildSeriesIterator(s, opts) + if err != nil { + return nil, err + } + + iters = append(iters, iter) + } + + return encoding.NewSeriesIterators( + iters, + opts.iteratorPools.MutableSeriesIterators(), + ), nil +} diff --git a/src/query/api/v1/handler/prometheus/common.go b/src/query/api/v1/handler/prometheus/common.go index 0c8cfa103d..59b17d3ba8 100644 --- 
a/src/query/api/v1/handler/prometheus/common.go
+++ b/src/query/api/v1/handler/prometheus/common.go
@@ -25,7 +25,11 @@ import (
 	"fmt"
 	"io"
 	"io/ioutil"
+	"math"
 	"net/http"
+	"sort"
+	"strconv"
+	"strings"
 	"time"
 
 	"github.com/m3db/m3/src/query/errors"
@@ -45,6 +49,7 @@ const (
 	filterNameTagsParam = "tag"
 	errFormatStr        = "error parsing param: %s, error: %v"
 	maxTimeout          = 5 * time.Minute
+	tolerance           = 0.0000001
 )
 
 var (
@@ -417,22 +422,213 @@ func RenderSeriesMatchResultsJSON(
 	return jw.Close()
 }
 
-// PromResp represents Prometheus's query response.
-type PromResp struct {
+// Response represents Prometheus's query response.
+type Response struct {
+	// Status is the response status.
 	Status string `json:"status"`
-	Data   struct {
-		ResultType string `json:"resultType"`
-		Result     []struct {
-			Metric map[string]string `json:"metric"`
-			// todo(braskin): use `Datapoints` instead of interface{} in values
-			// Values is [float, string]
-			Values [][]interface{} `json:"values"`
-		} `json:"result"`
-	} `json:"data"`
+	// Data is the response data.
+	Data data `json:"data"`
+}
+
+type data struct {
+	// ResultType is the result type for the response.
+	ResultType string `json:"resultType"`
+	// Result is the list of results for the response.
+	Result results `json:"result"`
+}
+
+type results []Result
+
+// Len is the number of elements in the collection.
+func (r results) Len() int { return len(r) }
+
+// Less reports whether the element with
+// index i should sort before the element with index j.
+func (r results) Less(i, j int) bool {
+	return r[i].id < r[j].id
+}
+
+// Swap swaps the elements with indexes i and j.
+func (r results) Swap(i, j int) { r[i], r[j] = r[j], r[i] }
+
+func (r results) sort() {
+	// NB: genID must be called on the slice elements themselves rather than on
+	// range copies, so that the generated ids are visible to sort.Sort below.
+	for i := range r {
+		r[i].genID()
+	}
+
+	sort.Sort(r)
+}
+
+// Result is the result itself.
+type Result struct {
+	// Metric is the tags for the result.
+	Metric Tags `json:"metric"`
+	// Values is the set of values for the result.
+	Values Values `json:"values"`
+	id     string
+}
+
+// Tags is a simple representation of Prometheus tags.
+type Tags map[string]string
+
+// Values is a list of values for the Prometheus result.
+type Values []Value
+
+// Value is a single value for Prometheus result.
+type Value []interface{}
+
+func (r *Result) genID() {
+	// NB: iterate over sorted tag names so that the generated id is
+	// deterministic; ids may still clash, but exact tag values are also
+	// checked, and this is a validation endpoint so there's less concern
+	// over correctness.
+	keys := make([]string, 0, len(r.Metric))
+	for k := range r.Metric {
+		keys = append(keys, k)
+	}
+
+	sort.Strings(keys)
+	var sb strings.Builder
+	for _, k := range keys {
+		sb.WriteString(k)
+		sb.WriteString(`:"`)
+		sb.WriteString(r.Metric[k])
+		sb.WriteString(`",`)
+	}
+
+	r.id = sb.String()
+}
+
+// MatchInformation describes how well two responses match.
+type MatchInformation struct {
+	// FullMatch indicates a full match.
+	FullMatch bool
+	// NoMatch indicates that the responses do not match sufficiently.
+	NoMatch bool
+}
+
+// Matches compares two responses and determines how closely they match.
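+// Matching proceeds hierarchically: status, then result type, then the sorted
+// per-series results, comparing tags exactly and float values against the
+// package-level tolerance rather than for strict equality.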
+func (p Response) Matches(other Response) (MatchInformation, error) {
+	if p.Status != other.Status {
+		err := fmt.Errorf("status %s does not match other status %s",
+			p.Status, other.Status)
+		return MatchInformation{
+			NoMatch: true,
+		}, err
+	}
+
+	return p.Data.matches(other.Data)
+}
+
+func (d data) matches(other data) (MatchInformation, error) {
+	if d.ResultType != other.ResultType {
+		err := fmt.Errorf("result type %s does not match other result type %s",
+			d.ResultType, other.ResultType)
+		return MatchInformation{
+			NoMatch: true,
+		}, err
+	}
+
+	return d.Result.matches(other.Result)
+}
+
+func (r results) matches(other results) (MatchInformation, error) {
+	if len(r) != len(other) {
+		err := fmt.Errorf("result length %d does not match other result length %d",
+			len(r), len(other))
+		return MatchInformation{
+			NoMatch: true,
+		}, err
+	}
+
+	r.sort()
+	other.sort()
+	for i, result := range r {
+		if err := result.matches(other[i]); err != nil {
+			return MatchInformation{
+				NoMatch: true,
+			}, err
+		}
+	}
+
+	return MatchInformation{FullMatch: true}, nil
+}
+
+func (r Result) matches(other Result) error {
+	// NB: tags should match by here so this is more of a sanity check.
+	if err := r.Metric.matches(other.Metric); err != nil {
+		return err
+	}
+
+	return r.Values.matches(other.Values)
+}
+
+func (t Tags) matches(other Tags) error {
+	if len(t) != len(other) {
+		return fmt.Errorf("tag length %d does not match other tag length %d",
+			len(t), len(other))
+	}
+
+	for k, v := range t {
+		if vv, ok := other[k]; ok {
+			if v != vv {
+				return fmt.Errorf("tag %s does not match other tag %s", v, vv)
+			}
+		} else {
+			return fmt.Errorf("tag %s not found in other tagset", v)
+		}
+	}
+
+	return nil
+}
+
+func (v Values) matches(other Values) error {
+	if len(v) != len(other) {
+		return fmt.Errorf("values length %d does not match other values length %d",
+			len(v), len(other))
+	}
+
+	for i, val := range v {
+		if err := val.matches(other[i]); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (v Value) matches(other Value) error {
+	if len(v) != 2 {
+		return fmt.Errorf("value length %d must be 2", len(v))
+	}
+
+	if len(other) != 2 {
+		return fmt.Errorf("other value length %d must be 2", len(other))
+	}
+
+	tsV := fmt.Sprint(v[0])
+	tsOther := fmt.Sprint(other[0])
+	if tsV != tsOther {
+		return fmt.Errorf("ts %s does not match other ts %s", tsV, tsOther)
+	}
+
+	valV, err := strconv.ParseFloat(fmt.Sprint(v[1]), 64)
+	if err != nil {
+		return err
+	}
+
+	valOther, err := strconv.ParseFloat(fmt.Sprint(other[1]), 64)
+	if err != nil {
+		return err
+	}
+
+	if math.Abs(valV-valOther) > tolerance {
+		return fmt.Errorf("point %f does not match other point %f", valV, valOther)
+	}
+
+	return nil
+}
 
 // PromDebug represents the input and output that are used in the debug endpoint.
type PromDebug struct { - Input PromResp `json:"input"` - Results PromResp `json:"results"` + Input Response `json:"input"` + Results Response `json:"results"` } diff --git a/src/query/api/v1/handler/prometheus/native/parse_query.go b/src/query/api/v1/handler/prometheus/native/parse_query.go index e2602c763c..3c84955952 100644 --- a/src/query/api/v1/handler/prometheus/native/parse_query.go +++ b/src/query/api/v1/handler/prometheus/native/parse_query.go @@ -24,6 +24,7 @@ import ( "fmt" "net/http" "strings" + "time" "github.com/m3db/m3/src/query/api/v1/handler" "github.com/m3db/m3/src/query/models" @@ -157,7 +158,7 @@ func parseRootNode( return FunctionNode{}, err } - parser, err := promql.Parse(query, models.NewTagOptions()) + parser, err := promql.Parse(query, time.Second, models.NewTagOptions()) if err != nil { logger.Error("cannot parse query PromQL", zap.Error(err)) return FunctionNode{}, err diff --git a/src/query/api/v1/handler/prometheus/native/read.go b/src/query/api/v1/handler/prometheus/native/read.go index c0eb680f17..994fea64ab 100644 --- a/src/query/api/v1/handler/prometheus/native/read.go +++ b/src/query/api/v1/handler/prometheus/native/read.go @@ -184,7 +184,8 @@ func (h *PromReadHandler) ServeHTTPWithEngine( ctx := context.WithValue(r.Context(), handler.HeaderKey, r.Header) logger := logging.WithContext(ctx, h.instrumentOpts) - params, rErr := parseParams(r, engine.Options(), h.timeoutOps, fetchOpts, h.instrumentOpts) + params, rErr := parseParams(r, engine.Options(), + h.timeoutOps, fetchOpts, h.instrumentOpts) if rErr != nil { h.promReadMetrics.fetchErrorsClient.Inc(1) return nil, emptyReqParams, &RespError{Err: rErr.Inner(), Code: rErr.Code()} @@ -207,7 +208,10 @@ func (h *PromReadHandler) ServeHTTPWithEngine( opentracingext.Error.Set(sp, true) logger.Error("unable to fetch data", zap.Error(err)) h.promReadMetrics.fetchErrorsServer.Inc(1) - return nil, emptyReqParams, &RespError{Err: err, Code: http.StatusInternalServerError} + return nil, emptyReqParams, &RespError{ + Err: err, + Code: http.StatusInternalServerError, + } } // TODO: Support multiple result types diff --git a/src/query/api/v1/handler/prometheus/native/read_common.go b/src/query/api/v1/handler/prometheus/native/read_common.go index 2cb6f28499..b58ab51861 100644 --- a/src/query/api/v1/handler/prometheus/native/read_common.go +++ b/src/query/api/v1/handler/prometheus/native/read_common.go @@ -72,7 +72,7 @@ func read( emptyResult := readResult{meta: block.NewResultMetadata()} // TODO: Capture timing - parser, err := promql.Parse(params.Query, tagOpts) + parser, err := promql.Parse(params.Query, params.Step, tagOpts) if err != nil { return emptyResult, err } diff --git a/src/query/api/v1/handler/prometheus/validator/handler.go b/src/query/api/v1/handler/prometheus/validator/handler.go index dcc2cda822..0436e6e372 100644 --- a/src/query/api/v1/handler/prometheus/validator/handler.go +++ b/src/query/api/v1/handler/prometheus/validator/handler.go @@ -209,7 +209,11 @@ func validate(prom, m3 map[string]*ts.Series) ([][]mismatch, error) { for _, promdp := range promdps { if math.IsNaN(promdp.Value) && !math.IsNaN(m3dp.Value) { - mismatchList = append(mismatchList, newMismatch(id, promdp.Value, m3dp.Value, promdp.Timestamp, m3dp.Timestamp, nil)) + mismatchList = append( + mismatchList, + newMismatch(id, promdp.Value, m3dp.Value, + promdp.Timestamp, m3dp.Timestamp, nil), + ) continue } @@ -219,13 +223,20 @@ func validate(prom, m3 map[string]*ts.Series) ([][]mismatch, error) { if m3idx > len(m3dps)-1 { err := 
errors.New("series has extra prom datapoints")
-			mismatchList = append(mismatchList, newMismatch(id, promdp.Value, math.NaN(), promdp.Timestamp, time.Time{}, err))
+			mismatchList = append(mismatchList,
+				newMismatch(id, promdp.Value, math.NaN(),
+					promdp.Timestamp, time.Time{}, err),
+			)
 			continue
 		}
 
 		m3dp = m3dps[m3idx]
-		if (promdp.Value != m3dp.Value && !math.IsNaN(promdp.Value)) || !promdp.Timestamp.Equal(m3dp.Timestamp) {
-			mismatchList = append(mismatchList, newMismatch(id, promdp.Value, m3dp.Value, promdp.Timestamp, m3dp.Timestamp, nil))
+		if (promdp.Value != m3dp.Value && !math.IsNaN(promdp.Value)) ||
+			!promdp.Timestamp.Equal(m3dp.Timestamp) {
+			mismatchList = append(mismatchList,
+				newMismatch(id, promdp.Value, m3dp.Value,
+					promdp.Timestamp, m3dp.Timestamp, nil),
+			)
 		}
 
 		m3idx++
@@ -235,7 +246,8 @@ func validate(prom, m3 map[string]*ts.Series) ([][]mismatch, error) {
 		for _, dp := range m3dps[m3idx:] {
 			if !math.IsNaN(dp.Value) {
 				err := errors.New("series has extra m3 datapoints")
-				mismatchList = append(mismatchList, newMismatch(id, math.NaN(), dp.Value, time.Time{}, dp.Timestamp, err))
+				mismatchList = append(mismatchList,
+					newMismatch(id, math.NaN(), dp.Value, time.Time{}, dp.Timestamp, err))
 			}
 		}
 
@@ -247,7 +259,8 @@ func validate(prom, m3 map[string]*ts.Series) ([][]mismatch, error) {
 	return mismatches, nil
 }
 
-func newMismatch(name string, promVal, m3Val float64, promTime, m3Time time.Time, err error) mismatch {
+func newMismatch(name string, promVal, m3Val float64,
+	promTime, m3Time time.Time, err error) mismatch {
 	return mismatch{
 		seriesName: name,
 		promVal:    promVal,
diff --git a/src/query/api/v1/handler/prometheus/validator/handler_test.go b/src/query/api/v1/handler/prometheus/validator/handler_test.go
index 0a99794ab7..4c5254dc72 100644
--- a/src/query/api/v1/handler/prometheus/validator/handler_test.go
+++ b/src/query/api/v1/handler/prometheus/validator/handler_test.go
@@ -327,10 +327,13 @@ func TestValidateEndpoint(t *testing.T) {
 	server, debugHandler := newServer()
 	defer server.Close()
 
-	req, _ := http.NewRequest("POST", PromDebugURL+"?start=1543431465&end=1543435005&step=14&query=go_gc_duration_seconds", newBodyWithMismatch())
+	req, _ := http.NewRequest("POST", PromDebugURL+
+		"?start=1543434961&end=1543435005&step=14&query=go_gc_duration_seconds",
+		newBodyWithMismatch())
+
 	recorder := httptest.NewRecorder()
 	debugHandler.ServeHTTP(recorder, req)
 
 	var mismatches MismatchesJSON
 	require.NoError(t, json.Unmarshal(recorder.Body.Bytes(), &mismatches))
 	assert.False(t, mismatches.Correct)
@@ -338,7 +341,9 @@ func TestValidateEndpoint(t *testing.T) {
 	mismatchesList := mismatches.MismatchesList[0]
 	assert.Len(t, mismatchesList.Mismatches, 1)
-	assert.Equal(t, "__name__=go_gc_duration_seconds,instance=localhost:9090,job=prometheus,quantile=1,", mismatchesList.Mismatches[0].Name)
+	assert.Equal(t, "__name__=go_gc_duration_seconds,"+
+		"instance=localhost:9090,job=prometheus,quantile=1,",
+		mismatchesList.Mismatches[0].Name)
 	assert.Equal(t, 0.012203, mismatchesList.Mismatches[0].M3Val)
 }
 
@@ -346,7 +351,10 @@ func TestValidateEndpointWithNumM3dpMismatch(t *testing.T) {
 	server, debugHandler := newServer()
 	defer server.Close()
 
-	req, _ := http.NewRequest("POST", PromDebugURL+"?start=1543431465&end=1543435005&step=14&query=go_gc_duration_seconds", newBodyWithNumM3dpMismatch())
+	req, _ := http.NewRequest("POST", PromDebugURL+
+		"?start=1543431461&end=1543435005&step=14&query=go_gc_duration_seconds",
+		newBodyWithNumM3dpMismatch())
+
 	recorder := httptest.NewRecorder()
 	debugHandler.ServeHTTP(recorder, req)
 
@@ -365,7 +373,10 @@ func TestValidateEndpointWithNumPromdpMismatch(t *testing.T) {
 	server, debugHandler := newServer()
 	defer server.Close()
 
-	req, _ := http.NewRequest("POST", PromDebugURL+"?start=1543431465&end=1543435005&step=14&query=go_gc_duration_seconds", newBodyWithNumPromdpMismatch())
+	req, _ := http.NewRequest("POST", PromDebugURL+
+		"?start=1543431461&end=1543435005&step=14&query=go_gc_duration_seconds",
+		newBodyWithNumPromdpMismatch())
+
 	recorder := httptest.NewRecorder()
 	debugHandler.ServeHTTP(recorder, req)
 
diff --git a/src/query/config/m3query-dev-remote.yml b/src/query/config/m3query-dev-remote.yml
new file mode 100644
index 0000000000..49972a8340
--- /dev/null
+++ b/src/query/config/m3query-dev-remote.yml
@@ -0,0 +1,23 @@
+listenAddress:
+  type: "config"
+  value: "0.0.0.0:7201"
+
+backend: grpc
+
+rpc:
+  remotes:
+    - name: "remote"
+      remoteListenAddresses: ["localhost:9000"]
+
+metrics:
+  scope:
+    prefix: "coordinator"
+  prometheus:
+    handlerPath: /metrics
+    listenAddress: 0.0.0.0:7203 # until https://github.com/m3db/m3/issues/682 is resolved
+  sanitization: prometheus
+  samplingRate: 1.0
+  extended: none
+
+tagOptions:
+  idScheme: quoted
diff --git a/src/query/executor/engine_test.go b/src/query/executor/engine_test.go
index 6cd492ecde..efc0bccfa5 100644
--- a/src/query/executor/engine_test.go
+++ b/src/query/executor/engine_test.go
@@ -79,7 +79,7 @@ func TestEngine_ExecuteExpr(t *testing.T) {
 	mockParent := cost.NewMockChainedEnforcer(ctrl)
 	mockParent.EXPECT().Child(gomock.Any()).Return(mockEnforcer)
 
-	parser, err := promql.Parse("foo", models.NewTagOptions())
+	parser, err := promql.Parse("foo", time.Second, models.NewTagOptions())
 	require.NoError(t, err)
 
 	engine := newEngine(mock.NewMockStorage(), defaultLookbackDuration,
diff --git a/src/query/executor/state_test.go b/src/query/executor/state_test.go
index 734c04f978..9d42601d05 100644
--- a/src/query/executor/state_test.go
+++ b/src/query/executor/state_test.go
@@ -45,6 +45,7 @@ func testRequestParams() models.RequestParams {
 	return models.RequestParams{
 		Now:              time.Now(),
 		LookbackDuration: defaultLookbackDuration,
+		Step:             time.Second,
 	}
 }
 
diff --git a/src/query/executor/transform/types.go b/src/query/executor/transform/types.go
index 0e56e1aaa5..7e0830688f 100644
--- a/src/query/executor/transform/types.go
+++ b/src/query/executor/transform/types.go
@@ -45,7 +45,7 @@ type Options struct {
 	instrumentOptions instrument.Options
 }
 
-// OptionsParams are the params used to create Options.
+// OptionsParams are the parameters used to create Options.
 type OptionsParams struct {
 	FetchOptions *storage.FetchOptions
 	TimeSpec     TimeSpec
@@ -96,21 +96,29 @@ func (o Options) InstrumentOptions() instrument.Options {
 	return o.instrumentOptions
 }
 
-// OpNode represents the execution node
+// OpNode represents an execution node.
 type OpNode interface {
-	Process(queryCtx *models.QueryContext, ID parser.NodeID, block block.Block) error
+	Process(
+		queryCtx *models.QueryContext,
+		ID parser.NodeID,
+		block block.Block,
+	) error
 }
 
-// TimeSpec defines the time bounds for the query execution. End is exclusive
+// TimeSpec defines the time bounds for the query execution. Start is inclusive
+// and End is exclusive.
 type TimeSpec struct {
+	// Start is the inclusive start bound for the query.
Start time.Time - End time.Time - // Now captures the current time and fixes it throughout the request, we may let people override it in the future - Now time.Time + // End is the exclusive end bound for the query. + End time.Time + // Now captures the current time and fixes it throughout the request. + Now time.Time + // Step is the step size for the query. Step time.Duration } -// Bounds transforms a timespec to bounds +// Bounds transforms the timespec to bounds. func (ts TimeSpec) Bounds() models.Bounds { return models.Bounds{ Start: ts.Start, @@ -119,27 +127,32 @@ func (ts TimeSpec) Bounds() models.Bounds { } } -// Params are defined by transforms +// Params are defined by transforms. type Params interface { parser.Params Node(controller *Controller, opts Options) OpNode } -// MetaNode is implemented by function nodes which can alter metadata for a block +// MetaNode is implemented by function nodes which +// can alter metadata for a block. type MetaNode interface { - // Meta provides the block metadata for the block using the input blocks' metadata as input + // Meta provides the block metadata for the block using the + // input blocks' metadata as input. Meta(meta block.Metadata) block.Metadata - // SeriesMeta provides the series metadata for the block using the previous blocks' series metadata as input + // SeriesMeta provides the series metadata for the block using the + // previous blocks' series metadata as input. SeriesMeta(metas []block.SeriesMeta) []block.SeriesMeta } -// BoundOp is implements by operations which have bounds +// BoundOp is an operation that is able to yield boundary information. type BoundOp interface { Bounds() BoundSpec } -// BoundSpec is the bound spec for an operation +// BoundSpec is the boundary specification for an operation. type BoundSpec struct { - Range time.Duration + // Range is the time range for the operation. + Range time.Duration + // Offset is the offset for the operation. Offset time.Duration } diff --git a/src/query/functions/fetch.go b/src/query/functions/fetch.go index 73d013d71f..7e94ac039e 100644 --- a/src/query/functions/fetch.go +++ b/src/query/functions/fetch.go @@ -48,7 +48,7 @@ type FetchOp struct { Matchers models.Matchers } -// FetchNode is the execution node +// FetchNode is a fetch execution node. // TODO: Make FetchNode private type FetchNode struct { debug bool @@ -61,12 +61,12 @@ type FetchNode struct { instrumentOpts instrument.Options } -// OpType for the operator +// OpType for the operator. func (o FetchOp) OpType() string { return FetchType } -// Bounds returns the bounds for the spec +// Bounds returns the bounds for this operation. func (o FetchOp) Bounds() transform.BoundSpec { return transform.BoundSpec{ Range: o.Range, @@ -74,13 +74,18 @@ func (o FetchOp) Bounds() transform.BoundSpec { } } -// String representation +// String is the string representation for this operation. func (o FetchOp) String() string { - return fmt.Sprintf("type: %s. name: %s, range: %v, offset: %v, matchers: %v", o.OpType(), o.Name, o.Range, o.Offset, o.Matchers) + return fmt.Sprintf("type: %s. name: %s, range: %v, offset: %v, matchers: %v", + o.OpType(), o.Name, o.Range, o.Offset, o.Matchers) } -// Node creates an execution node -func (o FetchOp) Node(controller *transform.Controller, storage storage.Storage, options transform.Options) parser.Source { +// Node creates the fetch execution node for this operation. 
+func (o FetchOp) Node( + controller *transform.Controller, + storage storage.Storage, + options transform.Options, +) parser.Source { return &FetchNode{ op: o, controller: controller, @@ -99,7 +104,8 @@ func (n *FetchNode) fetch(queryCtx *models.QueryContext) (block.Result, error) { defer sp.Finish() timeSpec := n.timespec - // No need to adjust start and ends since physical plan already considers the offset, range + // No need to adjust start and end since the physical plan + // already considers the offset and range. startTime := timeSpec.Start endTime := timeSpec.End diff --git a/src/query/functions/linear/clamp.go b/src/query/functions/linear/clamp.go index b300c036ae..a05debc6da 100644 --- a/src/query/functions/linear/clamp.go +++ b/src/query/functions/linear/clamp.go @@ -65,6 +65,14 @@ func clampFn(max bool, roundTo float64) block.ValueTransform { return func(v float64) float64 { return math.Max(v, roundTo) } } +func removeName(meta []block.SeriesMeta) []block.SeriesMeta { + for i, m := range meta { + meta[i].Tags = m.Tags.WithoutName() + } + + return meta +} + // NewClampOp creates a new clamp op based on the type and arguments func NewClampOp(args []interface{}, opType string) (parser.Params, error) { isMax := opType == ClampMaxType @@ -78,6 +86,8 @@ func NewClampOp(args []interface{}, opType string) (parser.Params, error) { } fn := clampFn(isMax, clampTo) - lazyOpts := block.NewLazyOptions().SetValueTransform(fn) + lazyOpts := block.NewLazyOptions(). + SetValueTransform(fn). + SetSeriesMetaTransform(removeName) return lazy.NewLazyOp(opType, lazyOpts) } diff --git a/src/query/functions/temporal/aggregation.go b/src/query/functions/temporal/aggregation.go index 6ad47844fc..0b5ed2ec3b 100644 --- a/src/query/functions/temporal/aggregation.go +++ b/src/query/functions/temporal/aggregation.go @@ -141,7 +141,7 @@ type aggNode struct { aggFunc func([]float64) float64 } -func (a *aggNode) process(datapoints ts.Datapoints, _ time.Time) float64 { +func (a *aggNode) process(datapoints ts.Datapoints, _ iterationBounds) float64 { return a.aggFunc(datapoints.Values()) } diff --git a/src/query/functions/temporal/base.go b/src/query/functions/temporal/base.go index a2723aff8b..0e03b3bb0a 100644 --- a/src/query/functions/temporal/base.go +++ b/src/query/functions/temporal/base.go @@ -314,12 +314,12 @@ func (c *baseNode) processCompletedBlocks( // than direct index accesses, as that may cause panics when reaching the end of // the datapoint list. func getIndices( - dp []ts.Datapoint, + dps []ts.Datapoint, lBound time.Time, rBound time.Time, init int, ) (int, int, bool) { - if init >= len(dp) || init < 0 { + if init >= len(dps) || init < 0 { return -1, -1, false } @@ -328,7 +328,7 @@ func getIndices( leftBound = false ) - for i, dp := range dp[init:] { + for i, dp := range dps[init:] { ts := dp.Timestamp if !leftBound { // Trying to set left bound. @@ -341,7 +341,7 @@ func getIndices( l = i } - if ts.Before(rBound) { + if !ts.After(rBound) { continue } @@ -350,13 +350,16 @@ func getIndices( } if r == -1 { - r = len(dp) + r = len(dps) } else { r = r + init } if leftBound { l = l + init + } else { + // If the left bound was not found, there are no valid candidate points here.
+ return l, r, false } return l, r, true @@ -386,6 +389,11 @@ func buildValueBuffer( return make(ts.Datapoints, 0, l) } +type iterationBounds struct { + start time.Time + end time.Time +} + func (c *baseNode) processSingleRequest( request processRequest, valueBuffer ts.Datapoints, @@ -396,9 +404,10 @@ func (c *baseNode) processSingleRequest( } var ( - meta = request.blk.Meta() - bounds = meta.Bounds - seriesMeta = seriesIter.SeriesMeta() + aggDuration = c.op.duration + meta = request.blk.Meta() + seriesMeta = seriesIter.SeriesMeta() + bounds = meta.Bounds ) // rename series to exclude their __name__ tag as part of function processing. @@ -421,7 +430,6 @@ func (c *baseNode) processSingleRequest( return nil, err } - aggDuration := c.op.duration depIters := make([]block.UnconsolidatedSeriesIter, 0, len(request.deps)) for _, b := range request.deps { iter, err := b.SeriesIter() @@ -458,21 +466,29 @@ func (c *baseNode) processSingleRequest( } var ( - newVal float64 - init = 0 - alignedTime = bounds.Start - start = alignedTime.Add(-1 * aggDuration) + newVal float64 + init = 0 + end = bounds.Start + start = end.Add(-1 * aggDuration) ) for i := 0; i < series.Len(); i++ { val := series.DatapointsAtStep(i) valueBuffer = append(valueBuffer, val...) - l, r, b := getIndices(valueBuffer, start, alignedTime, init) + } + + for i := 0; i < series.Len(); i++ { + iterBounds := iterationBounds{ + start: start, + end: end, + } + + l, r, b := getIndices(valueBuffer, start, end, init) if !b { - newVal = c.processor.process(ts.Datapoints{}, alignedTime) + newVal = c.processor.process(ts.Datapoints{}, iterBounds) } else { init = l - newVal = c.processor.process(valueBuffer[l:r], alignedTime) + newVal = c.processor.process(valueBuffer[l:r], iterBounds) } if err := builder.AppendValue(i, newVal); err != nil { @@ -480,7 +496,7 @@ func (c *baseNode) processSingleRequest( } start = start.Add(bounds.StepSize) - alignedTime = alignedTime.Add(bounds.StepSize) + end = end.Add(bounds.StepSize) } } @@ -520,7 +536,7 @@ func (c *baseNode) sweep(processedKeys []bool, maxBlocks int) { // processor is implemented by the underlying transforms. type processor interface { - process(valueBuffer ts.Datapoints, evaluationTime time.Time) float64 + process(valueBuffer ts.Datapoints, evaluationTime iterationBounds) float64 } // makeProcessor is a way to create a transform. 
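For reference, a condensed, self-contained sketch of the windowing behavior after this change. The names and the datapoint type here are illustrative stand-ins, not the actual block/ts plumbing. The two behavioral changes are that the right bound is now inclusive, and that a window whose left bound is never reached is reported as invalid rather than processed as an empty window:

package main

import (
	"fmt"
	"time"
)

type datapoint struct {
	ts  time.Time
	val float64
}

// windowIndices returns the half-open index range [l, r) of points whose
// timestamps fall within the closed interval [lBound, rBound], scanning
// forward from init.
func windowIndices(dps []datapoint, lBound, rBound time.Time, init int) (int, int, bool) {
	if init >= len(dps) || init < 0 {
		return -1, -1, false
	}

	l, r := -1, -1
	for i, dp := range dps[init:] {
		if l == -1 {
			if dp.ts.Before(lBound) {
				continue // before the window
			}
			l = i + init
		}

		if !dp.ts.After(rBound) {
			continue // inside the window; rBound itself is included
		}

		r = i + init
		break
	}

	if l == -1 {
		// The left bound was never reached: no valid candidate points.
		return -1, -1, false
	}

	if r == -1 {
		r = len(dps)
	}

	return l, r, true
}

func main() {
	t0 := time.Unix(1500000000, 0)
	dps := []datapoint{
		{t0, 1}, {t0.Add(10 * time.Second), 2}, {t0.Add(20 * time.Second), 3},
	}

	// A window ending exactly on the last datapoint now includes it.
	fmt.Println(windowIndices(dps, t0, t0.Add(20*time.Second), 0)) // 0 3 true
}

Making the right edge inclusive lines the temporal functions up with the closed [t-range, t] window Prometheus uses for range selectors, which is exactly what the comparator queries exercise.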
diff --git a/src/query/functions/temporal/base_test.go b/src/query/functions/temporal/base_test.go index 9caa508153..345afdb106 100644 --- a/src/query/functions/temporal/base_test.go +++ b/src/query/functions/temporal/base_test.go @@ -49,7 +49,7 @@ func (p noopProcessor) initialize( return &p } -func (p *noopProcessor) process(dps ts.Datapoints, _ time.Time) float64 { +func (p *noopProcessor) process(dps ts.Datapoints, _ iterationBounds) float64 { vals := dps.Values() sum := 0.0 for _, n := range vals { diff --git a/src/query/functions/temporal/functions.go b/src/query/functions/temporal/functions.go index 2f5a8309f8..265020fae9 100644 --- a/src/query/functions/temporal/functions.go +++ b/src/query/functions/temporal/functions.go @@ -87,7 +87,7 @@ type functionNode struct { comparisonFunc comparisonFunc } -func (f *functionNode) process(datapoints ts.Datapoints, _ time.Time) float64 { +func (f *functionNode) process(datapoints ts.Datapoints, _ iterationBounds) float64 { if len(datapoints) == 0 { return math.NaN() } diff --git a/src/query/functions/temporal/linear_regression.go b/src/query/functions/temporal/linear_regression.go index 13d0230ab0..0b3bdccecd 100644 --- a/src/query/functions/temporal/linear_regression.go +++ b/src/query/functions/temporal/linear_regression.go @@ -128,12 +128,13 @@ type linearRegressionNode struct { func (l linearRegressionNode) process( dps ts.Datapoints, - evaluationTime time.Time, + iterBounds iterationBounds, ) float64 { if dps.Len() < 2 { return math.NaN() } + evaluationTime := iterBounds.end slope, intercept := linearRegression(dps, evaluationTime, l.isDeriv) return l.fn(slope, intercept) } diff --git a/src/query/functions/temporal/rate.go b/src/query/functions/temporal/rate.go index d99807c00c..312b33baa3 100644 --- a/src/query/functions/temporal/rate.go +++ b/src/query/functions/temporal/rate.go @@ -119,8 +119,11 @@ type rateNode struct { rateFn rateFn } -func (r *rateNode) process(datapoints ts.Datapoints, _ time.Time) float64 { - return r.rateFn(datapoints, r.isRate, r.isCounter, r.timeSpec, r.duration) +func (r *rateNode) process(datapoints ts.Datapoints, bounds iterationBounds) float64 { + ts := r.timeSpec + ts.Start = bounds.start + ts.End = bounds.end + return r.rateFn(datapoints, r.isRate, r.isCounter, ts, r.duration) } func standardRateFunc( @@ -140,6 +143,8 @@ func standardRateFunc( foundFirst bool ) + rangeStart := timeSpec.Start + rangeEnd := timeSpec.End for i, dp := range datapoints { if math.IsNaN(dp.Value) { continue @@ -166,13 +171,8 @@ func standardRateFunc( } resultValue := lastValue - firstVal + counterCorrection - - rangeStart := timeSpec.Start.Add(-1 * (timeSpec.Step + timeWindow)) durationToStart := firstTS.Sub(rangeStart).Seconds() - - rangeEnd := timeSpec.End.Add(-1 * timeSpec.Step) durationToEnd := rangeEnd.Sub(lastTS).Seconds() - sampledInterval := lastTS.Sub(firstTS).Seconds() averageDurationBetweenSamples := sampledInterval / float64(lastIdx-firstIdx) @@ -217,7 +217,13 @@ func standardRateFunc( return resultValue } -func irateFunc(datapoints ts.Datapoints, isRate bool, _ bool, timeSpec transform.TimeSpec, _ time.Duration) float64 { +func irateFunc( + datapoints ts.Datapoints, + isRate bool, + _ bool, + timeSpec transform.TimeSpec, + _ time.Duration, +) float64 { dpsLen := len(datapoints) if dpsLen < 2 { return math.NaN() @@ -259,8 +265,8 @@ func irateFunc(datapoints ts.Datapoints, isRate bool, _ bool, timeSpec transform return resultValue } -// findNonNanIdx iterates over the values backwards until we find a non-NaN value, 
-// then returns its index +// findNonNanIdx iterates over the values backwards until we find a non-NaN +// value, then returns its index. func findNonNanIdx(dps ts.Datapoints, startingIdx int) int { for i := startingIdx; i >= 0; i-- { if !math.IsNaN(dps[i].Value) { diff --git a/src/query/functions/temporal/rate_test.go b/src/query/functions/temporal/rate_test.go index e7a28f1157..dd97117e4f 100644 --- a/src/query/functions/temporal/rate_test.go +++ b/src/query/functions/temporal/rate_test.go @@ -101,12 +101,12 @@ var testRateCases = []testRateCase{ {1987036, 1988988, 1990940, 1992892, 1994844}, }, afterBlockOne: [][]float64{ - {math.NaN(), math.NaN(), 0, 0, 1165.0844}, - {math.NaN(), 13.01333, 19.52, 26.0266, 32.5333}, + {math.NaN(), math.NaN(), 0, 0, 1019.4488937434074}, + {math.NaN(), 9.760000, 16.26666, 22.77333, 32.533333}, }, afterAllBlocks: [][]float64{ {255709.8666, 259191.4666, 259191.4666, 258099.2, 4573.8666}, - {8303.7166, 8303.7166, 8303.7166, 8303.7166, 32.5333}, + {8303.7166, 8303.7166, 8303.7166, 8303.7166, 32.53333}, }, }, { @@ -117,12 +117,12 @@ var testRateCases = []testRateCase{ {1987036, 1988988, 1990940, math.NaN(), 1994844}, }, afterBlockOne: [][]float64{ - {math.NaN(), math.NaN(), 0, 1310.72, 1310.72}, - {math.NaN(), 13.01333, 19.52, 19.52, 32.5333}, + {math.NaN(), math.NaN(), 0, 1092.2666739484446, 1529.173340615111}, + {math.NaN(), 9.760000, 16.26666, 22.77333, 32.533333}, }, afterAllBlocks: [][]float64{ - {255709.8666, 259191.4666, 258099.2, 4878.7911, 4878.7911}, - {8303.7166, 8303.7166, 8848.6222, 8848.6222, 32.5333}, + {255709.8666, 259191.4666, 258099.2, 4268.9422, 6098.48888}, + {8303.7166, 8303.7166, 7742.5444, 11060.7777, 32.5333}, }, }, { @@ -204,8 +204,8 @@ var testDeltaCases = []testRateCase{ {2299, 2299, 2299, 2787, 2787}, }, afterBlockOne: [][]float64{ - {math.NaN(), math.NaN(), 4456, 6684, 8912}, - {math.NaN(), 0, 0, 650.6666, 610}, + {math.NaN(), math.NaN(), 3342.000037, 5570.000037, 7798.0000371}, + {math.NaN(), 0, 0, 569.33333, 610}, }, afterAllBlocks: [][]float64{ {-2785, -2785, -2785, -2785, 11140}, @@ -220,12 +220,12 @@ var testDeltaCases = []testRateCase{ {2299, 2299, 2299, math.NaN(), 2787}, }, afterBlockOne: [][]float64{ - {math.NaN(), math.NaN(), 4456, 6684, 6684}, + {math.NaN(), math.NaN(), 3342.000037, 5570.000037, 7798.0000371}, {math.NaN(), 0, 0, 0, 610}, }, afterAllBlocks: [][]float64{ - {-2785, -2785, -2785, 8912, 8912}, - {0, 0, -650.6666, -650.6666, 610}, + {-2785, -2785, -2785, 7798.000037, 11140}, + {0, 0, -569.33333, -813.33333, 610}, }, }, { @@ -259,8 +259,8 @@ var testIncreaseCases = []testRateCase{ {1987036, 1988988, 1990940, 1992892, 1994844}, }, afterBlockOne: [][]float64{ - {math.NaN(), math.NaN(), 4456, 6684, 8912}, - {math.NaN(), 3904, 5856, 7808, 9760}, + {math.NaN(), math.NaN(), 3342, 5570, 7798}, + {math.NaN(), 2928, 4880, 6832, 9760}, }, afterAllBlocks: [][]float64{ {8355, 1087957.5, 1087957.5, 1087957.5, 1090742.5}, @@ -275,12 +275,12 @@ var testIncreaseCases = []testRateCase{ {1987036, 1988988, 1990940, math.NaN(), 1994844}, }, afterBlockOne: [][]float64{ - {math.NaN(), math.NaN(), 4456, 10176, 10176}, - {math.NaN(), 3904, 5856, 5856, 9760}, + {math.NaN(), math.NaN(), 3342.000037, 8480, 11872}, + {math.NaN(), 2928, 4880, 6832, 9760}, }, afterAllBlocks: [][]float64{ - {1099222.5, 2178825, 2175915, 1163592, 1163592}, - {2491115, 2491115, 2654586.6666, 2654586.6666, 9760}, + {1099222.5, 2178825, 2175915, 1018143.00484, 1454490}, + {2491115, 2491115, 2322763.34439, 3318233.3333, 9760}, }, }, { diff --git 
a/src/query/graphite/ts/series_test.go b/src/query/graphite/ts/series_test.go index 431f49d08b..1ea500ee4c 100644 --- a/src/query/graphite/ts/series_test.go +++ b/src/query/graphite/ts/series_test.go @@ -154,9 +154,10 @@ func TestConsolidation(t *testing.T) { {Timestamp: startTime.Add(74 * time.Second), Value: 20.5}, } - consolidation := NewConsolidation(ctx, startTime, endTime, 10*1000, func(a, b float64, count int) float64 { - return a + b - }) + consolidation := NewConsolidation( + ctx, startTime, endTime, 10*1000, func(a, b float64, count int) float64 { + return a + b + }) for i := range datapoints { consolidation.AddDatapoint(datapoints[i].Timestamp, datapoints[i].Value) diff --git a/src/query/models/bounds.go b/src/query/models/bounds.go index 8d08eb08a7..12be51b757 100644 --- a/src/query/models/bounds.go +++ b/src/query/models/bounds.go @@ -22,6 +22,7 @@ package models import ( "fmt" + "math" "time" ) @@ -118,9 +119,10 @@ func (b Bounds) Nearest(t time.Time) Bounds { } } - for startTime.After(t) { - endTime = startTime - startTime = startTime.Add(-1 * duration) + if startTime.After(t) { + diff := startTime.Sub(t) + timeDiff := math.Ceil(float64(diff) / float64(step)) + startTime = startTime.Add(-1 * time.Duration(timeDiff) * step) } return Bounds{ diff --git a/src/query/parser/promql/parse.go b/src/query/parser/promql/parse.go index 6bc9f9e28a..283a6f88d4 100644 --- a/src/query/parser/promql/parse.go +++ b/src/query/parser/promql/parse.go @@ -35,25 +35,34 @@ import ( ) type promParser struct { - expr pql.Expr - tagOpts models.TagOptions + stepSize time.Duration + expr pql.Expr + tagOpts models.TagOptions } // Parse takes a promQL string and parses it into a DAG. -func Parse(q string, tagOpts models.TagOptions) (parser.Parser, error) { +func Parse( + q string, + stepSize time.Duration, + tagOpts models.TagOptions, +) (parser.Parser, error) { expr, err := pql.ParseExpr(q) if err != nil { return nil, err } return &promParser{ - expr: expr, - tagOpts: tagOpts, + expr: expr, + stepSize: stepSize, + tagOpts: tagOpts, }, nil } func (p *promParser) DAG() (parser.Nodes, parser.Edges, error) { - state := &parseState{tagOpts: p.tagOpts} + state := &parseState{ + stepSize: p.stepSize, + tagOpts: p.tagOpts, + } err := state.walk(p.expr) if err != nil { return nil, nil, err @@ -67,6 +76,7 @@ func (p *promParser) String() string { } type parseState struct { + stepSize time.Duration edges parser.Edges transforms parser.Nodes tagOpts models.TagOptions @@ -84,11 +94,6 @@ func (p *parseState) transformLen() int { return len(p.transforms) } -func validOffset(offset time.Duration) error { - - return nil -} - func (p *parseState) addLazyUnaryTransform(unaryOp string) error { // NB: if unary type is "+", we do not apply any offsets. if unaryOp == binary.PlusType { @@ -148,6 +153,18 @@ func (p *parseState) addLazyOffsetTransform(offset time.Duration) error { return nil } +func adjustOffset(offset time.Duration, step time.Duration) time.Duration { + // NB: also handles the case where offset is 0. + align := offset % step + if align == 0 { + return offset + } + + // NB: Prometheus rounds offsets up to step size, e.g. a 61 second offset with + // a 1 minute step size gets rounded to a 2 minute offset. + return offset + step - align +} + func (p *parseState) walk(node pql.Node) error { if node == nil { return nil } @@ -175,6 +192,8 @@ func (p *parseState) walk(node pql.Node) error { return nil case *pql.MatrixSelector: + // Align offset to stepSize.
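+ // e.g. a 61 second offset with a 1 minute step is adjusted up to 2 minutes.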
+ n.Offset = adjustOffset(n.Offset, p.stepSize) operation, err := NewSelectorFromMatrix(n, p.tagOpts) if err != nil { return err @@ -187,6 +206,8 @@ func (p *parseState) walk(node pql.Node) error { return p.addLazyOffsetTransform(n.Offset) case *pql.VectorSelector: + // Align offset to stepSize. + n.Offset = adjustOffset(n.Offset, p.stepSize) operation, err := NewSelectorFromVector(n, p.tagOpts) if err != nil { return err diff --git a/src/query/parser/promql/parse_test.go b/src/query/parser/promql/parse_test.go index def4fe6f41..1b6e718995 100644 --- a/src/query/parser/promql/parse_test.go +++ b/src/query/parser/promql/parse_test.go @@ -22,6 +22,7 @@ package promql import ( "testing" + "time" "github.com/m3db/m3/src/query/functions" "github.com/m3db/m3/src/query/functions/aggregation" @@ -41,7 +42,7 @@ import ( func TestDAGWithCountOp(t *testing.T) { q := "count(http_requests_total{method=\"GET\"}) by (service)" - p, err := Parse(q, models.NewTagOptions()) + p, err := Parse(q, time.Second, models.NewTagOptions()) require.NoError(t, err) transforms, edges, err := p.DAG() require.NoError(t, err) @@ -59,7 +60,7 @@ func TestDAGWithCountOp(t *testing.T) { func TestDAGWithOffset(t *testing.T) { q := "up offset 2m" - p, err := Parse(q, models.NewTagOptions()) + p, err := Parse(q, time.Second, models.NewTagOptions()) require.NoError(t, err) transforms, edges, err := p.DAG() require.NoError(t, err) @@ -77,13 +78,13 @@ func TestDAGWithOffset(t *testing.T) { func TestInvalidOffset(t *testing.T) { q := "up offset -2m" - _, err := Parse(q, models.NewTagOptions()) + _, err := Parse(q, time.Second, models.NewTagOptions()) require.Error(t, err) } func TestNegativeUnary(t *testing.T) { q := "-up" - p, err := Parse(q, models.NewTagOptions()) + p, err := Parse(q, time.Second, models.NewTagOptions()) require.NoError(t, err) transforms, edges, err := p.DAG() require.NoError(t, err) @@ -99,7 +100,7 @@ func TestNegativeUnary(t *testing.T) { func TestPositiveUnary(t *testing.T) { q := "+up" - p, err := Parse(q, models.NewTagOptions()) + p, err := Parse(q, time.Second, models.NewTagOptions()) require.NoError(t, err) transforms, edges, err := p.DAG() require.NoError(t, err) @@ -111,7 +112,7 @@ func TestPositiveUnary(t *testing.T) { func TestInvalidUnary(t *testing.T) { q := "*up" - _, err := Parse(q, models.NewTagOptions()) + _, err := Parse(q, time.Second, models.NewTagOptions()) require.Error(t, err) } @@ -126,13 +127,13 @@ func TestGetUnaryOpType(t *testing.T) { func TestDAGWithEmptyExpression(t *testing.T) { q := "" - _, err := Parse(q, models.NewTagOptions()) + _, err := Parse(q, time.Second, models.NewTagOptions()) require.Error(t, err) } func TestDAGWithFakeOp(t *testing.T) { q := "fake(http_requests_total{method=\"GET\"})" - _, err := Parse(q, models.NewTagOptions()) + _, err := Parse(q, time.Second, models.NewTagOptions()) require.Error(t, err) } @@ -160,7 +161,7 @@ func TestAggregateParses(t *testing.T) { for _, tt := range aggregateParseTests { t.Run(tt.q, func(t *testing.T) { q := tt.q - p, err := Parse(q, models.NewTagOptions()) + p, err := Parse(q, time.Second, models.NewTagOptions()) require.NoError(t, err) transforms, edges, err := p.DAG() require.NoError(t, err) @@ -210,7 +211,7 @@ func TestLinearParses(t *testing.T) { for _, tt := range linearParseTests { t.Run(tt.q, func(t *testing.T) { q := tt.q - p, err := Parse(q, models.NewTagOptions()) + p, err := Parse(q, time.Second, models.NewTagOptions()) require.NoError(t, err) transforms, edges, err := p.DAG() require.NoError(t, err) @@ -245,7 +246,7 
@@ func TestVariadicParses(t *testing.T) { for _, tt := range variadicTests { t.Run(tt.q, func(t *testing.T) { q := tt.q - p, err := Parse(q, models.NewTagOptions()) + p, err := Parse(q, time.Second, models.NewTagOptions()) require.NoError(t, err) transforms, _, err := p.DAG() require.NoError(t, err) @@ -268,7 +269,7 @@ func TestSort(t *testing.T) { for _, tt := range sortTests { t.Run(tt.q, func(t *testing.T) { q := tt.q - p, err := Parse(q, models.NewTagOptions()) + p, err := Parse(q, time.Second, models.NewTagOptions()) require.NoError(t, err) transforms, edges, err := p.DAG() require.NoError(t, err) @@ -281,7 +282,7 @@ func TestSort(t *testing.T) { } func TestScalar(t *testing.T) { - p, err := Parse("scalar(up)", models.NewTagOptions()) + p, err := Parse("scalar(up)", time.Second, models.NewTagOptions()) require.NoError(t, err) transforms, edges, err := p.DAG() require.NoError(t, err) @@ -300,7 +301,7 @@ func TestVector(t *testing.T) { for _, expr := range vectorExprs { t.Run(expr, func(t *testing.T) { - p, err := Parse(expr, models.NewTagOptions()) + p, err := Parse(expr, time.Second, models.NewTagOptions()) require.NoError(t, err) transforms, edges, err := p.DAG() require.NoError(t, err) @@ -314,7 +315,7 @@ func TestVector(t *testing.T) { func TestTimeTypeParse(t *testing.T) { q := "time()" - p, err := Parse(q, models.NewTagOptions()) + p, err := Parse(q, time.Second, models.NewTagOptions()) require.NoError(t, err) transforms, edges, err := p.DAG() require.NoError(t, err) @@ -354,7 +355,7 @@ var binaryParseTests = []struct { func TestBinaryParses(t *testing.T) { for _, tt := range binaryParseTests { t.Run(tt.q, func(t *testing.T) { - p, err := Parse(tt.q, models.NewTagOptions()) + p, err := Parse(tt.q, time.Second, models.NewTagOptions()) require.NoError(t, err) transforms, edges, err := p.DAG() @@ -376,7 +377,7 @@ func TestBinaryParses(t *testing.T) { } func TestParenPrecedenceParses(t *testing.T) { - p, err := Parse("(5^(up-6))", models.NewTagOptions()) + p, err := Parse("(5^(up-6))", time.Second, models.NewTagOptions()) require.NoError(t, err) transforms, edges, err := p.DAG() require.NoError(t, err) @@ -440,7 +441,7 @@ func TestTemporalParses(t *testing.T) { for _, tt := range temporalParseTests { t.Run(tt.q, func(t *testing.T) { q := tt.q - p, err := Parse(q, models.NewTagOptions()) + p, err := Parse(q, time.Second, models.NewTagOptions()) require.NoError(t, err) transforms, edges, err := p.DAG() require.NoError(t, err) @@ -468,7 +469,7 @@ func TestTagParses(t *testing.T) { for _, tt := range tagParseTests { t.Run(tt.q, func(t *testing.T) { q := tt.q - p, err := Parse(q, models.NewTagOptions()) + p, err := Parse(q, time.Second, models.NewTagOptions()) require.NoError(t, err) transforms, edges, err := p.DAG() require.NoError(t, err) @@ -486,13 +487,13 @@ func TestTagParses(t *testing.T) { func TestFailedTemporalParse(t *testing.T) { q := "unknown_over_time(http_requests_total[5m])" - _, err := Parse(q, models.NewTagOptions()) + _, err := Parse(q, time.Second, models.NewTagOptions()) require.Error(t, err) } func TestMissingTagsDoNotPanic(t *testing.T) { q := `label_join(up, "foo", ",")` - p, err := Parse(q, models.NewTagOptions()) + p, err := Parse(q, time.Second, models.NewTagOptions()) require.NoError(t, err) assert.NotPanics(t, func() { _, _, _ = p.DAG() }) } diff --git a/src/query/parser/promql/resolve_scalar_test.go b/src/query/parser/promql/resolve_scalar_test.go index f27783f4a7..faef41dcc6 100644 --- a/src/query/parser/promql/resolve_scalar_test.go +++ 
b/src/query/parser/promql/resolve_scalar_test.go @@ -23,6 +23,7 @@ package promql import ( "math" "testing" + "time" "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/test" @@ -73,7 +74,7 @@ var scalarResolverTests = []struct { func TestScalarResolver(t *testing.T) { for _, tt := range scalarResolverTests { t.Run(tt.funcString, func(t *testing.T) { - parsed, err := Parse(tt.funcString, models.NewTagOptions()) + parsed, err := Parse(tt.funcString, time.Second, models.NewTagOptions()) require.NoError(t, err) expr := parsed.(*promParser).expr actual, err := resolveScalarArgument(expr) diff --git a/src/query/plan/physical.go b/src/query/plan/physical.go index ce9603ad64..325d4435b1 100644 --- a/src/query/plan/physical.go +++ b/src/query/plan/physical.go @@ -29,7 +29,7 @@ import ( "github.com/m3db/m3/src/query/parser" ) -// PhysicalPlan represents the physical plan +// PhysicalPlan represents the physical plan. type PhysicalPlan struct { steps map[parser.NodeID]LogicalStep pipeline []parser.NodeID // Ordered list of steps to be performed @@ -40,16 +40,26 @@ type PhysicalPlan struct { LookbackDuration time.Duration } -// ResultOp is resonsible for delivering results to the clients +// ResultOp is responsible for delivering results to the clients. type ResultOp struct { Parent parser.NodeID } -// NewPhysicalPlan is used to generate a physical plan. Its responsibilities include creating consolidation nodes, result nodes, -// pushing down predicates, changing the ordering for nodes +// NewPhysicalPlan is used to generate a physical plan. +// Its responsibilities include creating consolidation nodes, result nodes, +// pushing down predicates, and changing the ordering for nodes. // nolint: unparam -func NewPhysicalPlan(lp LogicalPlan, params models.RequestParams) (PhysicalPlan, error) { - // generate a new physical plan after cloning the logical plan so that any changes here do not update the logical plan +func NewPhysicalPlan( + lp LogicalPlan, + params models.RequestParams, +) (PhysicalPlan, error) { + if params.Step <= 0 { + return PhysicalPlan{}, fmt.Errorf("expected non-zero step size, got %d", + params.Step) + } + + // generate a new physical plan after cloning the logical plan so that any + // changes here do not update the logical plan. cloned := lp.Clone() p := PhysicalPlan{ steps: cloned.Steps, @@ -97,8 +107,15 @@ func (p PhysicalPlan) shiftTime() PhysicalPlan { } startShift := maxOffset + maxRange - // keeping end the same for now, might optimize later - p.TimeSpec.Start = p.TimeSpec.Start.Add(-1 * startShift) + shift := startShift % p.TimeSpec.Step + extraStep := p.TimeSpec.Step + if shift == 0 { + // NB: if the shift is already a whole number of steps, no need to take an extra step. + extraStep = 0 + } + + alignedShift := startShift - shift + extraStep + p.TimeSpec.Start = p.TimeSpec.Start.Add(-1 * alignedShift) return p } @@ -134,14 +151,15 @@ func (p PhysicalPlan) leafNode() (LogicalStep, error) { return leaf, nil } -// Step gets the logical step using its unique ID in the DAG +// Step gets the logical step using its unique ID in the DAG. func (p PhysicalPlan) Step(ID parser.NodeID) (LogicalStep, bool) { // Editor complains when inlining the map get step, ok := p.steps[ID] return step, ok } -// String representation of the physical plan +// String representation of the physical plan.
func (p PhysicalPlan) String() string { - return fmt.Sprintf("StepCount: %s, Pipeline: %s, Result: %s, TimeSpec: %v", p.steps, p.pipeline, p.ResultStep, p.TimeSpec) + return fmt.Sprintf("StepCount: %s, Pipeline: %s, Result: %s, TimeSpec: %v", + p.steps, p.pipeline, p.ResultStep, p.TimeSpec) } diff --git a/src/query/plan/physical_test.go b/src/query/plan/physical_test.go index ff0aecfd3d..cfe5a72f6c 100644 --- a/src/query/plan/physical_test.go +++ b/src/query/plan/physical_test.go @@ -42,6 +42,7 @@ func testRequestParams() models.RequestParams { return models.RequestParams{ Now: time.Now(), LookbackDuration: defaultLookbackDuration, + Step: time.Second, } } @@ -88,13 +89,15 @@ func TestShiftTime(t *testing.T) { p, err := NewPhysicalPlan(lp, params) require.NoError(t, err) - assert.Equal(t, params.Start.Add(-1*params.LookbackDuration), p.TimeSpec.Start, - fmt.Sprintf("start is not now - lookback")) - fetchTransform = parser.NewTransformFromOperation(functions.FetchOp{Offset: time.Minute, Range: time.Hour}, 1) + assert.Equal(t, params.Start.Add(-1*params.LookbackDuration), + p.TimeSpec.Start, fmt.Sprintf("start is not now - lookback")) + fetchTransform = parser.NewTransformFromOperation( + functions.FetchOp{Offset: time.Minute, Range: time.Hour}, 1) transforms = parser.Nodes{fetchTransform, countTransform} lp, _ = NewLogicalPlan(transforms, edges) p, err = NewPhysicalPlan(lp, params) require.NoError(t, err) - assert.Equal(t, params.Start.Add(-1*(time.Minute+time.Hour+defaultLookbackDuration)), p.TimeSpec.Start, + assert.Equal(t, params.Start. + Add(-1*(time.Minute+time.Hour+defaultLookbackDuration)), p.TimeSpec.Start, "start time offset by fetch") } diff --git a/src/query/storage/config.go b/src/query/storage/config.go index 0e82f946a0..f16c83f7a2 100644 --- a/src/query/storage/config.go +++ b/src/query/storage/config.go @@ -64,14 +64,14 @@ func ValidateMetricsType(v MetricsType) error { } // UnmarshalYAML unmarshals a stored metrics type.
-func (v *MetricsType) UnmarshalYAML(unmarshal func(interface{}) error) error { +func (t *MetricsType) UnmarshalYAML(unmarshal func(interface{}) error) error { var str string if err := unmarshal(&str); err != nil { return err } if value, err := ParseMetricsType(str); err == nil { - *v = value + *t = value return nil } diff --git a/src/query/storage/consolidated_test.go b/src/query/storage/consolidated_test.go index ca530d3e91..2d5a339641 100644 --- a/src/query/storage/consolidated_test.go +++ b/src/query/storage/consolidated_test.go @@ -118,14 +118,14 @@ var consolidationTests = []struct { {3, nan, nan}, {5, nan, 200}, {7, nan, nan}, - {nan, 30, nan}, + {7, 30, nan}, {nan, nan, 300}, {8, nan, nan}, - {nan, nan, nan}, + {nan, 40, nan}, {9, nan, 400}, + {9, nan, 500}, {nan, nan, 500}, {nan, nan, nan}, - {nan, nan, nan}, }, }, { @@ -134,14 +134,14 @@ var consolidationTests = []struct { expected: [][]float64{ {nan, nan, nan}, {nan, 20, 100}, - {1, nan, nan}, + {1, 20, 100}, {4, nan, 200}, - {7, nan, nan}, - {nan, nan, 300}, - {8, nan, nan}, - {nan, nan, 400}, + {7, nan, 200}, + {7, 30, 300}, + {8, nan, 300}, + {nan, 40, 400}, + {9, nan, 500}, {nan, nan, 500}, - {nan, nan, nan}, }, }, } diff --git a/src/query/storage/error_behavior.go b/src/query/storage/error_behavior.go index c99962fdad..f62e180207 100644 --- a/src/query/storage/error_behavior.go +++ b/src/query/storage/error_behavior.go @@ -33,8 +33,8 @@ var ( } ) -func (t ErrorBehavior) String() string { - switch t { +func (e ErrorBehavior) String() string { + switch e { case BehaviorFail: return "fail" case BehaviorWarn: @@ -58,14 +58,14 @@ func ParseErrorBehavior(str string) (ErrorBehavior, error) { } // UnmarshalYAML unmarshals an error behavior. -func (v *ErrorBehavior) UnmarshalYAML(unmarshal func(interface{}) error) error { +func (e *ErrorBehavior) UnmarshalYAML(unmarshal func(interface{}) error) error { var str string if err := unmarshal(&str); err != nil { return err } if value, err := ParseErrorBehavior(str); err == nil { - *v = value + *e = value return nil } diff --git a/src/query/storage/m3/m3_mock.go b/src/query/storage/m3/m3_mock.go index 5977f245de..eac651c737 100644 --- a/src/query/storage/m3/m3_mock.go +++ b/src/query/storage/m3/m3_mock.go @@ -86,6 +86,21 @@ func (mr *MockStorageMockRecorder) CompleteTags(arg0, arg1, arg2 interface{}) *g return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CompleteTags", reflect.TypeOf((*MockStorage)(nil).CompleteTags), arg0, arg1, arg2) } +// CompleteTagsCompressed mocks base method +func (m *MockStorage) CompleteTagsCompressed(arg0 context.Context, arg1 *storage.CompleteTagsQuery, arg2 *storage.FetchOptions) (*storage.CompleteTagsResult, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CompleteTagsCompressed", arg0, arg1, arg2) + ret0, _ := ret[0].(*storage.CompleteTagsResult) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CompleteTagsCompressed indicates an expected call of CompleteTagsCompressed +func (mr *MockStorageMockRecorder) CompleteTagsCompressed(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CompleteTagsCompressed", reflect.TypeOf((*MockStorage)(nil).CompleteTagsCompressed), arg0, arg1, arg2) +} + // ErrorBehavior mocks base method func (m *MockStorage) ErrorBehavior() storage.ErrorBehavior { m.ctrl.T.Helper() diff --git a/src/query/storage/m3/storage.go b/src/query/storage/m3/storage.go index 5b430ce06b..fb488bd9c4 100644 --- a/src/query/storage/m3/storage.go +++ 
b/src/query/storage/m3/storage.go @@ -166,44 +166,24 @@ func (s *m3storage) Fetch( return fetchResult, nil } -func (s *m3storage) FetchBlocks( - ctx context.Context, +// FetchResultToBlockResult converts an encoded SeriesIterator fetch result +// into blocks. +func FetchResultToBlockResult( + result SeriesFetchResult, query *storage.FetchQuery, options *storage.FetchOptions, + opts m3db.Options, ) (block.Result, error) { - // Override options with whatever is the current specified lookback duration. - opts := s.opts.SetLookbackDuration( - options.LookbackDurationOrDefault(s.opts.LookbackDuration())) - - // If using decoded block, return the legacy path. - if options.BlockType == models.TypeDecodedBlock { - fetchResult, err := s.Fetch(ctx, query, options) - if err != nil { - return block.Result{ - Metadata: block.NewResultMetadata(), - }, err - } - - return storage.FetchResultToBlockResult(fetchResult, query, - opts.LookbackDuration(), options.Enforcer) - } - // If using multiblock, update options to reflect this. if options.BlockType == models.TypeMultiBlock { opts = opts. SetSplitSeriesByBlock(true) } - result, _, err := s.FetchCompressed(ctx, query, options) - if err != nil { - return block.Result{ - Metadata: block.NewResultMetadata(), - }, err - } - + start := query.Start bounds := models.Bounds{ - Start: query.Start, - Duration: query.End.Sub(query.Start), + Start: start, + Duration: query.End.Sub(start), StepSize: query.Interval, } @@ -212,16 +192,6 @@ func (s *m3storage) FetchBlocks( enforcer = cost.NoopChainedEnforcer() } - // TODO: mutating this array breaks the abstraction a bit, but it's the least - // fussy way I can think of to do this while maintaining the original pooling. - // Alternative would be to fetch a new MutableSeriesIterators() instance from - // the pool, populate it, and then return the original to the pool, which - // feels wasteful. - iters := result.SeriesIterators.Iters() - for i, iter := range iters { - iters[i] = NewAccountedSeriesIter(iter, enforcer, options.Scope) - } - blocks, err := m3db.ConvertM3DBSeriesIterators( result.SeriesIterators, bounds, @@ -241,6 +211,38 @@ func (s *m3storage) FetchBlocks( }, nil } +func (s *m3storage) FetchBlocks( + ctx context.Context, + query *storage.FetchQuery, + options *storage.FetchOptions, +) (block.Result, error) { + // Override options with whatever is the current specified lookback duration. + opts := s.opts.SetLookbackDuration( + options.LookbackDurationOrDefault(s.opts.LookbackDuration())) + + // If using decoded block, return the legacy path. + if options.BlockType == models.TypeDecodedBlock { + fetchResult, err := s.Fetch(ctx, query, options) + if err != nil { + return block.Result{ + Metadata: block.NewResultMetadata(), + }, err + } + + return storage.FetchResultToBlockResult(fetchResult, query, + opts.LookbackDuration(), options.Enforcer) + } + + result, _, err := s.FetchCompressed(ctx, query, options) + if err != nil { + return block.Result{ + Metadata: block.NewResultMetadata(), + }, err + } + + return FetchResultToBlockResult(result, query, options, opts) +} + func (s *m3storage) FetchCompressed( ctx context.Context, query *storage.FetchQuery, @@ -398,6 +400,15 @@ func (s *m3storage) SearchSeries( }, nil } +// CompleteTagsCompressed has the same behavior as CompleteTags. 
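+// It exists so that m3storage also satisfies the narrower m3.Querier
+// interface, which the gRPC server now depends on instead of the full
+// m3.Storage.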
+func (s *m3storage) CompleteTagsCompressed( + ctx context.Context, + query *storage.CompleteTagsQuery, + options *storage.FetchOptions, +) (*storage.CompleteTagsResult, error) { + return s.CompleteTags(ctx, query, options) +} + func (s *m3storage) CompleteTags( ctx context.Context, query *storage.CompleteTagsQuery, diff --git a/src/query/storage/m3/types.go b/src/query/storage/m3/types.go index 205808a0e0..5548ded95a 100644 --- a/src/query/storage/m3/types.go +++ b/src/query/storage/m3/types.go @@ -58,6 +58,13 @@ type Querier interface { query *genericstorage.FetchQuery, options *genericstorage.FetchOptions, ) (TagResult, Cleanup, error) + + // CompleteTagsCompressed returns autocompleted tag results. + CompleteTagsCompressed( + ctx context.Context, + query *genericstorage.CompleteTagsQuery, + options *genericstorage.FetchOptions, + ) (*genericstorage.CompleteTagsResult, error) } // SeriesFetchResult is a fetch result with associated metadata. diff --git a/src/query/storage/validator/storage.go b/src/query/storage/validator/storage.go index 00284303ce..fc71450069 100644 --- a/src/query/storage/validator/storage.go +++ b/src/query/storage/validator/storage.go @@ -41,7 +41,7 @@ type debugStorage struct { } // NewStorage creates a new debug storage instance. -func NewStorage(promReadResp prometheus.PromResp, lookbackDuration time.Duration) (storage.Storage, error) { +func NewStorage(promReadResp prometheus.Response, lookbackDuration time.Duration) (storage.Storage, error) { seriesList, err := PromResultToSeriesList(promReadResp, models.NewTagOptions()) if err != nil { return nil, err @@ -77,7 +77,7 @@ func (s *debugStorage) FetchBlocks( } // PromResultToSeriesList converts a prom result to a series list -func PromResultToSeriesList(promReadResp prometheus.PromResp, tagOptions models.TagOptions) ([]*ts.Series, error) { +func PromResultToSeriesList(promReadResp prometheus.Response, tagOptions models.TagOptions) ([]*ts.Series, error) { if promReadResp.Data.ResultType != "matrix" { return nil, fmt.Errorf("unsupported result type found: %s", promReadResp.Data.ResultType) } diff --git a/src/query/storage/validator/storage_test.go b/src/query/storage/validator/storage_test.go index 5d3d7ca385..68cde3690b 100644 --- a/src/query/storage/validator/storage_test.go +++ b/src/query/storage/validator/storage_test.go @@ -31,27 +31,24 @@ import ( ) func TestConverter(t *testing.T) { - promResult := prometheus.PromResp{ + promResult := prometheus.Response{ Status: "success", } - vals := [][]interface{}{ + vals := prometheus.Values{ {1543434975.200, "10"}, {1543434985.200, "12"}, {1543434995.200, "14"}, } - metrics := map[string]string{ + metrics := prometheus.Tags{ "__name__": "test_name", "tag_one": "val_one", } promResult.Data.ResultType = "matrix" promResult.Data.Result = append(promResult.Data.Result, - struct { - Metric map[string]string `json:"metric"` - Values [][]interface{} `json:"values"` - }{ + prometheus.Result{ Values: vals, Metric: metrics, }, diff --git a/src/query/ts/values.go b/src/query/ts/values.go index 3eac0a5770..90c3db9fc5 100644 --- a/src/query/ts/values.go +++ b/src/query/ts/values.go @@ -106,13 +106,17 @@ func (d Datapoints) alignToBounds( stepSize := bounds.StepSize t := bounds.Start for i := 0; i < steps; i++ { - singleStepValues := make(Datapoints, 0) + singleStepValues := make(Datapoints, 0, 10) + staleThreshold := lookbackDuration + if stepSize > lookbackDuration { + staleThreshold = stepSize + } for dpIdx < numDatapoints && !d[dpIdx].Timestamp.After(t) { point := d[dpIdx] 
dpIdx++ // Skip stale values - if t.Sub(point.Timestamp) > lookbackDuration { + if t.Sub(point.Timestamp) > staleThreshold { continue } @@ -124,7 +128,7 @@ func (d Datapoints) alignToBounds( if writeForward { if len(singleStepValues) == 0 && dpIdx > 0 { prevPoint := d[dpIdx-1] - if t.Sub(prevPoint.Timestamp) <= lookbackDuration { + if t.Sub(prevPoint.Timestamp) <= staleThreshold { singleStepValues = Datapoints{prevPoint} } } diff --git a/src/query/tsdb/remote/client.go b/src/query/tsdb/remote/client.go index 805e9c2fea..b9a203302a 100644 --- a/src/query/tsdb/remote/client.go +++ b/src/query/tsdb/remote/client.go @@ -229,40 +229,7 @@ func (c *grpcClient) FetchBlocks( }, err } - // If using multiblock, update options to reflect this. - if options.BlockType == models.TypeMultiBlock { - opts = opts. - SetSplitSeriesByBlock(true) - } - - bounds := models.Bounds{ - Start: query.Start, - Duration: query.End.Sub(query.Start), - StepSize: query.Interval, - } - - enforcer := options.Enforcer - if enforcer == nil { - enforcer = cost.NoopChainedEnforcer() - } - - blocks, err := m3db.ConvertM3DBSeriesIterators( - fetchResult.SeriesIterators, - bounds, - fetchResult.Metadata, - opts, - ) - - if err != nil { - return block.Result{ - Metadata: block.NewResultMetadata(), - }, err - } - - return block.Result{ - Blocks: blocks, - Metadata: fetchResult.Metadata, - }, nil + return m3.FetchResultToBlockResult(fetchResult, query, options, opts) } func (c *grpcClient) SearchSeries( diff --git a/src/query/tsdb/remote/server.go b/src/query/tsdb/remote/server.go index e8ba138cee..9d510727be 100644 --- a/src/query/tsdb/remote/server.go +++ b/src/query/tsdb/remote/server.go @@ -46,7 +46,7 @@ type grpcServer struct { createAt time.Time poolErr error batchSize int - storage m3.Storage + querier m3.Querier queryContextOpts models.QueryContextOptions poolWrapper *pools.PoolWrapper once sync.Once @@ -64,7 +64,7 @@ func min(a, b int) int { // NewGRPCServer builds a grpc server which must be started later. 
func NewGRPCServer( - store m3.Storage, + querier m3.Querier, queryContextOpts models.QueryContextOptions, poolWrapper *pools.PoolWrapper, instrumentOpts instrument.Options, @@ -72,7 +72,7 @@ func NewGRPCServer( server := grpc.NewServer() grpcServer := &grpcServer{ createAt: time.Now(), - storage: store, + querier: querier, queryContextOpts: queryContextOpts, poolWrapper: poolWrapper, instrumentOpts: instrumentOpts, @@ -126,8 +126,7 @@ func (s *grpcServer) Fetch( fetchOpts.Limit = s.queryContextOpts.LimitMaxTimeseries } - result, cleanup, err := s.storage.FetchCompressed(ctx, - storeQuery, fetchOpts) + result, cleanup, err := s.querier.FetchCompressed(ctx, storeQuery, fetchOpts) defer cleanup() if err != nil { logger.Error("unable to fetch local query", zap.Error(err)) @@ -185,8 +184,8 @@ func (s *grpcServer) Search( return err } - searchResults, cleanup, err := s.storage.SearchCompressed(ctx, - searchQuery, fetchOpts) + searchResults, cleanup, err := s.querier.SearchCompressed(ctx, searchQuery, + fetchOpts) defer cleanup() if err != nil { logger.Error("unable to search tags", zap.Error(err)) @@ -240,7 +239,7 @@ func (s *grpcServer) CompleteTags( return err } - completed, err := s.storage.CompleteTags(ctx, completeTagsQuery, fetchOpts) + completed, err := s.querier.CompleteTagsCompressed(ctx, completeTagsQuery, fetchOpts) if err != nil { logger.Error("unable to complete tags", zap.Error(err)) return err diff --git a/src/query/tsdb/remote/server_test.go b/src/query/tsdb/remote/server_test.go index 8d08d898a8..ca91a4c2f0 100644 --- a/src/query/tsdb/remote/server_test.go +++ b/src/query/tsdb/remote/server_test.go @@ -530,7 +530,7 @@ func TestBatchedCompleteTags(t *testing.T) { }, } - store.EXPECT().CompleteTags(gomock.Any(), gomock.Any(), gomock.Any()). + store.EXPECT().CompleteTagsCompressed(gomock.Any(), gomock.Any(), gomock.Any()). Return(expected, nil) listener := startServer(t, ctrl, store)
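The offset and start-shift changes above (adjustOffset in the parser, the aligned shift in the physical plan, and the step-based Bounds.Nearest) all apply the same rule: round a duration up to the next whole number of steps so that shifted times stay on the step grid. A minimal sketch of that arithmetic, assuming the round-up reading suggested by the surrounding comments (the function name here is illustrative, not part of this change):

package main

import (
	"fmt"
	"time"
)

// roundUpToStep rounds d up to the next multiple of step, leaving exact
// multiples (including zero) untouched.
func roundUpToStep(d, step time.Duration) time.Duration {
	rem := d % step
	if rem == 0 {
		return d
	}

	return d + step - rem
}

func main() {
	fmt.Println(roundUpToStep(61*time.Second, time.Minute)) // 2m0s: `offset 61s` at a 1m step
	fmt.Println(roundUpToStep(90*time.Second, time.Minute)) // 2m0s: a 90s start shift
	fmt.Println(roundUpToStep(2*time.Minute, time.Minute))  // 2m0s: already aligned, no extra step
}

Rounding up rather than down preserves the full lookback window: shifting the start back by less than offset plus range would drop datapoints at the window's edge.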