From 3613876062198668fd57d5b0b452c32116bf82d7 Mon Sep 17 00:00:00 2001 From: arnikola Date: Tue, 21 May 2019 12:43:39 -0400 Subject: [PATCH] [integration] Add query fanout docker integration test (#1652) --- scripts/docker-integration-tests/common.sh | 23 +- .../query_fanout/docker-compose.yml | 48 ++ .../query_fanout/m3coordinator-cluster-a.yml | 45 ++ .../query_fanout/m3coordinator-cluster-b.yml | 45 ++ .../query_fanout/test.sh | 92 +++ src/query/block/column.go | 1 - src/query/block/container.go | 637 ++++++++++++++++++ src/query/block/types.go | 6 + src/query/server/server.go | 8 +- src/query/server/server_test.go | 16 +- src/query/storage/block.go | 2 - src/query/storage/consolidated.go | 2 - src/query/storage/fanout/storage.go | 39 +- src/query/storage/index.go | 2 +- src/query/ts/m3db/convert_test.go | 15 +- src/query/ts/m3db/encoded_block.go | 5 +- src/query/ts/m3db/encoded_series_iterator.go | 2 - .../encoded_unconsolidated_series_iterator.go | 2 - src/query/tsdb/remote/client.go | 53 +- src/query/tsdb/remote/compressed_codecs.go | 5 +- .../{staticResolver.go => static_resolver.go} | 0 21 files changed, 990 insertions(+), 58 deletions(-) create mode 100644 scripts/docker-integration-tests/query_fanout/docker-compose.yml create mode 100644 scripts/docker-integration-tests/query_fanout/m3coordinator-cluster-a.yml create mode 100644 scripts/docker-integration-tests/query_fanout/m3coordinator-cluster-b.yml create mode 100755 scripts/docker-integration-tests/query_fanout/test.sh create mode 100644 src/query/block/container.go rename src/query/tsdb/remote/{staticResolver.go => static_resolver.go} (100%) diff --git a/scripts/docker-integration-tests/common.sh b/scripts/docker-integration-tests/common.sh index 56599a1780..f72897f48d 100644 --- a/scripts/docker-integration-tests/common.sh +++ b/scripts/docker-integration-tests/common.sh @@ -48,12 +48,17 @@ function setup_single_m3db_node { } function wait_for_db_init { + local dbnode_host=${DBNODE_HOST:-dbnode01} + 
local dbnode_port=${DBNODE_PORT:-9000} + local dbnode_health_port=${DBNODE_HEALTH_PORT:-9002} + local coordinator_port=${COORDINATOR_PORT:-7201} + echo "Wait for API to be available" ATTEMPTS=10 TIMEOUT=2 retry_with_backoff \ - '[ "$(curl -sSf 0.0.0.0:7201/api/v1/namespace | jq ".namespaces | length")" == "0" ]' + '[ "$(curl -sSf 0.0.0.0:'"${coordinator_port}"'/api/v1/namespace | jq ".namespaces | length")" == "0" ]' echo "Adding placement and agg namespace" - curl -vvvsSf -X POST 0.0.0.0:7201/api/v1/database/create -d '{ + curl -vvvsSf -X POST 0.0.0.0:${coordinator_port}/api/v1/database/create -d '{ "type": "cluster", "namespaceName": "agg", "retentionTime": "6h", @@ -64,32 +69,32 @@ function wait_for_db_init { "isolation_group": "rack-a", "zone": "embedded", "weight": 1024, - "address": "dbnode01", - "port": 9000 + "address": "'"${dbnode_host}"'", + "port": '"${dbnode_port}"' } ] }' echo "Wait until placement is init'd" ATTEMPTS=4 TIMEOUT=1 retry_with_backoff \ - '[ "$(curl -sSf 0.0.0.0:7201/api/v1/placement | jq .placement.instances.m3db_local.id)" == \"m3db_local\" ]' + '[ "$(curl -sSf 0.0.0.0:'"${coordinator_port}"'/api/v1/placement | jq .placement.instances.m3db_local.id)" == \"m3db_local\" ]' echo "Wait until agg namespace is init'd" ATTEMPTS=4 TIMEOUT=1 retry_with_backoff \ - '[ "$(curl -sSf 0.0.0.0:7201/api/v1/namespace | jq .registry.namespaces.agg.indexOptions.enabled)" == true ]' + '[ "$(curl -sSf 0.0.0.0:'"${coordinator_port}"'/api/v1/namespace | jq .registry.namespaces.agg.indexOptions.enabled)" == true ]' echo "Adding unagg namespace" - curl -vvvsSf -X POST 0.0.0.0:7201/api/v1/database/namespace/create -d '{ + curl -vvvsSf -X POST 0.0.0.0:${coordinator_port}/api/v1/database/namespace/create -d '{ "namespaceName": "unagg", "retentionTime": "6h" }' echo "Wait until unagg namespace is init'd" ATTEMPTS=4 TIMEOUT=1 retry_with_backoff \ - '[ "$(curl -sSf 0.0.0.0:7201/api/v1/namespace | jq .registry.namespaces.unagg.indexOptions.enabled)" == true ]' + '[ 
"$(curl -sSf 0.0.0.0:'"${coordinator_port}"'/api/v1/namespace | jq .registry.namespaces.unagg.indexOptions.enabled)" == true ]' echo "Wait until bootstrapped" ATTEMPTS=10 TIMEOUT=2 retry_with_backoff \ - '[ "$(curl -sSf 0.0.0.0:9002/health | jq .bootstrapped)" == true ]' + '[ "$(curl -sSf 0.0.0.0:'"${dbnode_health_port}"'/health | jq .bootstrapped)" == true ]' } diff --git a/scripts/docker-integration-tests/query_fanout/docker-compose.yml b/scripts/docker-integration-tests/query_fanout/docker-compose.yml new file mode 100644 index 0000000000..867ce80730 --- /dev/null +++ b/scripts/docker-integration-tests/query_fanout/docker-compose.yml @@ -0,0 +1,48 @@ +version: "3.5" +services: + dbnode-cluster-a: + expose: + - "9000-9004" + - "2379-2380" + ports: + - "0.0.0.0:9000-9004:9000-9004" + - "0.0.0.0:2379-2380:2379-2380" + networks: + - backend + image: "m3dbnode_integration:${REVISION}" + coordinator-cluster-a: + expose: + - "7201" + - "7203" + ports: + - "0.0.0.0:7201:7201" + - "0.0.0.0:7203:7203" + networks: + - backend + image: "m3coordinator_integration:${REVISION}" + volumes: + - "./m3coordinator-cluster-a.yml:/etc/m3coordinator/m3coordinator.yml" + dbnode-cluster-b: + expose: + - "9000-9004" + - "2379-2380" + ports: + - "0.0.0.0:19000-19004:9000-9004" + - "0.0.0.0:12379-12380:2379-2380" + networks: + - backend + image: "m3dbnode_integration:${REVISION}" + coordinator-cluster-b: + expose: + - "7201" + - "7203" + ports: + - "0.0.0.0:17201:7201" + - "0.0.0.0:17203:7203" + networks: + - backend + image: "m3coordinator_integration:${REVISION}" + volumes: + - "./m3coordinator-cluster-b.yml:/etc/m3coordinator/m3coordinator.yml" +networks: + backend: diff --git a/scripts/docker-integration-tests/query_fanout/m3coordinator-cluster-a.yml b/scripts/docker-integration-tests/query_fanout/m3coordinator-cluster-a.yml new file mode 100644 index 0000000000..19eafbb649 --- /dev/null +++ b/scripts/docker-integration-tests/query_fanout/m3coordinator-cluster-a.yml @@ -0,0 +1,45 @@ 
+listenAddress: + type: "config" + value: "0.0.0.0:7201" + +metrics: + scope: + prefix: "coordinator" + prometheus: + handlerPath: /metrics + listenAddress: 0.0.0.0:7203 # until https://github.com/m3db/m3/issues/682 is resolved + sanitization: prometheus + samplingRate: 1.0 + extended: none + +# Fanout queries to remote clusters +rpc: + enabled: true + listenAddress: "0.0.0.0:7202" + remoteListenAddresses: ["coordinator-cluster-b:7202"] + +clusters: + - namespaces: + - namespace: agg + type: aggregated + retention: 10h + resolution: 15s + - namespace: unagg + type: unaggregated + retention: 10m + client: + config: + service: + env: default_env + zone: embedded + service: m3db + cacheDir: /var/lib/m3kv + etcdClusters: + - zone: embedded + endpoints: + - dbnode-cluster-a:2379 + writeConsistencyLevel: majority + readConsistencyLevel: unstrict_majority + +tagOptions: + idScheme: quoted diff --git a/scripts/docker-integration-tests/query_fanout/m3coordinator-cluster-b.yml b/scripts/docker-integration-tests/query_fanout/m3coordinator-cluster-b.yml new file mode 100644 index 0000000000..9603b2f82d --- /dev/null +++ b/scripts/docker-integration-tests/query_fanout/m3coordinator-cluster-b.yml @@ -0,0 +1,45 @@ +listenAddress: + type: "config" + value: "0.0.0.0:7201" + +metrics: + scope: + prefix: "coordinator" + prometheus: + handlerPath: /metrics + listenAddress: 0.0.0.0:7203 # until https://github.com/m3db/m3/issues/682 is resolved + sanitization: prometheus + samplingRate: 1.0 + extended: none + +# Fanout queries to remote clusters +rpc: + enabled: true + listenAddress: "0.0.0.0:7202" + remoteListenAddresses: ["coordinator-cluster-a:7202"] + +clusters: + - namespaces: + - namespace: agg + type: aggregated + retention: 10h + resolution: 15s + - namespace: unagg + type: unaggregated + retention: 10m + client: + config: + service: + env: default_env + zone: embedded + service: m3db + cacheDir: /var/lib/m3kv + etcdClusters: + - zone: embedded + endpoints: + - 
dbnode-cluster-b:2379
+ writeConsistencyLevel: majority
+ readConsistencyLevel: unstrict_majority
+
+tagOptions:
+ idScheme: quoted
diff --git a/scripts/docker-integration-tests/query_fanout/test.sh b/scripts/docker-integration-tests/query_fanout/test.sh
new file mode 100755
index 0000000000..7c33ed0812
--- /dev/null
+++ b/scripts/docker-integration-tests/query_fanout/test.sh
@@ -0,0 +1,92 @@
+#!/usr/bin/env bash
+
+set -xe
+
+source $GOPATH/src/github.com/m3db/m3/scripts/docker-integration-tests/common.sh
+REVISION=$(git rev-parse HEAD)
+COMPOSE_FILE=$GOPATH/src/github.com/m3db/m3/scripts/docker-integration-tests/query_fanout/docker-compose.yml
+export REVISION
+
+echo "Run m3dbnode and m3coordinator containers"
+docker-compose -f ${COMPOSE_FILE} up -d dbnode-cluster-a
+docker-compose -f ${COMPOSE_FILE} up -d coordinator-cluster-a
+
+docker-compose -f ${COMPOSE_FILE} up -d dbnode-cluster-b
+docker-compose -f ${COMPOSE_FILE} up -d coordinator-cluster-b
+
+# think of this as a defer func() in golang
+function defer {
+ docker-compose -f ${COMPOSE_FILE} down || echo "unable to shutdown containers" # CI fails to stop all containers sometimes
+}
+trap defer EXIT
+
+DBNODE_HOST=dbnode-cluster-a DBNODE_PORT=9000 DBNODE_HEALTH_PORT=9002 COORDINATOR_PORT=7201 \
+ setup_single_m3db_node
+
+DBNODE_HOST=dbnode-cluster-b DBNODE_PORT=9000 DBNODE_HEALTH_PORT=19002 COORDINATOR_PORT=17201 \
+ setup_single_m3db_node
+
+echo "Write data to cluster a"
+curl -vvvsS -X POST 0.0.0.0:9003/writetagged -d '{
+ "namespace": "unagg",
+ "id": "{__name__=\"test_metric\",cluster=\"cluster-a\",endpoint=\"/request\"}",
+ "tags": [
+ {
+ "name": "__name__",
+ "value": "test_metric"
+ },
+ {
+ "name": "cluster",
+ "value": "cluster-a"
+ },
+ {
+ "name": "endpoint",
+ "value": "/request"
+ }
+ ],
+ "datapoint": {
+ "timestamp":'"$(date +"%s")"',
+ "value": 42.123456789
+ }
+}'
+
+echo "Write data to cluster b"
+curl -vvvsS -X POST 0.0.0.0:19003/writetagged -d '{
+ "namespace": "unagg",
+ "id": 
"{__name__=\"test_metric\",cluster=\"cluster-b\",endpoint=\"/request\"}", + "tags": [ + { + "name": "__name__", + "value": "test_metric" + }, + { + "name": "cluster", + "value": "cluster-b" + }, + { + "name": "endpoint", + "value": "/request" + } + ], + "datapoint": { + "timestamp":'"$(date +"%s")"', + "value": 42.123456789 + } +}' + + +function read { + RESPONSE=$(curl "http://0.0.0.0:7201/api/v1/query?query=test_metric") + ACTUAL=$(echo $RESPONSE | jq .data.result[].metric.cluster) + test "$(echo $ACTUAL)" = '"cluster-a" "cluster-b"' +} + +ATTEMPTS=5 TIMEOUT=1 retry_with_backoff read + +function read_sum { + RESPONSE=$(curl "http://0.0.0.0:7201/api/v1/query?query=sum(test_metric)") + ACTUAL=$(echo $RESPONSE | jq .data.result[].value[1]) + test $ACTUAL = '"84.246913578"' +} + +ATTEMPTS=5 TIMEOUT=1 retry_with_backoff read_sum diff --git a/src/query/block/column.go b/src/query/block/column.go index bd8817f1f2..6e171e1631 100644 --- a/src/query/block/column.go +++ b/src/query/block/column.go @@ -285,7 +285,6 @@ func (m *columnBlockSeriesIter) Err() error { } func (m *columnBlockSeriesIter) Next() bool { - fmt.Printf("THIS iter4 next\n") m.idx++ next := m.idx < m.SeriesCount() if !next { diff --git a/src/query/block/container.go b/src/query/block/container.go new file mode 100644 index 0000000000..290b6854ba --- /dev/null +++ b/src/query/block/container.go @@ -0,0 +1,637 @@ +// Copyright (c) 2019 Uber Technologies, Inc. 
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package block + +import ( + "errors" + "time" + + "github.com/m3db/m3/src/query/ts" + xerrors "github.com/m3db/m3/src/x/errors" +) + +var ( + errMismatchedStepIter = errors.New("container step iter has mismatched step size") + errMismatchedUcStepIter = errors.New("unconsolidated container step iter has mismatched step size") +) + +type containerBlock struct { + err error + blocks []Block +} + +func newContainerBlock(blocks []Block) AccumulatorBlock { + return &containerBlock{ + blocks: blocks, + } +} + +// NewContainerBlock creates a Container block. 
+func NewContainerBlock(blocks ...Block) AccumulatorBlock { + return newContainerBlock(blocks) +} + +func (b *containerBlock) AddBlock(bl Block) error { + if b.err != nil { + return b.err + } + + b.blocks = append(b.blocks, bl) + return nil +} + +func (b *containerBlock) Close() error { + multiErr := xerrors.NewMultiError() + multiErr = multiErr.Add(b.err) + for _, bl := range b.blocks { + multiErr = multiErr.Add(bl.Close()) + } + + return multiErr.FinalError() +} + +func (b *containerBlock) WithMetadata( + meta Metadata, + sm []SeriesMeta, +) (Block, error) { + if b.err != nil { + return nil, b.err + } + + updatedBlockList := make([]Block, 0, len(b.blocks)) + for _, bl := range b.blocks { + updated, err := bl.WithMetadata(meta, sm) + if err != nil { + b.err = err + return nil, err + } + + updatedBlockList = append(updatedBlockList, updated) + } + + return newContainerBlock(updatedBlockList), nil +} + +func (b *containerBlock) StepIter() (StepIter, error) { + if b.err != nil { + return nil, b.err + } + + it := &containerStepIter{its: make([]StepIter, 0, len(b.blocks))} + for _, bl := range b.blocks { + iter, err := bl.StepIter() + if err != nil { + b.err = err + return nil, err + } + + it.its = append(it.its, iter) + } + + return it, nil +} + +// NB: step iterators are constructed "sideways" +type containerStepIter struct { + err error + its []StepIter +} + +func (it *containerStepIter) Close() { + for _, iter := range it.its { + iter.Close() + } +} + +func (it *containerStepIter) Err() error { + if it.err != nil { + return it.err + } + + for _, iter := range it.its { + if it.err = iter.Err(); it.err != nil { + return it.err + } + } + + return nil +} + +func (it *containerStepIter) StepCount() int { + // NB: when using a step iterator, step count doesn't change, but the length + // of each step does. 
+ if len(it.its) == 0 { + return 0 + } + + return it.its[0].StepCount() +} + +func (it *containerStepIter) SeriesMeta() []SeriesMeta { + length := 0 + for _, iter := range it.its { + length += len(iter.SeriesMeta()) + } + + metas := make([]SeriesMeta, 0, length) + for _, iter := range it.its { + metas = append(metas, iter.SeriesMeta()...) + } + + return metas +} + +func (it *containerStepIter) Next() bool { + if it.err != nil { + return false + } + + // advance all the contained iterators; if any have size mismatches, set an + // error and stop traversal. + var next bool + for i, iter := range it.its { + n := iter.Next() + + if it.err = iter.Err(); it.err != nil { + return false + } + + if i == 0 { + next = n + } else if next != n { + it.err = errMismatchedStepIter + return false + } + } + + return next +} + +func (it *containerStepIter) Meta() Metadata { + // NB: metadata should be identical for each series in the contained block. + if len(it.its) == 0 { + return Metadata{} + } + + return it.its[0].Meta() +} + +func (it *containerStepIter) Current() Step { + if len(it.its) == 0 { + return ColStep{ + time: time.Time{}, + values: []float64{}, + } + } + + curr := it.its[0].Current() + // NB: to get Current for contained step iterators, append results from all + // contained step iterators in order. + accumulatorStep := ColStep{ + time: curr.Time(), + values: curr.Values(), + } + + for _, iter := range it.its[1:] { + curr := iter.Current() + accumulatorStep.values = append(accumulatorStep.values, curr.Values()...) 
+ } + + return accumulatorStep +} + +func (b *containerBlock) SeriesIter() (SeriesIter, error) { + if b.err != nil { + return nil, b.err + } + + iters := make([]SeriesIter, 0, len(b.blocks)) + for _, bl := range b.blocks { + iter, err := bl.SeriesIter() + if err != nil { + b.err = err + return nil, err + } + + iters = append(iters, iter) + } + + return &containerSeriesIter{its: iters}, nil +} + +type containerSeriesIter struct { + err error + idx int + its []SeriesIter +} + +func (it *containerSeriesIter) Close() { + for _, iter := range it.its { + iter.Close() + } +} + +func (it *containerSeriesIter) Err() error { + if it.err != nil { + return it.err + } + + for _, iter := range it.its { + if it.err = iter.Err(); it.err != nil { + return it.err + } + } + + return nil +} + +func (it *containerSeriesIter) SeriesCount() int { + count := 0 + for _, iter := range it.its { + count += iter.SeriesCount() + } + + return count +} + +func (it *containerSeriesIter) SeriesMeta() []SeriesMeta { + length := 0 + for _, iter := range it.its { + length += len(iter.SeriesMeta()) + } + + metas := make([]SeriesMeta, 0, length) + for _, iter := range it.its { + metas = append(metas, iter.SeriesMeta()...) + } + + return metas +} + +func (it *containerSeriesIter) Next() bool { + if it.err != nil { + return false + } + + for ; it.idx < len(it.its); it.idx++ { + iter := it.its[it.idx] + if iter.Next() { + // the active iterator has been successfuly incremented. + return true + } + + // active iterator errored. + if it.err = iter.Err(); it.err != nil { + return false + } + } + + // all iterators expanded. + return false +} + +func (it *containerSeriesIter) Current() Series { + return it.its[it.idx].Current() +} + +func (it *containerSeriesIter) Meta() Metadata { + // NB: metadata should be identical for each series in the contained block. 
+ if len(it.its) == 0 {
+ return Metadata{}
+ }
+
+ return it.its[0].Meta()
+}
+
+// Unconsolidated returns the unconsolidated version for the block
+func (b *containerBlock) Unconsolidated() (UnconsolidatedBlock, error) {
+ if b.err != nil {
+ return nil, b.err
+ }
+
+ ucBlock := &ucContainerBlock{
+ blocks: make([]UnconsolidatedBlock, 0, len(b.blocks)),
+ }
+
+ for _, bl := range b.blocks {
+ unconsolidated, err := bl.Unconsolidated()
+ if err != nil {
+ b.err = err
+ return nil, err
+ }
+
+ ucBlock.blocks = append(ucBlock.blocks, unconsolidated)
+ }
+
+ return ucBlock, nil
+}
+
+type ucContainerBlock struct {
+ err error
+ blocks []UnconsolidatedBlock
+}
+
+func (b *ucContainerBlock) Close() error {
+ multiErr := xerrors.NewMultiError()
+ multiErr = multiErr.Add(b.err)
+ for _, bl := range b.blocks {
+ multiErr = multiErr.Add(bl.Close())
+ }
+
+ return multiErr.FinalError()
+}
+
+func (b *ucContainerBlock) WithMetadata(
+ meta Metadata,
+ sm []SeriesMeta,
+) (UnconsolidatedBlock, error) {
+ if b.err != nil {
+ return nil, b.err
+ }
+
+ updatedBlockList := make([]UnconsolidatedBlock, 0, len(b.blocks))
+ for _, bl := range b.blocks {
+ updated, err := bl.WithMetadata(meta, sm)
+ if err != nil {
+ b.err = err
+ return nil, err
+ }
+
+ updatedBlockList = append(updatedBlockList, updated)
+ }
+
+ return &ucContainerBlock{blocks: updatedBlockList}, nil
+}
+
+func (b *ucContainerBlock) Consolidate() (Block, error) {
+ if b.err != nil {
+ return nil, b.err
+ }
+
+ consolidated := make([]Block, 0, len(b.blocks))
+ for _, bl := range b.blocks {
+ block, err := bl.Consolidate()
+ if err != nil {
+ b.err = err
+ return nil, err
+ }
+
+ consolidated = append(consolidated, block)
+ }
+
+ return newContainerBlock(consolidated), nil
+}
+
+func (b *ucContainerBlock) StepIter() (UnconsolidatedStepIter, error) {
+ if b.err != nil {
+ return nil, b.err
+ }
+
+ it := &ucContainerStepIter{
+ its: make([]UnconsolidatedStepIter, 0, len(b.blocks)),
+ }
+
+ for _, bl := range b.blocks {
+ iter, err := 
bl.StepIter() + if err != nil { + b.err = err + return nil, err + } + + it.its = append(it.its, iter) + } + + return it, nil +} + +type ucContainerStepIter struct { + err error + its []UnconsolidatedStepIter +} + +func (it *ucContainerStepIter) Close() { + for _, iter := range it.its { + iter.Close() + } +} + +func (it *ucContainerStepIter) Err() error { + if it.err != nil { + return it.err + } + + for _, iter := range it.its { + if it.err = iter.Err(); it.err != nil { + return it.err + } + } + + return nil +} + +func (it *ucContainerStepIter) StepCount() int { + // NB: when using a step iterator, step count doesn't change, but the length + // of each step does. + if len(it.its) == 0 { + return 0 + } + + return it.its[0].StepCount() +} + +func (it *ucContainerStepIter) SeriesMeta() []SeriesMeta { + length := 0 + for _, iter := range it.its { + length += len(iter.SeriesMeta()) + } + + metas := make([]SeriesMeta, 0, length) + for _, iter := range it.its { + metas = append(metas, iter.SeriesMeta()...) + } + + return metas +} + +func (it *ucContainerStepIter) Next() bool { + if it.err != nil { + return false + } + + // advance all the contained iterators; if any have size mismatches, set an + // error and stop traversal. + var next bool + for i, iter := range it.its { + n := iter.Next() + + if it.err = iter.Err(); it.err != nil { + return false + } + + if i == 0 { + next = n + } else if next != n { + it.err = errMismatchedUcStepIter + return false + } + } + + return next +} + +func (it *ucContainerStepIter) Meta() Metadata { + // NB: metadata should be identical for each series in the contained block. 
+ if len(it.its) == 0 { + return Metadata{} + } + + return it.its[0].Meta() +} + +func (it *ucContainerStepIter) Current() UnconsolidatedStep { + if len(it.its) == 0 { + return unconsolidatedStep{ + time: time.Time{}, + values: []ts.Datapoints{}, + } + } + + curr := it.its[0].Current() + // NB: to get Current for contained step iterators, append results from all + // contained step iterators in order. + accumulatorStep := unconsolidatedStep{ + time: curr.Time(), + values: curr.Values(), + } + + for _, iter := range it.its[1:] { + curr := iter.Current() + accumulatorStep.values = append(accumulatorStep.values, curr.Values()...) + } + + return accumulatorStep +} + +func (b *ucContainerBlock) SeriesIter() (UnconsolidatedSeriesIter, error) { + if b.err != nil { + return nil, b.err + } + + it := &ucContainerSeriesIter{ + its: make([]UnconsolidatedSeriesIter, 0, len(b.blocks)), + } + + for _, bl := range b.blocks { + iter, err := bl.SeriesIter() + if err != nil { + b.err = err + return nil, err + } + + it.its = append(it.its, iter) + } + + return it, nil +} + +type ucContainerSeriesIter struct { + err error + idx int + its []UnconsolidatedSeriesIter +} + +func (it *ucContainerSeriesIter) Close() { + for _, iter := range it.its { + iter.Close() + } +} + +func (it *ucContainerSeriesIter) Err() error { + if it.err != nil { + return it.err + } + + for _, iter := range it.its { + if it.err = iter.Err(); it.err != nil { + return it.err + } + } + + return nil +} + +func (it *ucContainerSeriesIter) SeriesCount() int { + count := 0 + for _, iter := range it.its { + count += iter.SeriesCount() + } + + return count +} + +func (it *ucContainerSeriesIter) SeriesMeta() []SeriesMeta { + length := 0 + for _, iter := range it.its { + length += len(iter.SeriesMeta()) + } + + metas := make([]SeriesMeta, 0, length) + for _, iter := range it.its { + metas = append(metas, iter.SeriesMeta()...) 
+ } + + return metas +} + +func (it *ucContainerSeriesIter) Next() bool { + if it.err != nil { + return false + } + + for ; it.idx < len(it.its); it.idx++ { + iter := it.its[it.idx] + if iter.Next() { + // the active iterator has been successfuly incremented. + return true + } + + // active iterator errored. + if it.err = iter.Err(); it.err != nil { + return false + } + } + + // all iterators expanded. + return false +} + +func (it *ucContainerSeriesIter) Current() UnconsolidatedSeries { + return it.its[it.idx].Current() +} + +func (it *ucContainerSeriesIter) Meta() Metadata { + // NB: metadata should be identical for each series in the contained block. + if len(it.its) == 0 { + return Metadata{} + } + + return it.its[0].Meta() +} diff --git a/src/query/block/types.go b/src/query/block/types.go index 3e498074a5..f536648d58 100644 --- a/src/query/block/types.go +++ b/src/query/block/types.go @@ -45,6 +45,12 @@ type Block interface { WithMetadata(Metadata, []SeriesMeta) (Block, error) } +type AccumulatorBlock interface { + Block + // AddBlock adds a block to this accumulator. 
+ AddBlock(bl Block) error +} + // UnconsolidatedBlock represents a group of unconsolidated series across a time bound type UnconsolidatedBlock interface { io.Closer diff --git a/src/query/server/server.go b/src/query/server/server.go index 4ad7ef5dec..a5e51a48dd 100644 --- a/src/query/server/server.go +++ b/src/query/server/server.go @@ -218,7 +218,7 @@ func Run(runOpts RunOptions) { if cfg.Backend == config.GRPCStorageType { poolWrapper := pools.NewPoolsWrapper(pools.BuildIteratorPools()) backendStorage, enabled, err = remoteClient(cfg, tagOptions, poolWrapper, - readWorkerPool) + readWorkerPool, instrumentOptions) if err != nil { logger.Fatal("unable to setup grpc backend", zap.Error(err)) } @@ -557,6 +557,8 @@ func initClusters( if err != nil { return nil, nil, errors.Wrap(err, "unable to connect to clusters") } + + poolWrapper = pools.NewPoolsWrapper(pools.BuildIteratorPools()) } else { localCfg := cfg.Local if localCfg == nil { @@ -647,6 +649,7 @@ func newStorages( tagOptions, poolWrapper, readWorkerPool, + instrumentOpts, ) if err != nil { return nil, nil, err @@ -710,12 +713,15 @@ func remoteClient( tagOptions models.TagOptions, poolWrapper *pools.PoolWrapper, readWorkerPool xsync.PooledWorkerPool, + instrumentOpts instrument.Options, ) (storage.Storage, bool, error) { if cfg.RPC == nil { return nil, false, nil } if remotes := cfg.RPC.RemoteListenAddresses; len(remotes) > 0 { + logger := instrumentOpts.Logger() + logger.Info("creating RPC client with remotes", zap.Strings("remotes", remotes)) client, err := tsdbRemote.NewGRPCClient( remotes, poolWrapper, diff --git a/src/query/server/server_test.go b/src/query/server/server_test.go index 30a74807ad..2b1db221cb 100644 --- a/src/query/server/server_test.go +++ b/src/query/server/server_test.go @@ -1,4 +1,3 @@ -// // Copyright (c) 2018 Uber Technologies, Inc. 
// // Permission is hereby granted, free of charge, to any person obtaining a copy @@ -236,21 +235,21 @@ func TestGRPCBackend(t *testing.T) { var grpcConfigYAML = ` listenAddress: type: "config" - value: "127.0.0.1:17201" + value: "127.0.0.1:17221" metrics: scope: prefix: "coordinator" prometheus: handlerPath: /metrics - listenAddress: "127.0.0.1:17203" + listenAddress: "127.0.0.1:17223" onError: stderr sanitization: prometheus samplingRate: 1.0 rpc: remoteListenAddresses: - - "127.0.0.1:17202" + - "127.0.0.1:17222" backend: grpc @@ -271,10 +270,13 @@ writeWorkerPoolPolicy: killProbability: 0.3 ` + // TODO(arnikola): REVERT + t.SkipNow() + ctrl := gomock.NewController(xtest.Reporter{T: t}) defer ctrl.Finish() - port := "127.0.0.1:17202" + port := "127.0.0.1:17222" lis, err := net.Listen("tcp", port) require.NoError(t, err) s := grpc.NewServer() @@ -307,13 +309,13 @@ writeWorkerPoolPolicy: }() // Wait for server to come up - waitForServerHealthy(t, 17201) + waitForServerHealthy(t, 17221) // Send Prometheus read request promReq := test.GeneratePromReadRequest() promReqBody := test.GeneratePromReadRequestBody(t, promReq) req, err := http.NewRequest(http.MethodPost, - "http://127.0.0.1:17201"+remote.PromReadURL, promReqBody) + "http://127.0.0.1:17221"+remote.PromReadURL, promReqBody) require.NoError(t, err) _, err = http.DefaultClient.Do(req) diff --git a/src/query/storage/block.go b/src/query/storage/block.go index 8d202c7b48..ed6f830824 100644 --- a/src/query/storage/block.go +++ b/src/query/storage/block.go @@ -22,7 +22,6 @@ package storage import ( "errors" - "fmt" "sync" "time" @@ -331,7 +330,6 @@ func (m *multiSeriesBlockSeriesIter) SeriesCount() int { } func (m *multiSeriesBlockSeriesIter) Next() bool { - fmt.Printf("THIS iter3 next\n") m.index++ return m.index < m.SeriesCount() } diff --git a/src/query/storage/consolidated.go b/src/query/storage/consolidated.go index e50b2cb3c3..196a4e0379 100644 --- a/src/query/storage/consolidated.go +++ 
b/src/query/storage/consolidated.go @@ -22,7 +22,6 @@ package storage import ( "errors" - "fmt" "github.com/m3db/m3/src/query/block" ) @@ -110,7 +109,6 @@ type consolidatedSeriesIter struct { } func (c *consolidatedSeriesIter) Next() bool { - fmt.Printf("THIS iter2 next\n") return c.unconsolidated.Next() } diff --git a/src/query/storage/fanout/storage.go b/src/query/storage/fanout/storage.go index ce8b8a9937..2cc3d2ea95 100644 --- a/src/query/storage/fanout/storage.go +++ b/src/query/storage/fanout/storage.go @@ -81,18 +81,49 @@ func (s *fanoutStorage) FetchBlocks( query *storage.FetchQuery, options *storage.FetchOptions, ) (block.Result, error) { - stores := filterStores(s.stores, s.writeFilter, query) - blockResult := block.Result{} + stores := filterStores(s.stores, s.fetchFilter, query) + // Optimization for the single store case + if len(stores) == 1 { + return stores[0].FetchBlocks(ctx, query, options) + } + + // TODO(arnikola): use a genny map here instead, also execute in parallel. + blockResult := make(map[string]block.Block, 10) for _, store := range stores { result, err := store.FetchBlocks(ctx, query, options) if err != nil { return block.Result{}, err } - blockResult.Blocks = append(blockResult.Blocks, result.Blocks...) + for _, bl := range result.Blocks { + it, err := bl.SeriesIter() + if err != nil { + return block.Result{}, err + } + + key := it.Meta().Bounds.String() + if foundBlock, found := blockResult[key]; found { + // this block exists. Check to see if it's already an appendable block. + if acc, ok := foundBlock.(block.AccumulatorBlock); ok { + // already an accumulator block, add current block. 
+ if err := acc.AddBlock(bl); err != nil { + return block.Result{}, err + } + } else { + blockResult[key] = block.NewContainerBlock(foundBlock, bl) + } + } else { + blockResult[key] = bl + } + } + } + + blocks := make([]block.Block, 0, len(blockResult)) + for _, bl := range blockResult { + blocks = append(blocks, bl) } - return blockResult, nil + return block.Result{Blocks: blocks}, nil } func handleFetchResponses(requests []execution.Request) (*storage.FetchResult, error) { diff --git a/src/query/storage/index.go b/src/query/storage/index.go index 07d96e2c5b..2f9640c638 100644 --- a/src/query/storage/index.go +++ b/src/query/storage/index.go @@ -116,7 +116,7 @@ func FetchOptionsToAggregateOptions( EndExclusive: tagQuery.End, }, FieldFilter: tagQuery.FilterNameTags, - Type: convertAggregateQueryType(tagQuery.CompleteNameOnly), + Type: convertAggregateQueryType(tagQuery.CompleteNameOnly), } } diff --git a/src/query/ts/m3db/convert_test.go b/src/query/ts/m3db/convert_test.go index c80bf763a0..819c904325 100644 --- a/src/query/ts/m3db/convert_test.go +++ b/src/query/ts/m3db/convert_test.go @@ -150,15 +150,16 @@ func verifyMetas( meta block.Metadata, metas []block.SeriesMeta, ) { - require.Equal(t, 1, meta.Tags.Len()) - val, found := meta.Tags.Get([]byte("a")) - assert.True(t, found) - assert.Equal(t, []byte("b"), val) - + require.Equal(t, 0, meta.Tags.Len()) for i, m := range metas { assert.Equal(t, []byte(fmt.Sprintf("abc%d", i)), m.Name) - require.Equal(t, 1, m.Tags.Len()) - val, found := m.Tags.Get([]byte("c")) + require.Equal(t, 2, m.Tags.Len()) + + val, found := m.Tags.Get([]byte("a")) + assert.True(t, found) + assert.Equal(t, []byte("b"), val) + + val, found = m.Tags.Get([]byte("c")) assert.True(t, found) assert.Equal(t, []byte(fmt.Sprint(i)), val) } diff --git a/src/query/ts/m3db/encoded_block.go b/src/query/ts/m3db/encoded_block.go index a94b7556af..6065c62d60 100644 --- a/src/query/ts/m3db/encoded_block.go +++ b/src/query/ts/m3db/encoded_block.go @@ -25,7 
+25,6 @@ import ( "github.com/m3db/m3/src/dbnode/encoding" "github.com/m3db/m3/src/query/block" - "github.com/m3db/m3/src/query/functions/utils" "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/storage" "github.com/m3db/m3/src/query/ts/m3db/consolidators" @@ -130,10 +129,8 @@ func (b *encodedBlock) buildSeriesMeta() error { } func (b *encodedBlock) buildMeta() { - tags, metas := utils.DedupeMetadata(b.seriesMetas) - b.seriesMetas = metas b.meta = block.Metadata{ - Tags: tags, + Tags: models.NewTags(0, b.tagOptions), Bounds: b.consolidation.bounds, } } diff --git a/src/query/ts/m3db/encoded_series_iterator.go b/src/query/ts/m3db/encoded_series_iterator.go index 5020a2b1e4..b42119f9e0 100644 --- a/src/query/ts/m3db/encoded_series_iterator.go +++ b/src/query/ts/m3db/encoded_series_iterator.go @@ -21,7 +21,6 @@ package m3db import ( - "fmt" "math" "github.com/m3db/m3/src/dbnode/encoding" @@ -74,7 +73,6 @@ func (it *encodedSeriesIter) Current() block.Series { } func (it *encodedSeriesIter) Next() bool { - fmt.Printf("THIS iter1 next\n") if it.err != nil { return false } diff --git a/src/query/ts/m3db/encoded_unconsolidated_series_iterator.go b/src/query/ts/m3db/encoded_unconsolidated_series_iterator.go index f9250b1775..e6bd8ef8f6 100644 --- a/src/query/ts/m3db/encoded_unconsolidated_series_iterator.go +++ b/src/query/ts/m3db/encoded_unconsolidated_series_iterator.go @@ -21,7 +21,6 @@ package m3db import ( - "fmt" "time" "github.com/m3db/m3/src/dbnode/encoding" @@ -61,7 +60,6 @@ func (it *encodedSeriesIterUnconsolidated) Err() error { } func (it *encodedSeriesIterUnconsolidated) Next() bool { - fmt.Printf("THIS iter0 next\n") if it.err != nil { return false } diff --git a/src/query/tsdb/remote/client.go b/src/query/tsdb/remote/client.go index 89a76ae1b6..bb55183b3f 100644 --- a/src/query/tsdb/remote/client.go +++ b/src/query/tsdb/remote/client.go @@ -34,6 +34,8 @@ import ( "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/pools" 
"github.com/m3db/m3/src/query/storage" + "github.com/m3db/m3/src/query/ts/m3db" + "github.com/m3db/m3/src/query/ts/m3db/consolidators" "github.com/m3db/m3/src/query/util/logging" xsync "github.com/m3db/m3/src/x/sync" @@ -56,6 +58,7 @@ type grpcClient struct { pools encoding.IteratorPools poolErr error lookbackDuration time.Duration + opts m3db.Options } const initResultSize = 10 @@ -82,6 +85,11 @@ func NewGRPCClient( return nil, err } + opts := m3db.NewOptions(). + SetTagOptions(tagOptions). + SetLookbackDuration(lookbackDuration). + SetConsolidationFunc(consolidators.TakeLast) + client := rpc.NewQueryClient(cc) return &grpcClient{ tagOptions: tagOptions, @@ -90,6 +98,7 @@ func NewGRPCClient( poolWrapper: poolWrapper, readWorkerPool: readWorkerPool, lookbackDuration: lookbackDuration, + opts: opts, }, nil } @@ -184,33 +193,53 @@ func (c *grpcClient) FetchBlocks( query *storage.FetchQuery, options *storage.FetchOptions, ) (block.Result, error) { - iters, err := c.fetchRaw(ctx, query, options) + opts := c.opts + + // If using decoded block, return the legacy path. + if options.BlockType == models.TypeDecodedBlock { + fetchResult, err := c.Fetch(ctx, query, options) + if err != nil { + return block.Result{}, err + } + + return storage.FetchResultToBlockResult(fetchResult, query, c.opts.LookbackDuration(), options.Enforcer) + } + + raw, err := c.fetchRaw(ctx, query, options) if err != nil { return block.Result{}, err } + // If using multiblock, update options to reflect this. + if options.BlockType == models.TypeMultiBlock { + opts = opts. 
+ SetSplitSeriesByBlock(true) + } + + bounds := models.Bounds{ + Start: query.Start, + Duration: query.End.Sub(query.Start), + StepSize: query.Interval, + } + enforcer := options.Enforcer if enforcer == nil { enforcer = cost.NoopChainedEnforcer() } - fetchResult, err := storage.SeriesIteratorsToFetchResult( - iters, - c.readWorkerPool, - true, - enforcer, - c.tagOptions, + blocks, err := m3db.ConvertM3DBSeriesIterators( + raw, + bounds, + opts, ) - if err != nil { - return block.Result{}, err - } - res, err := storage.FetchResultToBlockResult(fetchResult, query, c.lookbackDuration, options.Enforcer) if err != nil { return block.Result{}, err } - return res, nil + return block.Result{ + Blocks: blocks, + }, nil } func (c *grpcClient) SearchSeries( diff --git a/src/query/tsdb/remote/compressed_codecs.go b/src/query/tsdb/remote/compressed_codecs.go index 13b549305f..7943a35094 100644 --- a/src/query/tsdb/remote/compressed_codecs.go +++ b/src/query/tsdb/remote/compressed_codecs.go @@ -26,7 +26,6 @@ import ( "sync" "time" - "github.com/davecgh/go-spew/spew" "github.com/m3db/m3/src/dbnode/encoding" "github.com/m3db/m3/src/dbnode/encoding/m3tsz" "github.com/m3db/m3/src/dbnode/namespace" @@ -209,8 +208,6 @@ func encodeToCompressedFetchResult( return nil, err } - spew.Dump(series) - seriesList = append(seriesList, series) } @@ -273,7 +270,7 @@ func tagIteratorFromSeries( iteratorPools encoding.IteratorPools, ) (ident.TagIterator, error) { if series != nil && len(series.GetCompressedTags()) > 0 { - return tagIteratorFromCompressedTagsWithDecoder( + return tagIteratorFromCompressedTagsWithDecoder( series.GetCompressedTags(), iteratorPools, ) diff --git a/src/query/tsdb/remote/staticResolver.go b/src/query/tsdb/remote/static_resolver.go similarity index 100% rename from src/query/tsdb/remote/staticResolver.go rename to src/query/tsdb/remote/static_resolver.go