From d54e2fe316a1031bfba774f5fbd5c07fe7853c9f Mon Sep 17 00:00:00 2001 From: Artem Nikolayevsky Date: Thu, 15 Aug 2019 19:29:12 -0400 Subject: [PATCH 1/5] [query] fix how time() source is processed --- src/query/block/scalar.go | 41 +++--- src/query/block/scalar_test.go | 2 +- src/query/functions/binary/binary.go | 11 +- src/query/functions/binary/binary_test.go | 48 ++----- .../functions/scalar/{base.go => scalar.go} | 50 +++---- .../scalar/{base_test.go => scalar_test.go} | 25 ++-- src/query/functions/scalar/time.go | 122 ++++++++++++++++++ src/query/parser/promql/parse.go | 7 +- src/query/parser/promql/types.go | 14 +- 9 files changed, 187 insertions(+), 133 deletions(-) rename src/query/functions/scalar/{base.go => scalar.go} (74%) rename src/query/functions/scalar/{base_test.go => scalar_test.go} (82%) create mode 100644 src/query/functions/scalar/time.go diff --git a/src/query/block/scalar.go b/src/query/block/scalar.go index 6bf6e3a3ed..df74b9d3ba 100644 --- a/src/query/block/scalar.go +++ b/src/query/block/scalar.go @@ -28,22 +28,22 @@ import ( ) // Scalar is a block containing a single value over a certain bound -// This represents constant values; it greatly simplifies downstream operations by -// allowing them to treat this as a regular block, while at the same time -// having an option to optimize by accessing the scalar value directly instead +// This represents constant values; it greatly simplifies downstream operations +// by allowing them to treat this as a regular block, while at the same time +// having an option to optimize by accessing the scalar value directly instead. type Scalar struct { - s ScalarFunc + val float64 meta Metadata } // NewScalar creates a scalar block containing val over the bounds func NewScalar( - s ScalarFunc, + val float64, bounds models.Bounds, tagOptions models.TagOptions, ) Block { return &Scalar{ - s: s, + val: val, meta: Metadata{ Bounds: bounds, Tags: models.NewTags(0, tagOptions), @@ -51,45 +51,40 @@ func NewScalar( } } -// Unconsolidated returns the unconsolidated version for the block func (b *Scalar) Unconsolidated() (UnconsolidatedBlock, error) { - return nil, fmt.Errorf("unconsolidated view not implemented for scalar block, meta: %s", b.meta) + return nil, + fmt.Errorf("unconsolidated view not implemented for scalar block, meta: %s", + b.meta) } -// WithMetadata updates this blocks metadata, and the metadatas for each series. func (b *Scalar) WithMetadata( meta Metadata, _ []SeriesMeta, ) (Block, error) { return &Scalar{ meta: meta, - s: b.s, + val: b.val, }, nil } -// StepIter returns a StepIterator func (b *Scalar) StepIter() (StepIter, error) { bounds := b.meta.Bounds steps := bounds.Steps() return &scalarStepIter{ meta: b.meta, - s: b.s, + vals: []float64{b.val}, numVals: steps, idx: -1, }, nil } -// ScalarFunc determines the function to apply to generate the value at each step -type ScalarFunc func(t time.Time) float64 - -// SeriesIter returns a SeriesIterator func (b *Scalar) SeriesIter() (SeriesIter, error) { bounds := b.meta.Bounds steps := bounds.Steps() vals := make([]float64, steps) t := bounds.Start for i := range vals { - vals[i] = b.s(t) + vals[i] = b.val t = t.Add(bounds.StepSize) } @@ -100,12 +95,10 @@ func (b *Scalar) SeriesIter() (SeriesIter, error) { }, nil } -// Close closes the scalar block func (b *Scalar) Close() error { return nil } -// Value returns the value for the scalar block -func (b *Scalar) Value(t time.Time) float64 { - return b.s(t) +func (b *Scalar) Value() float64 { + return b.val } type scalarStepIter struct { @@ -113,10 +106,10 @@ type scalarStepIter struct { stepTime time.Time err error meta Metadata - s ScalarFunc + vals []float64 } -// build an empty SeriesMeta +// build an empty SeriesMetadata. func buildSeriesMeta(meta Metadata) SeriesMeta { return SeriesMeta{ Tags: models.NewTags(0, meta.Tags.Opts), @@ -153,7 +146,7 @@ func (it *scalarStepIter) Next() bool { func (it *scalarStepIter) Current() Step { t := it.stepTime return &scalarStep{ - vals: []float64{it.s(t)}, + vals: it.vals, time: t, } } diff --git a/src/query/block/scalar_test.go b/src/query/block/scalar_test.go index a81296cd4c..1af0b11145 100644 --- a/src/query/block/scalar_test.go +++ b/src/query/block/scalar_test.go @@ -42,7 +42,7 @@ var ( func TestScalarBlock(t *testing.T) { block := NewScalar( - func(_ time.Time) float64 { return val }, + val, bounds, models.NewTagOptions(), ) diff --git a/src/query/functions/binary/binary.go b/src/query/functions/binary/binary.go index e5bd3e7a55..37a4d7b4ef 100644 --- a/src/query/functions/binary/binary.go +++ b/src/query/functions/binary/binary.go @@ -21,8 +21,6 @@ package binary import ( - "time" - "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/executor/transform" "github.com/m3db/m3/src/query/functions/utils" @@ -53,8 +51,7 @@ func processBinary( return nil, errLeftScalar } - lVal := scalarL.Value(time.Time{}) - + lVal := scalarL.Value() // rhs is a series; use rhs metadata and series meta if !params.RIsScalar { return processSingleBlock( @@ -82,9 +79,7 @@ func processBinary( } return block.NewScalar( - func(t time.Time) float64 { - return fn(lVal, scalarR.Value(t)) - }, + fn(lVal, scalarR.Value()), lIter.Meta().Bounds, lIter.Meta().Tags.Opts, ), nil @@ -96,7 +91,7 @@ func processBinary( return nil, errRightScalar } - rVal := scalarR.Value(time.Time{}) + rVal := scalarR.Value() // lhs is a series; use lhs metadata and series meta. return processSingleBlock( queryCtx, diff --git a/src/query/functions/binary/binary_test.go b/src/query/functions/binary/binary_test.go index 352e49a6dc..8a16e66772 100644 --- a/src/query/functions/binary/binary_test.go +++ b/src/query/functions/binary/binary_test.go @@ -119,22 +119,14 @@ func TestScalars(t *testing.T) { err = node.Process( models.NoopQueryContext(), parser.NodeID(0), - block.NewScalar( - func(_ time.Time) float64 { return tt.lVal }, - bounds, - models.NewTagOptions(), - ), + block.NewScalar(tt.lVal, bounds, models.NewTagOptions()), ) require.NoError(t, err) err = node.Process( models.NoopQueryContext(), parser.NodeID(1), - block.NewScalar( - func(_ time.Time) float64 { return tt.rVal }, - bounds, - models.NewTagOptions(), - ), + block.NewScalar(tt.rVal, bounds, models.NewTagOptions()), ) expected := [][]float64{{ @@ -178,22 +170,14 @@ func TestScalarsReturnBoolFalse(t *testing.T) { err = node.Process( models.NoopQueryContext(), parser.NodeID(0), - block.NewScalar( - func(_ time.Time) float64 { return tt.lVal }, - bounds, - models.NewTagOptions(), - ), + block.NewScalar(tt.lVal, bounds, models.NewTagOptions()), ) require.NoError(t, err) err = node.Process( models.NoopQueryContext(), parser.NodeID(1), - block.NewScalar( - func(_ time.Time) float64 { return tt.rVal }, - bounds, - models.NewTagOptions(), - ), + block.NewScalar(tt.rVal, bounds, models.NewTagOptions()), ) if tt.opType == EqType || tt.opType == NotEqType || @@ -584,11 +568,7 @@ func TestSingleSeriesReturnBool(t *testing.T) { err = node.Process( models.NoopQueryContext(), parser.NodeID(1), - block.NewScalar( - func(_ time.Time) float64 { return tt.scalarVal }, - bounds, - models.NewTagOptions(), - ), + block.NewScalar(tt.scalarVal, bounds, models.NewTagOptions()), ) require.NoError(t, err) @@ -596,11 +576,7 @@ func TestSingleSeriesReturnBool(t *testing.T) { err = node.Process( models.NoopQueryContext(), parser.NodeID(0), - block.NewScalar( - func(_ time.Time) float64 { return tt.scalarVal }, - bounds, - models.NewTagOptions(), - ), + block.NewScalar(tt.scalarVal, bounds, models.NewTagOptions()), ) require.NoError(t, err) @@ -656,11 +632,7 @@ func TestSingleSeriesReturnValues(t *testing.T) { err = node.Process( models.NoopQueryContext(), parser.NodeID(1), - block.NewScalar( - func(_ time.Time) float64 { return tt.scalarVal }, - bounds, - models.NewTagOptions(), - ), + block.NewScalar(tt.scalarVal, bounds, models.NewTagOptions()), ) require.NoError(t, err) @@ -668,11 +640,7 @@ func TestSingleSeriesReturnValues(t *testing.T) { err = node.Process( models.NoopQueryContext(), parser.NodeID(0), - block.NewScalar( - func(_ time.Time) float64 { return tt.scalarVal }, - bounds, - models.NewTagOptions(), - ), + block.NewScalar(tt.scalarVal, bounds, models.NewTagOptions()), ) require.NoError(t, err) diff --git a/src/query/functions/scalar/base.go b/src/query/functions/scalar/scalar.go similarity index 74% rename from src/query/functions/scalar/base.go rename to src/query/functions/scalar/scalar.go index 71be6281a7..026ebf6978 100644 --- a/src/query/functions/scalar/base.go +++ b/src/query/functions/scalar/scalar.go @@ -21,8 +21,6 @@ package scalar import ( - "fmt" - "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/executor/transform" "github.com/m3db/m3/src/query/models" @@ -46,60 +44,52 @@ const ( TimeType = "time" ) -type baseOp struct { - fn block.ScalarFunc - tagOptions models.TagOptions - operatorType string +type scalarOp struct { + val float64 + tagOptions models.TagOptions } -func (o baseOp) OpType() string { - return o.operatorType +func (o scalarOp) OpType() string { + return ScalarType } -func (o baseOp) String() string { - return fmt.Sprintf("type: %s.", o.OpType()) +func (o scalarOp) String() string { + return "type: scalar" } -func (o baseOp) Node( +func (o scalarOp) Node( controller *transform.Controller, opts transform.Options, ) parser.Source { - return &baseNode{ + return &scalarNode{ op: o, controller: controller, opts: opts, } } -// NewScalarOp creates a new scalar op +// NewScalarOp creates an operation that yields a scalar source. func NewScalarOp( - fn block.ScalarFunc, - opType string, + val float64, tagOptions models.TagOptions, ) (parser.Params, error) { - if opType != ScalarType && opType != TimeType { - return nil, fmt.Errorf("unknown scalar type: %s", opType) - } - - return &baseOp{ - fn: fn, - tagOptions: tagOptions, - operatorType: opType, + return &scalarOp{ + val: val, + tagOptions: tagOptions, }, nil } -// scalarNode is the execution node -type baseNode struct { - op baseOp +// scalarNode is the execution node for time source. +type scalarNode struct { + op scalarOp controller *transform.Controller opts transform.Options } -// Execute runs the scalar node operation -func (n *baseNode) Execute(queryCtx *models.QueryContext) error { +// Execute runs the scalar source's pipeline. +func (n *scalarNode) Execute(queryCtx *models.QueryContext) error { bounds := n.opts.TimeSpec().Bounds() - - block := block.NewScalar(n.op.fn, bounds, n.op.tagOptions) + block := block.NewScalar(n.op.val, bounds, n.op.tagOptions) if n.opts.Debug() { // Ignore any errors iter, _ := block.StepIter() diff --git a/src/query/functions/scalar/base_test.go b/src/query/functions/scalar/scalar_test.go similarity index 82% rename from src/query/functions/scalar/base_test.go rename to src/query/functions/scalar/scalar_test.go index 368646351b..f355b2891d 100644 --- a/src/query/functions/scalar/base_test.go +++ b/src/query/functions/scalar/scalar_test.go @@ -22,7 +22,6 @@ package scalar import ( "testing" - "time" "github.com/m3db/m3/src/query/executor/transform" "github.com/m3db/m3/src/query/models" @@ -35,14 +34,15 @@ import ( "github.com/stretchr/testify/require" ) -func TestScalarTime(t *testing.T) { +func TestScalar(t *testing.T) { + val := 10.0 _, bounds := test.GenerateValuesAndBounds(nil, nil) c, sink := executor.NewControllerWithSink(parser.NodeID(0)) - baseOp := baseOp{ - fn: func(t time.Time) float64 { return float64(t.Unix()) }, - operatorType: TimeType, - } + op, err := NewScalarOp(val, models.NewTagOptions()) + require.NoError(t, err) + baseOp, ok := op.(*scalarOp) + require.True(t, ok) start := bounds.Start step := bounds.StepSize node := baseOp.Node(c, transformtest.Options(t, transform.OptionsParams{ @@ -52,13 +52,14 @@ func TestScalarTime(t *testing.T) { Step: step, }, })) - err := node.Execute(models.NoopQueryContext()) + + err = node.Execute(models.NoopQueryContext()) require.NoError(t, err) - assert.Len(t, sink.Values, 1) + require.Equal(t, 1, len(sink.Values)) - for _, vals := range sink.Values { - for i, val := range vals { - assert.Equal(t, float64(start.Add(time.Duration(i)*step).Unix()), val) - } + vals := sink.Values[0] + assert.Equal(t, bounds.Steps(), len(vals)) + for _, v := range vals { + assert.Equal(t, val, v) } } diff --git a/src/query/functions/scalar/time.go b/src/query/functions/scalar/time.go new file mode 100644 index 0000000000..4368714ad8 --- /dev/null +++ b/src/query/functions/scalar/time.go @@ -0,0 +1,122 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package scalar + +import ( + "github.com/m3db/m3/src/query/block" + "github.com/m3db/m3/src/query/executor/transform" + "github.com/m3db/m3/src/query/models" + "github.com/m3db/m3/src/query/parser" + "github.com/m3db/m3/src/query/util/logging" + + "go.uber.org/zap" +) + +type timeOp struct { + tagOptions models.TagOptions +} + +func (o timeOp) OpType() string { + return TimeType +} + +func (o timeOp) String() string { + return "type: time" +} + +func (o timeOp) Node( + controller *transform.Controller, + opts transform.Options, +) parser.Source { + return &timeNode{ + controller: controller, + tagOptions: o.tagOptions, + opts: opts, + } +} + +// NewTimeOp creates an operation that yields a time-based source. +func NewTimeOp(tagOptions models.TagOptions) (parser.Params, error) { + return &timeOp{ + tagOptions: tagOptions, + }, nil +} + +// timeNode is the execution node for time source. +type timeNode struct { + tagOptions models.TagOptions + controller *transform.Controller + opts transform.Options +} + +// Execute runs the time source's pipeline. +func (n *timeNode) Execute(queryCtx *models.QueryContext) error { + bounds := n.opts.TimeSpec().Bounds() + meta := block.Metadata{ + Bounds: bounds, + Tags: models.NewTags(0, n.tagOptions), + } + + seriesMeta := []block.SeriesMeta{ + block.SeriesMeta{ + Tags: models.NewTags(0, n.tagOptions), + Name: []byte(TimeType), + }, + } + + builder := block.NewColumnBlockBuilder(queryCtx, meta, seriesMeta) + steps := bounds.Steps() + err := builder.AddCols(steps) + if err != nil { + return err + } + + for i := 0; i < steps; i++ { + t, err := bounds.TimeForIndex(i) + if err != nil { + return err + } + + timeVal := float64(t.Unix()) + if err := builder.AppendValue(i, timeVal); err != nil { + return err + } + } + + block := builder.Build() + if n.opts.Debug() { + // Ignore any errors + iter, _ := block.StepIter() + if iter != nil { + logging.WithContext(queryCtx.Ctx, n.opts.InstrumentOptions()). + Info("time node", zap.Any("meta", iter.Meta())) + } + } + + if err := n.controller.Process(queryCtx, block); err != nil { + block.Close() + // Fail on first error + return err + } + + block.Close() + return nil +} diff --git a/src/query/parser/promql/parse.go b/src/query/parser/promql/parse.go index 358523e8ff..6bc9f9e28a 100644 --- a/src/query/parser/promql/parse.go +++ b/src/query/parser/promql/parse.go @@ -213,12 +213,7 @@ func (p *parseState) walk(node pql.Node) error { return err } - op, err := scalar.NewScalarOp( - func(_ time.Time) float64 { return val }, - scalar.ScalarType, - p.tagOpts, - ) - + op, err := scalar.NewScalarOp(val, p.tagOpts) if err != nil { return err } diff --git a/src/query/parser/promql/types.go b/src/query/parser/promql/types.go index dc3c750f20..12cff7bed7 100644 --- a/src/query/parser/promql/types.go +++ b/src/query/parser/promql/types.go @@ -22,7 +22,6 @@ package promql import ( "fmt" - "time" "github.com/m3db/m3/src/query/functions" "github.com/m3db/m3/src/query/functions/aggregation" @@ -145,11 +144,7 @@ func newScalarOperator( expr *promql.NumberLiteral, tagOpts models.TagOptions, ) (parser.Params, error) { - return scalar.NewScalarOp( - func(_ time.Time) float64 { return expr.Val }, - scalar.ScalarType, - tagOpts, - ) + return scalar.NewScalarOp(expr.Val, tagOpts) } // NewBinaryOperator creates a new binary operator based on the type. @@ -262,12 +257,7 @@ func NewFunctionExpr( return p, true, err case scalar.TimeType: - p, err = scalar.NewScalarOp( - func(t time.Time) float64 { return float64(t.Unix()) }, - scalar.TimeType, - tagOptions, - ) - + p, err = scalar.NewTimeOp(tagOptions) return p, true, err // NB: no-ops. From cce963a76bdfc8b55f15f12d2bba6f55560a9d06 Mon Sep 17 00:00:00 2001 From: Artem Nikolayevsky Date: Thu, 15 Aug 2019 19:29:53 -0400 Subject: [PATCH 2/5] Adding test separately so git tracks correctly --- src/query/functions/scalar/time_test.go | 63 +++++++++++++++++++++++++ 1 file changed, 63 insertions(+) create mode 100644 src/query/functions/scalar/time_test.go diff --git a/src/query/functions/scalar/time_test.go b/src/query/functions/scalar/time_test.go new file mode 100644 index 0000000000..400e10249d --- /dev/null +++ b/src/query/functions/scalar/time_test.go @@ -0,0 +1,63 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package scalar + +import ( + "testing" + "time" + + "github.com/m3db/m3/src/query/executor/transform" + "github.com/m3db/m3/src/query/models" + "github.com/m3db/m3/src/query/parser" + "github.com/m3db/m3/src/query/test" + "github.com/m3db/m3/src/query/test/executor" + "github.com/m3db/m3/src/query/test/transformtest" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestTime(t *testing.T) { + _, bounds := test.GenerateValuesAndBounds(nil, nil) + c, sink := executor.NewControllerWithSink(parser.NodeID(0)) + op, err := NewTimeOp(models.NewTagOptions()) + require.NoError(t, err) + + baseOp, ok := op.(*timeOp) + require.True(t, ok) + start := bounds.Start + step := bounds.StepSize + node := baseOp.Node(c, transformtest.Options(t, transform.OptionsParams{ + TimeSpec: transform.TimeSpec{ + Start: start, + End: bounds.End(), + Step: step, + }, + })) + + err = node.Execute(models.NoopQueryContext()) + require.NoError(t, err) + assert.Len(t, sink.Values, 1) + + for i, vals := range sink.Values { + assert.Equal(t, float64(start.Add(time.Duration(i)*step).Unix()), vals[0]) + } +} From 657a7317dda600d93fb2730f7b0afab69cd2a266 Mon Sep 17 00:00:00 2001 From: Artem Nikolayevsky Date: Thu, 15 Aug 2019 21:26:22 -0400 Subject: [PATCH 3/5] Add special scalar detection for time() function --- src/query/functions/binary/base.go | 2 +- src/query/parser/promql/types.go | 40 ++++++++++++++++++++++++++++-- 2 files changed, 39 insertions(+), 3 deletions(-) diff --git a/src/query/functions/binary/base.go b/src/query/functions/binary/base.go index 9d600d9920..5bc264e2ad 100644 --- a/src/query/functions/binary/base.go +++ b/src/query/functions/binary/base.go @@ -84,7 +84,7 @@ func ArithmeticFunction(opType string, returnBool bool) (Function, error) { return fn, nil } - return nil, errNoMatching + return nil, fmt.Errorf("no arithmetic function found for type: %s", opType) } // NewOp creates a new binary operation. diff --git a/src/query/parser/promql/types.go b/src/query/parser/promql/types.go index 12cff7bed7..93b8a4223a 100644 --- a/src/query/parser/promql/types.go +++ b/src/query/parser/promql/types.go @@ -147,16 +147,26 @@ func newScalarOperator( return scalar.NewScalarOp(expr.Val, tagOpts) } +func isTime(expr promql.Expr) bool { + return expr.String() == "time()" +} + // NewBinaryOperator creates a new binary operator based on the type. func NewBinaryOperator(expr *promql.BinaryExpr, lhs, rhs parser.NodeID) (parser.Params, error) { matching := promMatchingToM3(expr.VectorMatching) + // NB: Prometheus handles `time()` in a way that's not entirely compatible + // with the query engine; special handling is required here. + lTime, rTime := isTime(expr.LHS), isTime(expr.RHS) + if matching == nil { + matching = timeMatcher(lTime, rTime) + } nodeParams := binary.NodeParams{ LNode: lhs, RNode: rhs, - LIsScalar: expr.LHS.Type() == promql.ValueTypeScalar, - RIsScalar: expr.RHS.Type() == promql.ValueTypeScalar, + LIsScalar: expr.LHS.Type() == promql.ValueTypeScalar && !lTime, + RIsScalar: expr.RHS.Type() == promql.ValueTypeScalar && !rTime, ReturnBool: expr.ReturnBool, VectorMatching: matching, } @@ -414,3 +424,29 @@ func promMatchingToM3( Include: vectorMatching.Include, } } + +// Iff one of left or right is a time() function, match match one to many +// against it, and match everything. +func timeMatcher(left, right bool) *binary.VectorMatching { + if left { + if right { + return &binary.VectorMatching{ + Card: binary.CardOneToOne, + } + } + + return &binary.VectorMatching{ + Card: binary.CardOneToMany, + On: true, + } + } + + if right { + return &binary.VectorMatching{ + Card: binary.CardManyToOne, + On: true, + } + } + + return nil +} From a0d20af54cbd0270250f795f407e00114e5008a5 Mon Sep 17 00:00:00 2001 From: Artem Nikolayevsky Date: Thu, 22 Aug 2019 14:45:30 -0400 Subject: [PATCH 4/5] Allow blocks to expose type information --- src/query/block/accounted.go | 11 ++- src/query/block/accounted_test.go | 7 +- src/query/block/block_mock.go | 26 ++++++ src/query/block/column.go | 11 +++ src/query/block/column_test.go | 46 ++++++++++ src/query/block/container.go | 4 + src/query/block/information.go | 86 +++++++++++++++++++ src/query/block/lazy.go | 4 + src/query/block/lazy_test.go | 5 ++ src/query/block/scalar.go | 4 + src/query/block/scalar_test.go | 1 + src/query/block/types.go | 16 ++++ src/query/functions/binary/and.go | 2 +- src/query/functions/binary/and_test.go | 23 +++-- src/query/functions/binary/arithmetic.go | 2 +- src/query/functions/binary/base.go | 14 ++- src/query/functions/binary/binary.go | 20 ++--- src/query/functions/binary/binary_test.go | 58 +++++-------- src/query/functions/binary/common.go | 63 +++++++------- src/query/functions/binary/comparison.go | 2 +- src/query/functions/binary/logical.go | 5 +- src/query/functions/binary/or.go | 2 +- src/query/functions/binary/or_test.go | 30 +++---- src/query/functions/binary/types.go | 67 +++++++++++++++ src/query/functions/binary/unless.go | 2 +- src/query/functions/binary/unless_test.go | 6 +- src/query/functions/scalar/time.go | 1 + src/query/functions/scalar/time_test.go | 3 + src/query/parser/promql/types.go | 61 +++---------- src/query/storage/block.go | 5 ++ src/query/storage/consolidated.go | 4 + src/query/storage/consolidated_test.go | 7 +- src/query/test/executor/transform.go | 2 + src/query/ts/m3db/encoded_block.go | 4 + .../ts/m3db/encoded_step_iterator_test.go | 6 +- 35 files changed, 433 insertions(+), 177 deletions(-) create mode 100644 src/query/block/column_test.go create mode 100644 src/query/block/information.go create mode 100644 src/query/functions/binary/types.go diff --git a/src/query/block/accounted.go b/src/query/block/accounted.go index 3538b7812c..3dc2b45785 100644 --- a/src/query/block/accounted.go +++ b/src/query/block/accounted.go @@ -22,7 +22,8 @@ package block import "github.com/m3db/m3/src/query/cost" -// AccountedBlock is a wrapper for a block which enforces limits on the number of datapoints used by the block. +// AccountedBlock is a wrapper for a block which enforces limits on the number +// of datapoints used by the block. type AccountedBlock struct { Block @@ -30,14 +31,18 @@ type AccountedBlock struct { } // NewAccountedBlock wraps the given block and enforces datapoint limits. -func NewAccountedBlock(wrapped Block, enforcer cost.ChainedEnforcer) *AccountedBlock { +func NewAccountedBlock( + wrapped Block, + enforcer cost.ChainedEnforcer, +) *AccountedBlock { return &AccountedBlock{ Block: wrapped, enforcer: enforcer, } } -// Close closes the block, and marks the number of datapoints used by this block as finished. +// Close closes the block, and marks the number of datapoints used +// by this block as finished. func (ab *AccountedBlock) Close() error { ab.enforcer.Close() return ab.Block.Close() diff --git a/src/query/block/accounted_test.go b/src/query/block/accounted_test.go index 8172e341a9..88a2518fca 100644 --- a/src/query/block/accounted_test.go +++ b/src/query/block/accounted_test.go @@ -26,6 +26,7 @@ import ( "github.com/m3db/m3/src/query/cost" "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" ) func TestAccountedBlock_Close(t *testing.T) { @@ -37,5 +38,9 @@ func TestAccountedBlock_Close(t *testing.T) { mockEnforcer := cost.NewMockChainedEnforcer(ctrl) mockEnforcer.EXPECT().Close() - NewAccountedBlock(wrapped, mockEnforcer).Close() + block := NewAccountedBlock(wrapped, mockEnforcer) + + wrapped.EXPECT().Info().Return(NewBlockInformation(BlockM3TSZCompressed)) + assert.Equal(t, BlockM3TSZCompressed, block.Info().Type()) + assert.NotPanics(t, func() { block.Close() }) } diff --git a/src/query/block/block_mock.go b/src/query/block/block_mock.go index 73831d5296..2b92d17d51 100644 --- a/src/query/block/block_mock.go +++ b/src/query/block/block_mock.go @@ -70,6 +70,20 @@ func (mr *MockBlockMockRecorder) Close() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockBlock)(nil).Close)) } +// Info mocks base method +func (m *MockBlock) Info() BlockInformation { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Info") + ret0, _ := ret[0].(BlockInformation) + return ret0 +} + +// Info indicates an expected call of Info +func (mr *MockBlockMockRecorder) Info() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Info", reflect.TypeOf((*MockBlock)(nil).Info)) +} + // SeriesIter mocks base method func (m *MockBlock) SeriesIter() (SeriesIter, error) { m.ctrl.T.Helper() @@ -447,6 +461,18 @@ func (mr *MockBuilderMockRecorder) Build() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Build", reflect.TypeOf((*MockBuilder)(nil).Build)) } +// SetBlockType mocks base method +func (m *MockBuilder) SetBlockType(arg0 BlockType) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "SetBlockType", arg0) +} + +// SetBlockType indicates an expected call of SetBlockType +func (mr *MockBuilderMockRecorder) SetBlockType(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetBlockType", reflect.TypeOf((*MockBuilder)(nil).SetBlockType), arg0) +} + // MockStep is a mock of Step interface type MockStep struct { ctrl *gomock.Controller diff --git a/src/query/block/column.go b/src/query/block/column.go index 6e171e1631..74c86ee306 100644 --- a/src/query/block/column.go +++ b/src/query/block/column.go @@ -39,6 +39,7 @@ type ColumnBlockBuilder struct { } type columnBlock struct { + blockType BlockType columns []column meta Metadata seriesMeta []SeriesMeta @@ -78,6 +79,7 @@ func (c *columnBlock) WithMetadata( columns: c.columns, meta: meta, seriesMeta: seriesMetas, + blockType: BlockDecompressed, }, nil } @@ -90,6 +92,10 @@ func (c *columnBlock) StepCount() int { return len(c.columns) } +func (c *columnBlock) Info() BlockInformation { + return NewBlockInformation(c.blockType) +} + // Close frees up any resources // TODO: actually free up the resources func (c *columnBlock) Close() error { @@ -182,10 +188,15 @@ func NewColumnBlockBuilder( block: &columnBlock{ meta: meta, seriesMeta: seriesMeta, + blockType: BlockDecompressed, }, } } +func (cb ColumnBlockBuilder) SetBlockType(blockType BlockType) { + cb.block.blockType = blockType +} + // AppendValue adds a value to a column at index func (cb ColumnBlockBuilder) AppendValue(idx int, value float64) error { columns := cb.block.columns diff --git a/src/query/block/column_test.go b/src/query/block/column_test.go new file mode 100644 index 0000000000..bff732ef48 --- /dev/null +++ b/src/query/block/column_test.go @@ -0,0 +1,46 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package block + +import ( + "context" + "testing" + + "github.com/m3db/m3/src/query/cost" + "github.com/m3db/m3/src/query/models" + "github.com/stretchr/testify/assert" + + "github.com/uber-go/tally" +) + +func TestColumnBuilderInfoTypes(t *testing.T) { + ctx := models.NewQueryContext(context.Background(), + tally.NoopScope, cost.NoopChainedEnforcer(), + models.QueryContextOptions{}) + + builder := NewColumnBlockBuilder(ctx, Metadata{}, []SeriesMeta{}) + block := builder.Build() + assert.Equal(t, BlockDecompressed, block.Info().blockType) + + block = builder.Build() + builder.SetBlockType(BlockScalar) + assert.Equal(t, BlockScalar, block.Info().blockType) +} diff --git a/src/query/block/container.go b/src/query/block/container.go index 290b6854ba..b8bff6a5ae 100644 --- a/src/query/block/container.go +++ b/src/query/block/container.go @@ -58,6 +58,10 @@ func (b *containerBlock) AddBlock(bl Block) error { return nil } +func (c *containerBlock) Info() BlockInformation { + return NewBlockInformation(BlockContainer) +} + func (b *containerBlock) Close() error { multiErr := xerrors.NewMultiError() multiErr = multiErr.Add(b.err) diff --git a/src/query/block/information.go b/src/query/block/information.go new file mode 100644 index 0000000000..151bf72417 --- /dev/null +++ b/src/query/block/information.go @@ -0,0 +1,86 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package block + +func (t BlockType) String() string { + switch t { + case BlockM3TSZCompressed: + return "M3TSZ_compressed" + case BlockDecompressed: + return "decompressed" + case BlockScalar: + return "scalar" + case BlockLazy: + return "lazy" + case BlockTime: + return "time" + case BlockContainer: + return "container" + case BlockWrapper: + return "wrapper" + case BlockConsolidated: + return "consolidated" + } + + return "unknown" +} + +type BlockInformation struct { + blockType BlockType + inner []BlockType +} + +func NewBlockInformation(blockType BlockType) BlockInformation { + return BlockInformation{blockType: blockType} +} + +func NewWrappedBlockInformation( + blockType BlockType, + wrap BlockInformation, +) BlockInformation { + inner := make([]BlockType, len(wrap.inner)+1) + copy(inner[:1], wrap.inner) + inner[0] = wrap.blockType + return BlockInformation{ + blockType: blockType, + inner: inner, + } +} + +func (b BlockInformation) Type() BlockType { + return b.blockType +} + +func (b BlockInformation) InnerType() BlockType { + if b.inner == nil { + return b.Type() + } + + return b.inner[0] +} + +func (b BlockInformation) BaseType() BlockType { + if b.inner == nil { + return b.Type() + } + + return b.inner[len(b.inner)-1] +} diff --git a/src/query/block/lazy.go b/src/query/block/lazy.go index 0797cd7be4..e860fe2277 100644 --- a/src/query/block/lazy.go +++ b/src/query/block/lazy.go @@ -39,6 +39,10 @@ func NewLazyBlock(block Block, opts LazyOptions) Block { } } +func (c *lazyBlock) Info() BlockInformation { + return NewWrappedBlockInformation(BlockLazy, c.block.Info()) +} + func (b *lazyBlock) Close() error { return b.block.Close() } func (b *lazyBlock) WithMetadata( diff --git a/src/query/block/lazy_test.go b/src/query/block/lazy_test.go index 0701c23c7b..0d0ae23268 100644 --- a/src/query/block/lazy_test.go +++ b/src/query/block/lazy_test.go @@ -78,6 +78,11 @@ func TestValidOffset(t *testing.T) { offset := time.Minute off := NewLazyBlock(b, testLazyOpts(offset, 1.0)) + b.EXPECT().Info().Return(NewBlockInformation(BlockM3TSZCompressed)) + info := off.Info() + assert.Equal(t, BlockLazy, info.Type()) + assert.Equal(t, BlockM3TSZCompressed, info.BaseType()) + // ensure functions are marshalled to the underlying block. b.EXPECT().Close().Return(nil) err := off.Close() diff --git a/src/query/block/scalar.go b/src/query/block/scalar.go index df74b9d3ba..264b8a9004 100644 --- a/src/query/block/scalar.go +++ b/src/query/block/scalar.go @@ -51,6 +51,10 @@ func NewScalar( } } +func (c *Scalar) Info() BlockInformation { + return NewBlockInformation(BlockScalar) +} + func (b *Scalar) Unconsolidated() (UnconsolidatedBlock, error) { return nil, fmt.Errorf("unconsolidated view not implemented for scalar block, meta: %s", diff --git a/src/query/block/scalar_test.go b/src/query/block/scalar_test.go index 1af0b11145..37c5be8d96 100644 --- a/src/query/block/scalar_test.go +++ b/src/query/block/scalar_test.go @@ -47,6 +47,7 @@ func TestScalarBlock(t *testing.T) { models.NewTagOptions(), ) + assert.Equal(t, BlockScalar, block.Info().Type()) require.IsType(t, block, &Scalar{}) stepIter, err := block.StepIter() require.NoError(t, err) diff --git a/src/query/block/types.go b/src/query/block/types.go index f536648d58..d7a1ae160a 100644 --- a/src/query/block/types.go +++ b/src/query/block/types.go @@ -30,6 +30,19 @@ import ( "github.com/m3db/m3/src/query/ts" ) +type BlockType uint8 + +const ( + BlockM3TSZCompressed BlockType = iota + BlockDecompressed + BlockScalar + BlockLazy + BlockTime + BlockContainer + BlockWrapper + BlockConsolidated +) + // Block represents a group of series across a time bound. type Block interface { io.Closer @@ -43,6 +56,8 @@ type Block interface { SeriesIter() (SeriesIter, error) // WithMetadata returns a block with updated meta and series metadata. WithMetadata(Metadata, []SeriesMeta) (Block, error) + // Info returns information about the block. + Info() BlockInformation } type AccumulatorBlock interface { @@ -167,6 +182,7 @@ func (m Metadata) String() string { type Builder interface { AppendValue(idx int, value float64) error AppendValues(idx int, values []float64) error + SetBlockType(blockType BlockType) Build() Block AddCols(num int) error } diff --git a/src/query/functions/binary/and.go b/src/query/functions/binary/and.go index c2b77c2eef..320b756698 100644 --- a/src/query/functions/binary/and.go +++ b/src/query/functions/binary/and.go @@ -103,7 +103,7 @@ func andIntersect( matching *VectorMatching, lhs, rhs []block.SeriesMeta, ) ([]indexMatcher, []block.SeriesMeta) { - idFunction := HashFunc(matching.On, matching.MatchingLabels...) + idFunction := hashFunc(matching.On, matching.MatchingLabels...) // The set of signatures for the right-hand side. rightSigs := make(map[uint64]int, len(rhs)) for idx, meta := range rhs { diff --git a/src/query/functions/binary/and_test.go b/src/query/functions/binary/and_test.go index 702d78d8f6..6d397b7cb1 100644 --- a/src/query/functions/binary/and_test.go +++ b/src/query/functions/binary/and_test.go @@ -36,6 +36,11 @@ import ( "github.com/stretchr/testify/require" ) +func nilVectorMatcherBuilder(_, _ block.Block) *VectorMatching { return nil } +func emptyVectorMatcherBuilder(_, _ block.Block) *VectorMatching { + return &VectorMatching{} +} + func TestAndWithExactValues(t *testing.T) { values, bounds := test.GenerateValuesAndBounds(nil, nil) block1 := test.NewBlockFromValues(bounds, values) @@ -44,9 +49,9 @@ func TestAndWithExactValues(t *testing.T) { op, err := NewOp( AndType, NodeParams{ - LNode: parser.NodeID(0), - RNode: parser.NodeID(1), - VectorMatching: &VectorMatching{}, + LNode: parser.NodeID(0), + RNode: parser.NodeID(1), + VectorMatcherBuilder: emptyVectorMatcherBuilder, }, ) require.NoError(t, err) @@ -76,9 +81,9 @@ func TestAndWithSomeValues(t *testing.T) { op, err := NewOp( AndType, NodeParams{ - LNode: parser.NodeID(0), - RNode: parser.NodeID(1), - VectorMatching: &VectorMatching{}, + LNode: parser.NodeID(0), + RNode: parser.NodeID(1), + VectorMatcherBuilder: emptyVectorMatcherBuilder, }, ) require.NoError(t, err) @@ -196,9 +201,9 @@ func TestAnd(t *testing.T) { op, err := NewOp( AndType, NodeParams{ - LNode: parser.NodeID(0), - RNode: parser.NodeID(1), - VectorMatching: &VectorMatching{}, + LNode: parser.NodeID(0), + RNode: parser.NodeID(1), + VectorMatcherBuilder: emptyVectorMatcherBuilder, }, ) require.NoError(t, err) diff --git a/src/query/functions/binary/arithmetic.go b/src/query/functions/binary/arithmetic.go index cfd4cb7f0d..53ece86588 100644 --- a/src/query/functions/binary/arithmetic.go +++ b/src/query/functions/binary/arithmetic.go @@ -60,7 +60,7 @@ const ( ) var ( - arithmeticFuncs = map[string]Function{ + arithmeticFuncs = map[string]binaryFunction{ PlusType: func(x, y float64) float64 { return x + y }, MinusType: func(x, y float64) float64 { return x - y }, MultiplyType: func(x, y float64) float64 { return x * y }, diff --git a/src/query/functions/binary/base.go b/src/query/functions/binary/base.go index 5bc264e2ad..eb79a24fed 100644 --- a/src/query/functions/binary/base.go +++ b/src/query/functions/binary/base.go @@ -37,14 +37,6 @@ type baseOp struct { params NodeParams } -// NodeParams describes the types of nodes used for binary operations. -type NodeParams struct { - LNode, RNode parser.NodeID - LIsScalar, RIsScalar bool - ReturnBool bool - VectorMatching *VectorMatching -} - // OpType for the operator. func (o baseOp) OpType() string { return o.OperatorType @@ -69,7 +61,7 @@ func (o baseOp) Node( } // ArithmeticFunction returns the arithmetic function for this operation type. -func ArithmeticFunction(opType string, returnBool bool) (Function, error) { +func ArithmeticFunction(opType string, returnBool bool) (binaryFunction, error) { if fn, ok := arithmeticFuncs[opType]; ok { return fn, nil } @@ -92,6 +84,10 @@ func NewOp( opType string, params NodeParams, ) (parser.Params, error) { + if params.VectorMatcherBuilder == nil { + params.VectorMatcherBuilder = defaultVectorMatcherBuilder + } + fn, ok := buildLogicalFunction(opType, params) if !ok { fn, ok = buildArithmeticFunction(opType, params) diff --git a/src/query/functions/binary/binary.go b/src/query/functions/binary/binary.go index 37a4d7b4ef..5226bcd984 100644 --- a/src/query/functions/binary/binary.go +++ b/src/query/functions/binary/binary.go @@ -27,8 +27,7 @@ import ( "github.com/m3db/m3/src/query/models" ) -// Function is a function that applies on two floats. -type Function func(x, y float64) float64 +type binaryFunction func(x, y float64) float64 type singleScalarFunc func(x float64) float64 // processes two logical blocks, performing a logical operation on them. @@ -38,14 +37,14 @@ func processBinary( params NodeParams, controller *transform.Controller, isComparison bool, - fn Function, + fn binaryFunction, ) (block.Block, error) { lIter, err := lhs.StepIter() if err != nil { return nil, err } - if params.LIsScalar { + if lhs.Info().Type() == block.BlockScalar { scalarL, ok := lhs.(*block.Scalar) if !ok { return nil, errLeftScalar @@ -53,7 +52,7 @@ func processBinary( lVal := scalarL.Value() // rhs is a series; use rhs metadata and series meta - if !params.RIsScalar { + if rhs.Info().Type() != block.BlockScalar { return processSingleBlock( queryCtx, rhs, @@ -85,7 +84,7 @@ func processBinary( ), nil } - if params.RIsScalar { + if rhs.Info().Type() == block.BlockScalar { scalarR, ok := rhs.(*block.Scalar) if !ok { return nil, errRightScalar @@ -109,14 +108,15 @@ func processBinary( return nil, err } + matcher := params.VectorMatcherBuilder(lhs, rhs) // NB(arnikola): this is a sanity check, as functions between // two series missing vector matching should have previously // errored out during the parsing step. - if params.VectorMatching == nil { + if matcher == nil { return nil, errNoMatching } - return processBothSeries(queryCtx, lIter, rIter, controller, params.VectorMatching, fn) + return processBothSeries(queryCtx, lIter, rIter, controller, matcher, fn) } func processSingleBlock( @@ -163,7 +163,7 @@ func processBothSeries( lIter, rIter block.StepIter, controller *transform.Controller, matching *VectorMatching, - fn Function, + fn binaryFunction, ) (block.Block, error) { if lIter.StepCount() != rIter.StepCount() { return nil, errMismatchedStepCounts @@ -226,7 +226,7 @@ func intersect( matching *VectorMatching, lhs, rhs []block.SeriesMeta, ) ([]int, []int, []block.SeriesMeta) { - idFunction := HashFunc(matching.On, matching.MatchingLabels...) + idFunction := hashFunc(matching.On, matching.MatchingLabels...) // The set of signatures for the right-hand side. rightSigs := make(map[uint64]int, len(rhs)) for idx, meta := range rhs { diff --git a/src/query/functions/binary/binary_test.go b/src/query/functions/binary/binary_test.go index 8a16e66772..d9ed6de75b 100644 --- a/src/query/functions/binary/binary_test.go +++ b/src/query/functions/binary/binary_test.go @@ -103,12 +103,10 @@ func TestScalars(t *testing.T) { op, err := NewOp( tt.opType, NodeParams{ - LNode: parser.NodeID(0), - RNode: parser.NodeID(1), - LIsScalar: true, - RIsScalar: true, - ReturnBool: true, - VectorMatching: nil, + LNode: parser.NodeID(0), + RNode: parser.NodeID(1), + ReturnBool: true, + VectorMatcherBuilder: nilVectorMatcherBuilder, }, ) require.NoError(t, err) @@ -154,12 +152,10 @@ func TestScalarsReturnBoolFalse(t *testing.T) { op, err := NewOp( tt.opType, NodeParams{ - LNode: parser.NodeID(0), - RNode: parser.NodeID(1), - LIsScalar: true, - RIsScalar: true, - ReturnBool: false, - VectorMatching: nil, + LNode: parser.NodeID(0), + RNode: parser.NodeID(1), + ReturnBool: false, + VectorMatcherBuilder: nilVectorMatcherBuilder, }, ) require.NoError(t, err) @@ -538,12 +534,10 @@ func TestSingleSeriesReturnBool(t *testing.T) { op, err := NewOp( tt.opType, NodeParams{ - LNode: parser.NodeID(0), - RNode: parser.NodeID(1), - LIsScalar: !tt.seriesLeft, - RIsScalar: tt.seriesLeft, - ReturnBool: true, - VectorMatching: nil, + LNode: parser.NodeID(0), + RNode: parser.NodeID(1), + ReturnBool: true, + VectorMatcherBuilder: nilVectorMatcherBuilder, }, ) require.NoError(t, err) @@ -602,12 +596,10 @@ func TestSingleSeriesReturnValues(t *testing.T) { op, err := NewOp( tt.opType, NodeParams{ - LNode: parser.NodeID(0), - RNode: parser.NodeID(1), - LIsScalar: !tt.seriesLeft, - RIsScalar: tt.seriesLeft, - ReturnBool: false, - VectorMatching: nil, + LNode: parser.NodeID(0), + RNode: parser.NodeID(1), + ReturnBool: false, + VectorMatcherBuilder: nilVectorMatcherBuilder, }, ) @@ -879,12 +871,10 @@ func TestBothSeries(t *testing.T) { op, err := NewOp( tt.opType, NodeParams{ - LNode: parser.NodeID(0), - RNode: parser.NodeID(1), - LIsScalar: false, - RIsScalar: false, - ReturnBool: tt.returnBool, - VectorMatching: &VectorMatching{}, + LNode: parser.NodeID(0), + RNode: parser.NodeID(1), + ReturnBool: tt.returnBool, + VectorMatcherBuilder: emptyVectorMatcherBuilder, }, ) require.NoError(t, err) @@ -956,11 +946,9 @@ func TestBinaryFunctionWithDifferentNames(t *testing.T) { op, err := NewOp( PlusType, NodeParams{ - LNode: parser.NodeID(0), - RNode: parser.NodeID(1), - LIsScalar: false, - RIsScalar: false, - VectorMatching: &VectorMatching{}, + LNode: parser.NodeID(0), + RNode: parser.NodeID(1), + VectorMatcherBuilder: emptyVectorMatcherBuilder, }, ) require.NoError(t, err) diff --git a/src/query/functions/binary/common.go b/src/query/functions/binary/common.go index 7794539cea..43aa6bbaaa 100644 --- a/src/query/functions/binary/common.go +++ b/src/query/functions/binary/common.go @@ -38,40 +38,9 @@ type indexMatcher struct { rhsIndex int } -// VectorMatchCardinality describes the cardinality relationship -// of two Vectors in a binary operation. -type VectorMatchCardinality int - -const ( - // CardOneToOne is used for one-one relationship - CardOneToOne VectorMatchCardinality = iota - // CardManyToOne is used for many-one relationship - CardManyToOne - // CardOneToMany is used for one-many relationship - CardOneToMany - // CardManyToMany is used for many-many relationship - CardManyToMany -) - -// VectorMatching describes how elements from two Vectors in a binary -// operation are supposed to be matched. -type VectorMatching struct { - // The cardinality of the two Vectors. - Card VectorMatchCardinality - // MatchingLabels contains the labels which define equality of a pair of - // elements from the Vectors. - MatchingLabels [][]byte - // On includes the given label names from matching, - // rather than excluding them. - On bool - // Include contains additional labels that should be included in - // the result from the side with the lower cardinality. - Include []string -} - -// HashFunc returns a function that calculates the signature for a metric +// hashFunc returns a function that calculates the signature for a metric // ignoring the provided labels. If on, then only the given labels are used. -func HashFunc(on bool, names ...[]byte) func(models.Tags) uint64 { +func hashFunc(on bool, names ...[]byte) func(models.Tags) uint64 { if on { return func(tags models.Tags) uint64 { return tags.TagsWithKeys(names).HashedID() @@ -107,6 +76,34 @@ func tagMap(t models.Tags) map[string]models.Tag { return m } +// Iff one of left or right is a time block, match match one to many +// against it, and match everything. +func defaultVectorMatcherBuilder(lhs, rhs block.Block) *VectorMatching { + left := lhs.Info().BaseType() == block.BlockTime + right := rhs.Info().BaseType() == block.BlockTime + if left { + if right { + return &VectorMatching{ + Card: CardOneToOne, + } + } + + return &VectorMatching{ + Card: CardOneToMany, + On: true, + } + } + + if right { + return &VectorMatching{ + Card: CardManyToOne, + On: true, + } + } + + return nil +} + func combineMetaAndSeriesMeta( meta, otherMeta block.Metadata, seriesMeta, otherSeriesMeta []block.SeriesMeta, diff --git a/src/query/functions/binary/comparison.go b/src/query/functions/binary/comparison.go index 7f9ad17bc9..101d857d87 100644 --- a/src/query/functions/binary/comparison.go +++ b/src/query/functions/binary/comparison.go @@ -68,7 +68,7 @@ func toComparisonValue(b bool, x float64) float64 { } var ( - comparisonFuncs = map[string]Function{ + comparisonFuncs = map[string]binaryFunction{ EqType: func(x, y float64) float64 { return toComparisonValue(x == y, x) }, NotEqType: func(x, y float64) float64 { return toComparisonValue(x != y, x) }, GreaterType: func(x, y float64) float64 { return toComparisonValue(x > y, x) }, diff --git a/src/query/functions/binary/logical.go b/src/query/functions/binary/logical.go index 33e33e39b5..293c791683 100644 --- a/src/query/functions/binary/logical.go +++ b/src/query/functions/binary/logical.go @@ -61,7 +61,7 @@ func createLogicalProcessingStep( return func(queryCtx *models.QueryContext, lhs, rhs block.Block, controller *transform.Controller) (block.Block, error) { return processLogical(queryCtx, lhs, rhs, controller, - params.VectorMatching, fn) + params.VectorMatcherBuilder, fn) } } @@ -69,7 +69,7 @@ func processLogical( queryCtx *models.QueryContext, lhs, rhs block.Block, controller *transform.Controller, - matching *VectorMatching, + matcherBuilder VectorMatcherBuilder, makeBlock makeBlockFn, ) (block.Block, error) { lIter, err := lhs.StepIter() @@ -86,5 +86,6 @@ func processLogical( return nil, errMismatchedStepCounts } + matching := matcherBuilder(lhs, rhs) return makeBlock(queryCtx, lIter, rIter, controller, matching) } diff --git a/src/query/functions/binary/or.go b/src/query/functions/binary/or.go index d50d9bbeee..e45abb4069 100644 --- a/src/query/functions/binary/or.go +++ b/src/query/functions/binary/or.go @@ -118,7 +118,7 @@ func mergeIndices( matching *VectorMatching, lhs, rhs []block.SeriesMeta, ) ([]int, []block.SeriesMeta) { - idFunction := HashFunc(matching.On, matching.MatchingLabels...) + idFunction := hashFunc(matching.On, matching.MatchingLabels...) // The set of signatures for the left-hand side. leftSigs := make(map[uint64]int, len(lhs)) for i, meta := range lhs { diff --git a/src/query/functions/binary/or_test.go b/src/query/functions/binary/or_test.go index 544e5df2d7..f08f3930af 100644 --- a/src/query/functions/binary/or_test.go +++ b/src/query/functions/binary/or_test.go @@ -45,9 +45,9 @@ func TestOrWithExactValues(t *testing.T) { op, err := NewOp( OrType, NodeParams{ - LNode: parser.NodeID(0), - RNode: parser.NodeID(1), - VectorMatching: &VectorMatching{}, + LNode: parser.NodeID(0), + RNode: parser.NodeID(1), + VectorMatcherBuilder: emptyVectorMatcherBuilder, }, ) require.NoError(t, err) @@ -76,9 +76,9 @@ func TestOrWithSomeValues(t *testing.T) { op, err := NewOp( OrType, NodeParams{ - LNode: parser.NodeID(0), - RNode: parser.NodeID(1), - VectorMatching: &VectorMatching{}, + LNode: parser.NodeID(0), + RNode: parser.NodeID(1), + VectorMatcherBuilder: emptyVectorMatcherBuilder, }, ) require.NoError(t, err) @@ -263,9 +263,9 @@ func TestOrs(t *testing.T) { op, err := NewOp( OrType, NodeParams{ - LNode: parser.NodeID(0), - RNode: parser.NodeID(1), - VectorMatching: &VectorMatching{}, + LNode: parser.NodeID(0), + RNode: parser.NodeID(1), + VectorMatcherBuilder: emptyVectorMatcherBuilder, }, ) require.NoError(t, err) @@ -313,9 +313,9 @@ func TestOrsBoundsError(t *testing.T) { op, err := NewOp( OrType, NodeParams{ - LNode: parser.NodeID(0), - RNode: parser.NodeID(1), - VectorMatching: &VectorMatching{}, + LNode: parser.NodeID(0), + RNode: parser.NodeID(1), + VectorMatcherBuilder: emptyVectorMatcherBuilder, }, ) require.NoError(t, err) @@ -353,9 +353,9 @@ func TestOrCombinedMetadata(t *testing.T) { op, err := NewOp( OrType, NodeParams{ - LNode: parser.NodeID(0), - RNode: parser.NodeID(1), - VectorMatching: &VectorMatching{}, + LNode: parser.NodeID(0), + RNode: parser.NodeID(1), + VectorMatcherBuilder: emptyVectorMatcherBuilder, }, ) require.NoError(t, err) diff --git a/src/query/functions/binary/types.go b/src/query/functions/binary/types.go new file mode 100644 index 0000000000..7847bb6191 --- /dev/null +++ b/src/query/functions/binary/types.go @@ -0,0 +1,67 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package binary + +import ( + "github.com/m3db/m3/src/query/block" + "github.com/m3db/m3/src/query/parser" +) + +// VectorMatchCardinality describes the cardinality relationship +// of two Vectors in a binary operation. +type VectorMatchCardinality int + +const ( + // CardOneToOne is used for one-one relationship + CardOneToOne VectorMatchCardinality = iota + // CardManyToOne is used for many-one relationship + CardManyToOne + // CardOneToMany is used for one-many relationship + CardOneToMany + // CardManyToMany is used for many-many relationship + CardManyToMany +) + +// VectorMatching describes how elements from two Vectors in a binary +// operation are supposed to be matched. +type VectorMatching struct { + // The cardinality of the two Vectors. + Card VectorMatchCardinality + // MatchingLabels contains the labels which define equality of a pair of + // elements from the Vectors. + MatchingLabels [][]byte + // On includes the given label names from matching, + // rather than excluding them. + On bool + // Include contains additional labels that should be included in + // the result from the side with the lower cardinality. + Include []string +} + +// VectorMatcherBuilder creates a vector matcher based on incoming block types. +type VectorMatcherBuilder func(lhs, rhs block.Block) *VectorMatching + +// NodeParams describes the types of nodes used for binary operations. +type NodeParams struct { + LNode, RNode parser.NodeID + ReturnBool bool + VectorMatcherBuilder VectorMatcherBuilder +} diff --git a/src/query/functions/binary/unless.go b/src/query/functions/binary/unless.go index 35b4aa6229..99cecf427b 100644 --- a/src/query/functions/binary/unless.go +++ b/src/query/functions/binary/unless.go @@ -103,7 +103,7 @@ func matchingIndices( matching *VectorMatching, lhs, rhs []block.SeriesMeta, ) []indexMatcher { - idFunction := HashFunc(matching.On, matching.MatchingLabels...) + idFunction := hashFunc(matching.On, matching.MatchingLabels...) // The set of signatures for the left-hand side. leftSigs := make(map[uint64]int, len(lhs)) for idx, meta := range lhs { diff --git a/src/query/functions/binary/unless_test.go b/src/query/functions/binary/unless_test.go index f5fd8d366a..f2256316fe 100644 --- a/src/query/functions/binary/unless_test.go +++ b/src/query/functions/binary/unless_test.go @@ -213,9 +213,9 @@ func TestUnless(t *testing.T) { op, err := NewOp( UnlessType, NodeParams{ - LNode: parser.NodeID(0), - RNode: parser.NodeID(1), - VectorMatching: &VectorMatching{}, + LNode: parser.NodeID(0), + RNode: parser.NodeID(1), + VectorMatcherBuilder: emptyVectorMatcherBuilder, }, ) require.NoError(t, err) diff --git a/src/query/functions/scalar/time.go b/src/query/functions/scalar/time.go index 4368714ad8..6d57eff1e7 100644 --- a/src/query/functions/scalar/time.go +++ b/src/query/functions/scalar/time.go @@ -101,6 +101,7 @@ func (n *timeNode) Execute(queryCtx *models.QueryContext) error { } } + builder.SetBlockType(block.BlockTime) block := builder.Build() if n.opts.Debug() { // Ignore any errors diff --git a/src/query/functions/scalar/time_test.go b/src/query/functions/scalar/time_test.go index 400e10249d..930db7909f 100644 --- a/src/query/functions/scalar/time_test.go +++ b/src/query/functions/scalar/time_test.go @@ -24,6 +24,8 @@ import ( "testing" "time" + "github.com/m3db/m3/src/query/block" + "github.com/m3db/m3/src/query/executor/transform" "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" @@ -56,6 +58,7 @@ func TestTime(t *testing.T) { err = node.Execute(models.NoopQueryContext()) require.NoError(t, err) assert.Len(t, sink.Values, 1) + assert.Equal(t, block.BlockTime, sink.Info.Type()) for i, vals := range sink.Values { assert.Equal(t, float64(start.Add(time.Duration(i)*step).Unix()), vals[0]) diff --git a/src/query/parser/promql/types.go b/src/query/parser/promql/types.go index 93b8a4223a..7e2ddf22b9 100644 --- a/src/query/parser/promql/types.go +++ b/src/query/parser/promql/types.go @@ -23,6 +23,8 @@ package promql import ( "fmt" + "github.com/m3db/m3/src/query/block" + "github.com/m3db/m3/src/query/functions" "github.com/m3db/m3/src/query/functions/aggregation" "github.com/m3db/m3/src/query/functions/binary" @@ -147,28 +149,15 @@ func newScalarOperator( return scalar.NewScalarOp(expr.Val, tagOpts) } -func isTime(expr promql.Expr) bool { - return expr.String() == "time()" -} - // NewBinaryOperator creates a new binary operator based on the type. func NewBinaryOperator(expr *promql.BinaryExpr, lhs, rhs parser.NodeID) (parser.Params, error) { - matching := promMatchingToM3(expr.VectorMatching) - // NB: Prometheus handles `time()` in a way that's not entirely compatible - // with the query engine; special handling is required here. - lTime, rTime := isTime(expr.LHS), isTime(expr.RHS) - if matching == nil { - matching = timeMatcher(lTime, rTime) - } - + matcherBuilder := promMatchingToM3(expr.VectorMatching) nodeParams := binary.NodeParams{ - LNode: lhs, - RNode: rhs, - LIsScalar: expr.LHS.Type() == promql.ValueTypeScalar && !lTime, - RIsScalar: expr.RHS.Type() == promql.ValueTypeScalar && !rTime, - ReturnBool: expr.ReturnBool, - VectorMatching: matching, + LNode: lhs, + RNode: rhs, + ReturnBool: expr.ReturnBool, + VectorMatcherBuilder: matcherBuilder, } op := getBinaryOpType(expr.Op) @@ -406,7 +395,7 @@ func promVectorCardinalityToM3( func promMatchingToM3( vectorMatching *promql.VectorMatching, -) *binary.VectorMatching { +) binary.VectorMatcherBuilder { // vectorMatching can be nil iff at least one of the sides is a scalar. if vectorMatching == nil { return nil @@ -417,36 +406,12 @@ func promMatchingToM3( byteMatchers[i] = []byte(label) } - return &binary.VectorMatching{ - Card: promVectorCardinalityToM3(vectorMatching.Card), - MatchingLabels: byteMatchers, - On: vectorMatching.On, - Include: vectorMatching.Include, - } -} - -// Iff one of left or right is a time() function, match match one to many -// against it, and match everything. -func timeMatcher(left, right bool) *binary.VectorMatching { - if left { - if right { - return &binary.VectorMatching{ - Card: binary.CardOneToOne, - } - } - + return func(_, _ block.Block) *binary.VectorMatching { return &binary.VectorMatching{ - Card: binary.CardOneToMany, - On: true, + Card: promVectorCardinalityToM3(vectorMatching.Card), + MatchingLabels: byteMatchers, + On: vectorMatching.On, + Include: vectorMatching.Include, } } - - if right { - return &binary.VectorMatching{ - Card: binary.CardManyToOne, - On: true, - } - } - - return nil } diff --git a/src/query/storage/block.go b/src/query/storage/block.go index 6c713d3df6..213fa48655 100644 --- a/src/query/storage/block.go +++ b/src/query/storage/block.go @@ -64,6 +64,11 @@ type multiBlockWrapper struct { unconsolidated block.UnconsolidatedBlock } +func (m *multiBlockWrapper) Info() block.BlockInformation { + return block.NewWrappedBlockInformation(block.BlockWrapper, + m.consolidated.Info()) +} + func (m *multiBlockWrapper) Unconsolidated() (block.UnconsolidatedBlock, error) { return m.unconsolidated, nil } diff --git a/src/query/storage/consolidated.go b/src/query/storage/consolidated.go index 196a4e0379..c8969ff272 100644 --- a/src/query/storage/consolidated.go +++ b/src/query/storage/consolidated.go @@ -31,6 +31,10 @@ type consolidatedBlock struct { consolidationFunc block.ConsolidationFunc } +func (c *consolidatedBlock) Info() block.BlockInformation { + return block.NewBlockInformation(block.BlockConsolidated) +} + func (c *consolidatedBlock) Unconsolidated() (block.UnconsolidatedBlock, error) { return nil, errors.New("unconsolidated blocks are not supported") } diff --git a/src/query/storage/consolidated_test.go b/src/query/storage/consolidated_test.go index c32ba6e0ba..dc8fd2384e 100644 --- a/src/query/storage/consolidated_test.go +++ b/src/query/storage/consolidated_test.go @@ -26,6 +26,8 @@ import ( "testing" "time" + "github.com/m3db/m3/src/query/block" + "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/ts" @@ -174,10 +176,11 @@ func TestConsolidation(t *testing.T) { unconsolidated, err := NewMultiSeriesBlock(seriesList, fetchQuery, time.Minute) assert.NoError(t, err) - block, err := unconsolidated.Consolidate() + bl, err := unconsolidated.Consolidate() assert.NoError(t, err) - iter, err := block.StepIter() + assert.Equal(t, block.BlockConsolidated, bl.Info().Type()) + iter, err := bl.StepIter() assert.NoError(t, err) i := 0 diff --git a/src/query/test/executor/transform.go b/src/query/test/executor/transform.go index e4a84404e6..eeb266eb72 100644 --- a/src/query/test/executor/transform.go +++ b/src/query/test/executor/transform.go @@ -46,6 +46,7 @@ type SinkNode struct { Values [][]float64 Meta block.Metadata Metas []block.SeriesMeta + Info block.BlockInformation } // Process processes and stores the last block output in the sink node @@ -77,6 +78,7 @@ func (s *SinkNode) Process(_ *models.QueryContext, ID parser.NodeID, block block } s.Meta = iter.Meta() + s.Info = block.Info() return nil } diff --git a/src/query/ts/m3db/encoded_block.go b/src/query/ts/m3db/encoded_block.go index e6b6e29e5c..cf1845696a 100644 --- a/src/query/ts/m3db/encoded_block.go +++ b/src/query/ts/m3db/encoded_block.go @@ -141,6 +141,10 @@ func (b *encodedBlock) generateMetas() error { return nil } +func (b *encodedBlock) Info() block.BlockInformation { + return block.NewBlockInformation(block.BlockM3TSZCompressed) +} + func (b *encodedBlock) WithMetadata( meta block.Metadata, seriesMetas []block.SeriesMeta, diff --git a/src/query/ts/m3db/encoded_step_iterator_test.go b/src/query/ts/m3db/encoded_step_iterator_test.go index 5cfaf10260..10e950063a 100644 --- a/src/query/ts/m3db/encoded_step_iterator_test.go +++ b/src/query/ts/m3db/encoded_step_iterator_test.go @@ -42,6 +42,7 @@ import ( xtime "github.com/m3db/m3/src/x/time" "github.com/pkg/profile" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -151,9 +152,10 @@ func testConsolidatedStepIteratorMinuteLookback(t *testing.T, withPools bool) { blocks, bounds := generateBlocks(t, tt.stepSize, opts) j := 0 - for i, block := range blocks { - iters, err := block.StepIter() + for i, bl := range blocks { + iters, err := bl.StepIter() require.NoError(t, err) + assert.Equal(t, block.BlockM3TSZCompressed, bl.Info().Type()) require.True(t, bounds.Equals(iters.Meta().Bounds)) verifyMetas(t, i, iters.Meta(), iters.SeriesMeta()) From 3a6c470022aeb13bc4c0a7622492562bb5cb044e Mon Sep 17 00:00:00 2001 From: Artem Nikolayevsky Date: Thu, 22 Aug 2019 18:02:40 -0400 Subject: [PATCH 5/5] PR response --- src/query/block/accounted_test.go | 2 +- src/query/block/block_mock.go | 18 ++++++------ src/query/block/column.go | 16 +++++------ src/query/block/column_test.go | 5 ++-- src/query/block/container.go | 4 +-- src/query/block/empty.go | 4 +++ src/query/block/{information.go => info.go} | 28 +++++++++--------- src/query/block/lazy.go | 4 +-- src/query/block/lazy_test.go | 2 +- src/query/block/scalar.go | 4 +-- src/query/block/types.go | 32 +++++++++++++++++---- src/query/functions/aggregation/absent.go | 6 +--- src/query/functions/binary/and.go | 8 ++++-- src/query/functions/binary/and_test.go | 5 ++-- src/query/functions/binary/binary.go | 10 +++++-- src/query/functions/binary/binary_test.go | 8 +++--- src/query/functions/binary/common.go | 14 +++++---- src/query/functions/binary/logical.go | 2 +- src/query/functions/binary/or.go | 8 ++++-- src/query/functions/binary/or_test.go | 3 +- src/query/functions/binary/types.go | 6 ++-- src/query/functions/binary/unless.go | 8 ++++-- src/query/functions/binary/unless_test.go | 2 +- src/query/functions/scalar/time.go | 3 +- src/query/functions/scalar/time_test.go | 1 - src/query/parser/promql/types.go | 6 ++-- src/query/storage/block.go | 5 ++-- src/query/storage/consolidated.go | 4 +-- src/query/test/executor/transform.go | 2 +- src/query/ts/m3db/encoded_block.go | 4 +-- 30 files changed, 132 insertions(+), 92 deletions(-) rename src/query/block/{information.go => info.go} (80%) diff --git a/src/query/block/accounted_test.go b/src/query/block/accounted_test.go index 88a2518fca..b5858ef210 100644 --- a/src/query/block/accounted_test.go +++ b/src/query/block/accounted_test.go @@ -40,7 +40,7 @@ func TestAccountedBlock_Close(t *testing.T) { block := NewAccountedBlock(wrapped, mockEnforcer) - wrapped.EXPECT().Info().Return(NewBlockInformation(BlockM3TSZCompressed)) + wrapped.EXPECT().Info().Return(NewBlockInfo(BlockM3TSZCompressed)) assert.Equal(t, BlockM3TSZCompressed, block.Info().Type()) assert.NotPanics(t, func() { block.Close() }) } diff --git a/src/query/block/block_mock.go b/src/query/block/block_mock.go index 2b92d17d51..09c80043e6 100644 --- a/src/query/block/block_mock.go +++ b/src/query/block/block_mock.go @@ -71,10 +71,10 @@ func (mr *MockBlockMockRecorder) Close() *gomock.Call { } // Info mocks base method -func (m *MockBlock) Info() BlockInformation { +func (m *MockBlock) Info() BlockInfo { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Info") - ret0, _ := ret[0].(BlockInformation) + ret0, _ := ret[0].(BlockInfo) return ret0 } @@ -461,16 +461,18 @@ func (mr *MockBuilderMockRecorder) Build() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Build", reflect.TypeOf((*MockBuilder)(nil).Build)) } -// SetBlockType mocks base method -func (m *MockBuilder) SetBlockType(arg0 BlockType) { +// BuildAsType mocks base method +func (m *MockBuilder) BuildAsType(arg0 BlockType) Block { m.ctrl.T.Helper() - m.ctrl.Call(m, "SetBlockType", arg0) + ret := m.ctrl.Call(m, "BuildAsType", arg0) + ret0, _ := ret[0].(Block) + return ret0 } -// SetBlockType indicates an expected call of SetBlockType -func (mr *MockBuilderMockRecorder) SetBlockType(arg0 interface{}) *gomock.Call { +// BuildAsType indicates an expected call of BuildAsType +func (mr *MockBuilderMockRecorder) BuildAsType(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetBlockType", reflect.TypeOf((*MockBuilder)(nil).SetBlockType), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BuildAsType", reflect.TypeOf((*MockBuilder)(nil).BuildAsType), arg0) } // MockStep is a mock of Step interface diff --git a/src/query/block/column.go b/src/query/block/column.go index 230fbc0761..3764a72d4d 100644 --- a/src/query/block/column.go +++ b/src/query/block/column.go @@ -92,8 +92,8 @@ func (c *columnBlock) StepCount() int { return len(c.columns) } -func (c *columnBlock) Info() BlockInformation { - return NewBlockInformation(c.blockType) +func (c *columnBlock) Info() BlockInfo { + return NewBlockInfo(c.blockType) } // Close frees up any resources @@ -193,10 +193,6 @@ func NewColumnBlockBuilder( } } -func (cb ColumnBlockBuilder) SetBlockType(blockType BlockType) { - cb.block.blockType = blockType -} - // AppendValue adds a value to a column at index func (cb ColumnBlockBuilder) AppendValue(idx int, value float64) error { columns := cb.block.columns @@ -233,19 +229,21 @@ func (cb ColumnBlockBuilder) AppendValues(idx int, values []float64) error { return nil } -// AddCols adds new columns func (cb ColumnBlockBuilder) AddCols(num int) error { newCols := make([]column, num) cb.block.columns = append(cb.block.columns, newCols...) return nil } -// Build extracts the block -// TODO: Return an immutable copy func (cb ColumnBlockBuilder) Build() Block { return NewAccountedBlock(cb.block, cb.enforcer) } +func (cb ColumnBlockBuilder) BuildAsType(blockType BlockType) Block { + cb.block.blockType = blockType + return NewAccountedBlock(cb.block, cb.enforcer) +} + type column struct { Values []float64 } diff --git a/src/query/block/column_test.go b/src/query/block/column_test.go index bff732ef48..d2f86341af 100644 --- a/src/query/block/column_test.go +++ b/src/query/block/column_test.go @@ -26,8 +26,8 @@ import ( "github.com/m3db/m3/src/query/cost" "github.com/m3db/m3/src/query/models" - "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/assert" "github.com/uber-go/tally" ) @@ -40,7 +40,6 @@ func TestColumnBuilderInfoTypes(t *testing.T) { block := builder.Build() assert.Equal(t, BlockDecompressed, block.Info().blockType) - block = builder.Build() - builder.SetBlockType(BlockScalar) + block = builder.BuildAsType(BlockScalar) assert.Equal(t, BlockScalar, block.Info().blockType) } diff --git a/src/query/block/container.go b/src/query/block/container.go index b8bff6a5ae..3dd37627f2 100644 --- a/src/query/block/container.go +++ b/src/query/block/container.go @@ -58,8 +58,8 @@ func (b *containerBlock) AddBlock(bl Block) error { return nil } -func (c *containerBlock) Info() BlockInformation { - return NewBlockInformation(BlockContainer) +func (c *containerBlock) Info() BlockInfo { + return NewBlockInfo(BlockContainer) } func (b *containerBlock) Close() error { diff --git a/src/query/block/empty.go b/src/query/block/empty.go index 92de064156..e4033d08ae 100644 --- a/src/query/block/empty.go +++ b/src/query/block/empty.go @@ -31,6 +31,10 @@ func NewEmptyBlock(meta Metadata) Block { func (b *emptyBlock) Close() error { return nil } +func (b *emptyBlock) Info() BlockInfo { + return NewBlockInfo(BlockEmpty) +} + func (b *emptyBlock) WithMetadata(meta Metadata, _ []SeriesMeta) (Block, error) { return NewEmptyBlock(meta), nil } diff --git a/src/query/block/information.go b/src/query/block/info.go similarity index 80% rename from src/query/block/information.go rename to src/query/block/info.go index 151bf72417..9aa7c081ba 100644 --- a/src/query/block/information.go +++ b/src/query/block/info.go @@ -23,7 +23,7 @@ package block func (t BlockType) String() string { switch t { case BlockM3TSZCompressed: - return "M3TSZ_compressed" + return "compressed_m3tsz" case BlockDecompressed: return "decompressed" case BlockScalar: @@ -34,8 +34,10 @@ func (t BlockType) String() string { return "time" case BlockContainer: return "container" - case BlockWrapper: - return "wrapper" + case BlockEmpty: + return "empty" + case BlockMultiSeries: + return "multiseries" case BlockConsolidated: return "consolidated" } @@ -43,33 +45,33 @@ func (t BlockType) String() string { return "unknown" } -type BlockInformation struct { +type BlockInfo struct { blockType BlockType inner []BlockType } -func NewBlockInformation(blockType BlockType) BlockInformation { - return BlockInformation{blockType: blockType} +func NewBlockInfo(blockType BlockType) BlockInfo { + return BlockInfo{blockType: blockType} } -func NewWrappedBlockInformation( +func NewWrappedBlockInfo( blockType BlockType, - wrap BlockInformation, -) BlockInformation { + wrap BlockInfo, +) BlockInfo { inner := make([]BlockType, len(wrap.inner)+1) copy(inner[:1], wrap.inner) inner[0] = wrap.blockType - return BlockInformation{ + return BlockInfo{ blockType: blockType, inner: inner, } } -func (b BlockInformation) Type() BlockType { +func (b BlockInfo) Type() BlockType { return b.blockType } -func (b BlockInformation) InnerType() BlockType { +func (b BlockInfo) InnerType() BlockType { if b.inner == nil { return b.Type() } @@ -77,7 +79,7 @@ func (b BlockInformation) InnerType() BlockType { return b.inner[0] } -func (b BlockInformation) BaseType() BlockType { +func (b BlockInfo) BaseType() BlockType { if b.inner == nil { return b.Type() } diff --git a/src/query/block/lazy.go b/src/query/block/lazy.go index e860fe2277..0fff5547f9 100644 --- a/src/query/block/lazy.go +++ b/src/query/block/lazy.go @@ -39,8 +39,8 @@ func NewLazyBlock(block Block, opts LazyOptions) Block { } } -func (c *lazyBlock) Info() BlockInformation { - return NewWrappedBlockInformation(BlockLazy, c.block.Info()) +func (c *lazyBlock) Info() BlockInfo { + return NewWrappedBlockInfo(BlockLazy, c.block.Info()) } func (b *lazyBlock) Close() error { return b.block.Close() } diff --git a/src/query/block/lazy_test.go b/src/query/block/lazy_test.go index 0d0ae23268..cd3c8ccdd4 100644 --- a/src/query/block/lazy_test.go +++ b/src/query/block/lazy_test.go @@ -78,7 +78,7 @@ func TestValidOffset(t *testing.T) { offset := time.Minute off := NewLazyBlock(b, testLazyOpts(offset, 1.0)) - b.EXPECT().Info().Return(NewBlockInformation(BlockM3TSZCompressed)) + b.EXPECT().Info().Return(NewBlockInfo(BlockM3TSZCompressed)) info := off.Info() assert.Equal(t, BlockLazy, info.Type()) assert.Equal(t, BlockM3TSZCompressed, info.BaseType()) diff --git a/src/query/block/scalar.go b/src/query/block/scalar.go index 908fff4917..9be404c232 100644 --- a/src/query/block/scalar.go +++ b/src/query/block/scalar.go @@ -48,8 +48,8 @@ func NewScalar( } } -func (c *Scalar) Info() BlockInformation { - return NewBlockInformation(BlockScalar) +func (c *Scalar) Info() BlockInfo { + return NewBlockInfo(BlockScalar) } // Unconsolidated returns the unconsolidated version for the block. diff --git a/src/query/block/types.go b/src/query/block/types.go index db7750635d..d0f003a2ed 100644 --- a/src/query/block/types.go +++ b/src/query/block/types.go @@ -29,16 +29,33 @@ import ( "github.com/m3db/m3/src/query/ts" ) +// BlockType describes a block type. type BlockType uint8 const ( + // BlockM3TSZCompressed is an M3TSZ compressed block. BlockM3TSZCompressed BlockType = iota + // BlockDecompressed is a decompressed raw data block. BlockDecompressed + // BlockScalar is a scalar block with a single value throughout its range. BlockScalar - BlockLazy + // BlockTime is a block with datapoint values given by a function of their + // timestamps. BlockTime + // BlockLazy is a wrapper for an inner block that lazily applies transforms. + BlockLazy + // BlockContainer is a block that contains multiple inner blocks that share + // common metadata. BlockContainer - BlockWrapper + // BlockEmpty is a block with metadata but no series or values. + BlockEmpty + // + // TODO: (arnikola) do some refactoring to remove the blocks and types below, + // as they can be better handled by the above block types. + // + // BlockMultiSeries is a block containing series with common metadata. + BlockMultiSeries + // BlockConsolidated is a consolidated block. BlockConsolidated ) @@ -56,7 +73,7 @@ type Block interface { // WithMetadata returns a block with updated meta and series metadata. WithMetadata(Metadata, []SeriesMeta) (Block, error) // Info returns information about the block. - Info() BlockInformation + Info() BlockInfo } type AccumulatorBlock interface { @@ -168,11 +185,16 @@ type UnconsolidatedStep interface { // Builder builds a new block. type Builder interface { + // AddCols adds the given number of columns to the block. + AddCols(num int) error + // AppendValue adds a single value to the column at the given index. AppendValue(idx int, value float64) error + // AppendValues adds a slice of values to the column at the given index. AppendValues(idx int, values []float64) error - SetBlockType(blockType BlockType) + // Build builds the block. Build() Block - AddCols(num int) error + // BuildAsType builds the block, forcing it to the given BlockType. + BuildAsType(blockType BlockType) Block } // Result is the result from a block query. diff --git a/src/query/functions/aggregation/absent.go b/src/query/functions/aggregation/absent.go index 85a8fe8d07..c64666df14 100644 --- a/src/query/functions/aggregation/absent.go +++ b/src/query/functions/aggregation/absent.go @@ -23,7 +23,6 @@ package aggregation import ( "fmt" "math" - "time" "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/executor/transform" @@ -102,10 +101,7 @@ func (n *absentNode) ProcessBlock(queryCtx *models.QueryContext, // If no series in the input, return a scalar block with value 1. if len(seriesMetas) == 0 { - return block.NewScalar( - func(_ time.Time) float64 { return 1 }, - meta, - ), nil + return block.NewScalar(1, meta), nil } // NB: pull any common tags out into the created series. diff --git a/src/query/functions/binary/and.go b/src/query/functions/binary/and.go index 320b756698..f3fae21de7 100644 --- a/src/query/functions/binary/and.go +++ b/src/query/functions/binary/and.go @@ -37,8 +37,12 @@ func makeAndBlock( queryCtx *models.QueryContext, lIter, rIter block.StepIter, controller *transform.Controller, - matching *VectorMatching, + matching VectorMatching, ) (block.Block, error) { + if !matching.Set { + return nil, errNoMatching + } + lMeta, lSeriesMetas := lIter.Meta(), lIter.SeriesMeta() lMeta, lSeriesMetas = removeNameTags(lMeta, lSeriesMetas) @@ -100,7 +104,7 @@ func makeAndBlock( // intersect returns the slice of lhs indices matching rhs indices. func andIntersect( - matching *VectorMatching, + matching VectorMatching, lhs, rhs []block.SeriesMeta, ) ([]indexMatcher, []block.SeriesMeta) { idFunction := hashFunc(matching.On, matching.MatchingLabels...) diff --git a/src/query/functions/binary/and_test.go b/src/query/functions/binary/and_test.go index 6d397b7cb1..cd4b3a87cc 100644 --- a/src/query/functions/binary/and_test.go +++ b/src/query/functions/binary/and_test.go @@ -36,9 +36,8 @@ import ( "github.com/stretchr/testify/require" ) -func nilVectorMatcherBuilder(_, _ block.Block) *VectorMatching { return nil } -func emptyVectorMatcherBuilder(_, _ block.Block) *VectorMatching { - return &VectorMatching{} +func emptyVectorMatcherBuilder(_, _ block.Block) VectorMatching { + return VectorMatching{Set: true} } func TestAndWithExactValues(t *testing.T) { diff --git a/src/query/functions/binary/binary.go b/src/query/functions/binary/binary.go index 79bcda9c74..d9842f1222 100644 --- a/src/query/functions/binary/binary.go +++ b/src/query/functions/binary/binary.go @@ -111,7 +111,7 @@ func processBinary( // NB(arnikola): this is a sanity check, as functions between // two series missing vector matching should have previously // errored out during the parsing step. - if matcher == nil { + if !matcher.Set { return nil, errNoMatching } @@ -161,9 +161,13 @@ func processBothSeries( queryCtx *models.QueryContext, lIter, rIter block.StepIter, controller *transform.Controller, - matching *VectorMatching, + matching VectorMatching, fn binaryFunction, ) (block.Block, error) { + if !matching.Set { + return nil, errNoMatching + } + if lIter.StepCount() != rIter.StepCount() { return nil, errMismatchedStepCounts } @@ -222,7 +226,7 @@ func processBothSeries( // intersect returns the slice of lhs indices that are shared with rhs, // the indices of the corresponding rhs values, and the metas for taken indices. func intersect( - matching *VectorMatching, + matching VectorMatching, lhs, rhs []block.SeriesMeta, ) ([]int, []int, []block.SeriesMeta) { idFunction := hashFunc(matching.On, matching.MatchingLabels...) diff --git a/src/query/functions/binary/binary_test.go b/src/query/functions/binary/binary_test.go index 2e4de7a020..e470b0bc6b 100644 --- a/src/query/functions/binary/binary_test.go +++ b/src/query/functions/binary/binary_test.go @@ -106,7 +106,7 @@ func TestScalars(t *testing.T) { LNode: parser.NodeID(0), RNode: parser.NodeID(1), ReturnBool: true, - VectorMatcherBuilder: nilVectorMatcherBuilder, + VectorMatcherBuilder: emptyVectorMatcherBuilder, }, ) require.NoError(t, err) @@ -161,7 +161,7 @@ func TestScalarsReturnBoolFalse(t *testing.T) { LNode: parser.NodeID(0), RNode: parser.NodeID(1), ReturnBool: false, - VectorMatcherBuilder: nilVectorMatcherBuilder, + VectorMatcherBuilder: emptyVectorMatcherBuilder, }, ) require.NoError(t, err) @@ -549,7 +549,7 @@ func TestSingleSeriesReturnBool(t *testing.T) { LNode: parser.NodeID(0), RNode: parser.NodeID(1), ReturnBool: true, - VectorMatcherBuilder: nilVectorMatcherBuilder, + VectorMatcherBuilder: emptyVectorMatcherBuilder, }, ) require.NoError(t, err) @@ -617,7 +617,7 @@ func TestSingleSeriesReturnValues(t *testing.T) { LNode: parser.NodeID(0), RNode: parser.NodeID(1), ReturnBool: false, - VectorMatcherBuilder: nilVectorMatcherBuilder, + VectorMatcherBuilder: emptyVectorMatcherBuilder, }, ) diff --git a/src/query/functions/binary/common.go b/src/query/functions/binary/common.go index 43aa6bbaaa..46fad91603 100644 --- a/src/query/functions/binary/common.go +++ b/src/query/functions/binary/common.go @@ -78,30 +78,34 @@ func tagMap(t models.Tags) map[string]models.Tag { // Iff one of left or right is a time block, match match one to many // against it, and match everything. -func defaultVectorMatcherBuilder(lhs, rhs block.Block) *VectorMatching { +func defaultVectorMatcherBuilder(lhs, rhs block.Block) VectorMatching { left := lhs.Info().BaseType() == block.BlockTime right := rhs.Info().BaseType() == block.BlockTime + if left { if right { - return &VectorMatching{ + return VectorMatching{ + Set: true, Card: CardOneToOne, } } - return &VectorMatching{ + return VectorMatching{ + Set: true, Card: CardOneToMany, On: true, } } if right { - return &VectorMatching{ + return VectorMatching{ + Set: true, Card: CardManyToOne, On: true, } } - return nil + return VectorMatching{Set: false} } func combineMetaAndSeriesMeta( diff --git a/src/query/functions/binary/logical.go b/src/query/functions/binary/logical.go index 293c791683..ddbf45b1d3 100644 --- a/src/query/functions/binary/logical.go +++ b/src/query/functions/binary/logical.go @@ -30,7 +30,7 @@ type makeBlockFn func( queryCtx *models.QueryContext, lIter, rIter block.StepIter, controller *transform.Controller, - matching *VectorMatching, + matching VectorMatching, ) (block.Block, error) // Builds a logical processing function if able. If wrong opType supplied, diff --git a/src/query/functions/binary/or.go b/src/query/functions/binary/or.go index e45abb4069..739467b3a9 100644 --- a/src/query/functions/binary/or.go +++ b/src/query/functions/binary/or.go @@ -36,8 +36,12 @@ func makeOrBlock( queryCtx *models.QueryContext, lIter, rIter block.StepIter, controller *transform.Controller, - matching *VectorMatching, + matching VectorMatching, ) (block.Block, error) { + if !matching.Set { + return nil, errNoMatching + } + lMeta, lSeriesMetas := lIter.Meta(), lIter.SeriesMeta() lMeta, lSeriesMetas = removeNameTags(lMeta, lSeriesMetas) @@ -115,7 +119,7 @@ func makeOrBlock( // added after all lhs series have been added. // This function also combines the series metadatas for the entire block. func mergeIndices( - matching *VectorMatching, + matching VectorMatching, lhs, rhs []block.SeriesMeta, ) ([]int, []block.SeriesMeta) { idFunction := hashFunc(matching.On, matching.MatchingLabels...) diff --git a/src/query/functions/binary/or_test.go b/src/query/functions/binary/or_test.go index f08f3930af..2dd9b2113b 100644 --- a/src/query/functions/binary/or_test.go +++ b/src/query/functions/binary/or_test.go @@ -152,8 +152,7 @@ var indexMatchingTests = []struct { } func TestIndexMerging(t *testing.T) { - matching := &VectorMatching{} - + matching := VectorMatching{} for _, tt := range indexMatchingTests { t.Run(tt.name, func(t *testing.T) { matching, _ := mergeIndices(matching, tt.lhs, tt.rhs) diff --git a/src/query/functions/binary/types.go b/src/query/functions/binary/types.go index 7847bb6191..8f91102eaa 100644 --- a/src/query/functions/binary/types.go +++ b/src/query/functions/binary/types.go @@ -43,7 +43,9 @@ const ( // VectorMatching describes how elements from two Vectors in a binary // operation are supposed to be matched. type VectorMatching struct { - // The cardinality of the two Vectors. + // Set determines if this has been set or is using default values. + Set bool + // Card is the cardinality of the two Vectors. Card VectorMatchCardinality // MatchingLabels contains the labels which define equality of a pair of // elements from the Vectors. @@ -57,7 +59,7 @@ type VectorMatching struct { } // VectorMatcherBuilder creates a vector matcher based on incoming block types. -type VectorMatcherBuilder func(lhs, rhs block.Block) *VectorMatching +type VectorMatcherBuilder func(lhs, rhs block.Block) VectorMatching // NodeParams describes the types of nodes used for binary operations. type NodeParams struct { diff --git a/src/query/functions/binary/unless.go b/src/query/functions/binary/unless.go index 99cecf427b..62e3d80c3d 100644 --- a/src/query/functions/binary/unless.go +++ b/src/query/functions/binary/unless.go @@ -36,8 +36,12 @@ func makeUnlessBlock( queryCtx *models.QueryContext, lIter, rIter block.StepIter, controller *transform.Controller, - matching *VectorMatching, + matching VectorMatching, ) (block.Block, error) { + if !matching.Set { + return nil, errNoMatching + } + lMeta, lSeriesMetas := lIter.Meta(), lIter.SeriesMeta() lMeta, lSeriesMetas = removeNameTags(lMeta, lSeriesMetas) @@ -100,7 +104,7 @@ func makeUnlessBlock( // matchingIndices returns a slice representing which index in the lhs the rhs // series maps to. If it does not map to an existing index, this is set to -1. func matchingIndices( - matching *VectorMatching, + matching VectorMatching, lhs, rhs []block.SeriesMeta, ) []indexMatcher { idFunction := hashFunc(matching.On, matching.MatchingLabels...) diff --git a/src/query/functions/binary/unless_test.go b/src/query/functions/binary/unless_test.go index f2256316fe..4dbfd3ce74 100644 --- a/src/query/functions/binary/unless_test.go +++ b/src/query/functions/binary/unless_test.go @@ -105,7 +105,7 @@ var distinctLeftTests = []struct { } func TestMatchingIndices(t *testing.T) { - matching := &VectorMatching{} + matching := VectorMatching{} for _, tt := range distinctLeftTests { t.Run(tt.name, func(t *testing.T) { excluded := matchingIndices(matching, tt.lhs, tt.rhs) diff --git a/src/query/functions/scalar/time.go b/src/query/functions/scalar/time.go index 6d57eff1e7..ab16ded7e6 100644 --- a/src/query/functions/scalar/time.go +++ b/src/query/functions/scalar/time.go @@ -101,8 +101,7 @@ func (n *timeNode) Execute(queryCtx *models.QueryContext) error { } } - builder.SetBlockType(block.BlockTime) - block := builder.Build() + block := builder.BuildAsType(block.BlockTime) if n.opts.Debug() { // Ignore any errors iter, _ := block.StepIter() diff --git a/src/query/functions/scalar/time_test.go b/src/query/functions/scalar/time_test.go index 930db7909f..7cfc3bc640 100644 --- a/src/query/functions/scalar/time_test.go +++ b/src/query/functions/scalar/time_test.go @@ -25,7 +25,6 @@ import ( "time" "github.com/m3db/m3/src/query/block" - "github.com/m3db/m3/src/query/executor/transform" "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" diff --git a/src/query/parser/promql/types.go b/src/query/parser/promql/types.go index f5902c8c69..012b6eb8c7 100644 --- a/src/query/parser/promql/types.go +++ b/src/query/parser/promql/types.go @@ -24,7 +24,6 @@ import ( "fmt" "github.com/m3db/m3/src/query/block" - "github.com/m3db/m3/src/query/functions" "github.com/m3db/m3/src/query/functions/aggregation" "github.com/m3db/m3/src/query/functions/binary" @@ -406,8 +405,9 @@ func promMatchingToM3( byteMatchers[i] = []byte(label) } - return func(_, _ block.Block) *binary.VectorMatching { - return &binary.VectorMatching{ + return func(_, _ block.Block) binary.VectorMatching { + return binary.VectorMatching{ + Set: true, Card: promVectorCardinalityToM3(vectorMatching.Card), MatchingLabels: byteMatchers, On: vectorMatching.On, diff --git a/src/query/storage/block.go b/src/query/storage/block.go index 213fa48655..02bd24a5b6 100644 --- a/src/query/storage/block.go +++ b/src/query/storage/block.go @@ -64,9 +64,8 @@ type multiBlockWrapper struct { unconsolidated block.UnconsolidatedBlock } -func (m *multiBlockWrapper) Info() block.BlockInformation { - return block.NewWrappedBlockInformation(block.BlockWrapper, - m.consolidated.Info()) +func (m *multiBlockWrapper) Info() block.BlockInfo { + return block.NewBlockInfo(block.BlockMultiSeries) } func (m *multiBlockWrapper) Unconsolidated() (block.UnconsolidatedBlock, error) { diff --git a/src/query/storage/consolidated.go b/src/query/storage/consolidated.go index c8969ff272..e46449ec3a 100644 --- a/src/query/storage/consolidated.go +++ b/src/query/storage/consolidated.go @@ -31,8 +31,8 @@ type consolidatedBlock struct { consolidationFunc block.ConsolidationFunc } -func (c *consolidatedBlock) Info() block.BlockInformation { - return block.NewBlockInformation(block.BlockConsolidated) +func (c *consolidatedBlock) Info() block.BlockInfo { + return block.NewBlockInfo(block.BlockConsolidated) } func (c *consolidatedBlock) Unconsolidated() (block.UnconsolidatedBlock, error) { diff --git a/src/query/test/executor/transform.go b/src/query/test/executor/transform.go index eeb266eb72..2d2a66297b 100644 --- a/src/query/test/executor/transform.go +++ b/src/query/test/executor/transform.go @@ -46,7 +46,7 @@ type SinkNode struct { Values [][]float64 Meta block.Metadata Metas []block.SeriesMeta - Info block.BlockInformation + Info block.BlockInfo } // Process processes and stores the last block output in the sink node diff --git a/src/query/ts/m3db/encoded_block.go b/src/query/ts/m3db/encoded_block.go index cf1845696a..20bbeb36c7 100644 --- a/src/query/ts/m3db/encoded_block.go +++ b/src/query/ts/m3db/encoded_block.go @@ -141,8 +141,8 @@ func (b *encodedBlock) generateMetas() error { return nil } -func (b *encodedBlock) Info() block.BlockInformation { - return block.NewBlockInformation(block.BlockM3TSZCompressed) +func (b *encodedBlock) Info() block.BlockInfo { + return block.NewBlockInfo(block.BlockM3TSZCompressed) } func (b *encodedBlock) WithMetadata(