From f04a2e16c3e513c1aef02bee21e9609e0bc1b715 Mon Sep 17 00:00:00 2001 From: arnikola Date: Thu, 22 Aug 2019 18:19:28 -0400 Subject: [PATCH] [query] Fix time() function in binary comparisons (#1888) --- src/query/block/accounted.go | 11 +- src/query/block/accounted_test.go | 7 +- src/query/block/block_mock.go | 28 ++++ src/query/block/column.go | 15 +- src/query/block/column_test.go | 45 ++++++ src/query/block/container.go | 4 + src/query/block/empty.go | 4 + src/query/block/info.go | 88 +++++++++++ src/query/block/lazy.go | 4 + src/query/block/lazy_test.go | 5 + src/query/block/scalar.go | 38 +++-- src/query/block/scalar_test.go | 3 +- src/query/block/types.go | 40 ++++- src/query/functions/aggregation/absent.go | 6 +- src/query/functions/binary/and.go | 10 +- src/query/functions/binary/and_test.go | 22 +-- src/query/functions/binary/arithmetic.go | 2 +- src/query/functions/binary/base.go | 16 +- src/query/functions/binary/binary.go | 39 +++-- src/query/functions/binary/binary_test.go | 146 +++++++----------- src/query/functions/binary/common.go | 67 ++++---- src/query/functions/binary/comparison.go | 2 +- src/query/functions/binary/logical.go | 7 +- src/query/functions/binary/or.go | 10 +- src/query/functions/binary/or_test.go | 33 ++-- src/query/functions/binary/types.go | 69 +++++++++ src/query/functions/binary/unless.go | 10 +- src/query/functions/binary/unless_test.go | 8 +- .../functions/scalar/{base.go => scalar.go} | 49 +++--- .../scalar/{base_test.go => scalar_test.go} | 25 +-- src/query/functions/scalar/time.go | 122 +++++++++++++++ src/query/functions/scalar/time_test.go | 65 ++++++++ src/query/parser/promql/parse.go | 7 +- src/query/parser/promql/types.go | 43 ++---- src/query/storage/block.go | 4 + src/query/storage/consolidated.go | 4 + src/query/storage/consolidated_test.go | 7 +- src/query/test/executor/transform.go | 2 + src/query/ts/m3db/encoded_block.go | 4 + .../ts/m3db/encoded_step_iterator_test.go | 6 +- 40 files changed, 768 insertions(+), 309 deletions(-) create mode 100644 src/query/block/column_test.go create mode 100644 src/query/block/info.go create mode 100644 src/query/functions/binary/types.go rename src/query/functions/scalar/{base.go => scalar.go} (75%) rename src/query/functions/scalar/{base_test.go => scalar_test.go} (82%) create mode 100644 src/query/functions/scalar/time.go create mode 100644 src/query/functions/scalar/time_test.go diff --git a/src/query/block/accounted.go b/src/query/block/accounted.go index 3538b7812c..3dc2b45785 100644 --- a/src/query/block/accounted.go +++ b/src/query/block/accounted.go @@ -22,7 +22,8 @@ package block import "github.com/m3db/m3/src/query/cost" -// AccountedBlock is a wrapper for a block which enforces limits on the number of datapoints used by the block. +// AccountedBlock is a wrapper for a block which enforces limits on the number +// of datapoints used by the block. type AccountedBlock struct { Block @@ -30,14 +31,18 @@ type AccountedBlock struct { } // NewAccountedBlock wraps the given block and enforces datapoint limits. -func NewAccountedBlock(wrapped Block, enforcer cost.ChainedEnforcer) *AccountedBlock { +func NewAccountedBlock( + wrapped Block, + enforcer cost.ChainedEnforcer, +) *AccountedBlock { return &AccountedBlock{ Block: wrapped, enforcer: enforcer, } } -// Close closes the block, and marks the number of datapoints used by this block as finished. +// Close closes the block, and marks the number of datapoints used +// by this block as finished. func (ab *AccountedBlock) Close() error { ab.enforcer.Close() return ab.Block.Close() diff --git a/src/query/block/accounted_test.go b/src/query/block/accounted_test.go index 8172e341a9..b5858ef210 100644 --- a/src/query/block/accounted_test.go +++ b/src/query/block/accounted_test.go @@ -26,6 +26,7 @@ import ( "github.com/m3db/m3/src/query/cost" "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" ) func TestAccountedBlock_Close(t *testing.T) { @@ -37,5 +38,9 @@ func TestAccountedBlock_Close(t *testing.T) { mockEnforcer := cost.NewMockChainedEnforcer(ctrl) mockEnforcer.EXPECT().Close() - NewAccountedBlock(wrapped, mockEnforcer).Close() + block := NewAccountedBlock(wrapped, mockEnforcer) + + wrapped.EXPECT().Info().Return(NewBlockInfo(BlockM3TSZCompressed)) + assert.Equal(t, BlockM3TSZCompressed, block.Info().Type()) + assert.NotPanics(t, func() { block.Close() }) } diff --git a/src/query/block/block_mock.go b/src/query/block/block_mock.go index 73831d5296..09c80043e6 100644 --- a/src/query/block/block_mock.go +++ b/src/query/block/block_mock.go @@ -70,6 +70,20 @@ func (mr *MockBlockMockRecorder) Close() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockBlock)(nil).Close)) } +// Info mocks base method +func (m *MockBlock) Info() BlockInfo { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Info") + ret0, _ := ret[0].(BlockInfo) + return ret0 +} + +// Info indicates an expected call of Info +func (mr *MockBlockMockRecorder) Info() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Info", reflect.TypeOf((*MockBlock)(nil).Info)) +} + // SeriesIter mocks base method func (m *MockBlock) SeriesIter() (SeriesIter, error) { m.ctrl.T.Helper() @@ -447,6 +461,20 @@ func (mr *MockBuilderMockRecorder) Build() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Build", reflect.TypeOf((*MockBuilder)(nil).Build)) } +// BuildAsType mocks base method +func (m *MockBuilder) BuildAsType(arg0 BlockType) Block { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "BuildAsType", arg0) + ret0, _ := ret[0].(Block) + return ret0 +} + +// BuildAsType indicates an expected call of BuildAsType +func (mr *MockBuilderMockRecorder) BuildAsType(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BuildAsType", reflect.TypeOf((*MockBuilder)(nil).BuildAsType), arg0) +} + // MockStep is a mock of Step interface type MockStep struct { ctrl *gomock.Controller diff --git a/src/query/block/column.go b/src/query/block/column.go index 9b38da5c5d..3764a72d4d 100644 --- a/src/query/block/column.go +++ b/src/query/block/column.go @@ -39,6 +39,7 @@ type ColumnBlockBuilder struct { } type columnBlock struct { + blockType BlockType columns []column meta Metadata seriesMeta []SeriesMeta @@ -78,6 +79,7 @@ func (c *columnBlock) WithMetadata( columns: c.columns, meta: meta, seriesMeta: seriesMetas, + blockType: BlockDecompressed, }, nil } @@ -90,6 +92,10 @@ func (c *columnBlock) StepCount() int { return len(c.columns) } +func (c *columnBlock) Info() BlockInfo { + return NewBlockInfo(c.blockType) +} + // Close frees up any resources // TODO: actually free up the resources func (c *columnBlock) Close() error { @@ -182,6 +188,7 @@ func NewColumnBlockBuilder( block: &columnBlock{ meta: meta, seriesMeta: seriesMeta, + blockType: BlockDecompressed, }, } } @@ -222,19 +229,21 @@ func (cb ColumnBlockBuilder) AppendValues(idx int, values []float64) error { return nil } -// AddCols adds new columns func (cb ColumnBlockBuilder) AddCols(num int) error { newCols := make([]column, num) cb.block.columns = append(cb.block.columns, newCols...) return nil } -// Build extracts the block -// TODO: Return an immutable copy func (cb ColumnBlockBuilder) Build() Block { return NewAccountedBlock(cb.block, cb.enforcer) } +func (cb ColumnBlockBuilder) BuildAsType(blockType BlockType) Block { + cb.block.blockType = blockType + return NewAccountedBlock(cb.block, cb.enforcer) +} + type column struct { Values []float64 } diff --git a/src/query/block/column_test.go b/src/query/block/column_test.go new file mode 100644 index 0000000000..d2f86341af --- /dev/null +++ b/src/query/block/column_test.go @@ -0,0 +1,45 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package block + +import ( + "context" + "testing" + + "github.com/m3db/m3/src/query/cost" + "github.com/m3db/m3/src/query/models" + + "github.com/stretchr/testify/assert" + "github.com/uber-go/tally" +) + +func TestColumnBuilderInfoTypes(t *testing.T) { + ctx := models.NewQueryContext(context.Background(), + tally.NoopScope, cost.NoopChainedEnforcer(), + models.QueryContextOptions{}) + + builder := NewColumnBlockBuilder(ctx, Metadata{}, []SeriesMeta{}) + block := builder.Build() + assert.Equal(t, BlockDecompressed, block.Info().blockType) + + block = builder.BuildAsType(BlockScalar) + assert.Equal(t, BlockScalar, block.Info().blockType) +} diff --git a/src/query/block/container.go b/src/query/block/container.go index 290b6854ba..3dd37627f2 100644 --- a/src/query/block/container.go +++ b/src/query/block/container.go @@ -58,6 +58,10 @@ func (b *containerBlock) AddBlock(bl Block) error { return nil } +func (c *containerBlock) Info() BlockInfo { + return NewBlockInfo(BlockContainer) +} + func (b *containerBlock) Close() error { multiErr := xerrors.NewMultiError() multiErr = multiErr.Add(b.err) diff --git a/src/query/block/empty.go b/src/query/block/empty.go index 92de064156..e4033d08ae 100644 --- a/src/query/block/empty.go +++ b/src/query/block/empty.go @@ -31,6 +31,10 @@ func NewEmptyBlock(meta Metadata) Block { func (b *emptyBlock) Close() error { return nil } +func (b *emptyBlock) Info() BlockInfo { + return NewBlockInfo(BlockEmpty) +} + func (b *emptyBlock) WithMetadata(meta Metadata, _ []SeriesMeta) (Block, error) { return NewEmptyBlock(meta), nil } diff --git a/src/query/block/info.go b/src/query/block/info.go new file mode 100644 index 0000000000..9aa7c081ba --- /dev/null +++ b/src/query/block/info.go @@ -0,0 +1,88 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package block + +func (t BlockType) String() string { + switch t { + case BlockM3TSZCompressed: + return "compressed_m3tsz" + case BlockDecompressed: + return "decompressed" + case BlockScalar: + return "scalar" + case BlockLazy: + return "lazy" + case BlockTime: + return "time" + case BlockContainer: + return "container" + case BlockEmpty: + return "empty" + case BlockMultiSeries: + return "multiseries" + case BlockConsolidated: + return "consolidated" + } + + return "unknown" +} + +type BlockInfo struct { + blockType BlockType + inner []BlockType +} + +func NewBlockInfo(blockType BlockType) BlockInfo { + return BlockInfo{blockType: blockType} +} + +func NewWrappedBlockInfo( + blockType BlockType, + wrap BlockInfo, +) BlockInfo { + inner := make([]BlockType, len(wrap.inner)+1) + copy(inner[:1], wrap.inner) + inner[0] = wrap.blockType + return BlockInfo{ + blockType: blockType, + inner: inner, + } +} + +func (b BlockInfo) Type() BlockType { + return b.blockType +} + +func (b BlockInfo) InnerType() BlockType { + if b.inner == nil { + return b.Type() + } + + return b.inner[0] +} + +func (b BlockInfo) BaseType() BlockType { + if b.inner == nil { + return b.Type() + } + + return b.inner[len(b.inner)-1] +} diff --git a/src/query/block/lazy.go b/src/query/block/lazy.go index 0797cd7be4..0fff5547f9 100644 --- a/src/query/block/lazy.go +++ b/src/query/block/lazy.go @@ -39,6 +39,10 @@ func NewLazyBlock(block Block, opts LazyOptions) Block { } } +func (c *lazyBlock) Info() BlockInfo { + return NewWrappedBlockInfo(BlockLazy, c.block.Info()) +} + func (b *lazyBlock) Close() error { return b.block.Close() } func (b *lazyBlock) WithMetadata( diff --git a/src/query/block/lazy_test.go b/src/query/block/lazy_test.go index 0701c23c7b..cd3c8ccdd4 100644 --- a/src/query/block/lazy_test.go +++ b/src/query/block/lazy_test.go @@ -78,6 +78,11 @@ func TestValidOffset(t *testing.T) { offset := time.Minute off := NewLazyBlock(b, testLazyOpts(offset, 1.0)) + b.EXPECT().Info().Return(NewBlockInfo(BlockM3TSZCompressed)) + info := off.Info() + assert.Equal(t, BlockLazy, info.Type()) + assert.Equal(t, BlockM3TSZCompressed, info.BaseType()) + // ensure functions are marshalled to the underlying block. b.EXPECT().Close().Return(nil) err := off.Close() diff --git a/src/query/block/scalar.go b/src/query/block/scalar.go index d2c8ee1678..9be404c232 100644 --- a/src/query/block/scalar.go +++ b/src/query/block/scalar.go @@ -32,61 +32,61 @@ import ( // by allowing them to treat this as a regular block, while at the same time // having an option to optimize by accessing the scalar value directly instead. type Scalar struct { - s ScalarFunc + val float64 meta Metadata } // NewScalar creates a scalar block whose value is given by the function over // the metadata bounds. func NewScalar( - s ScalarFunc, + val float64, meta Metadata, ) Block { return &Scalar{ - s: s, + val: val, meta: meta, } } +func (c *Scalar) Info() BlockInfo { + return NewBlockInfo(BlockScalar) +} + // Unconsolidated returns the unconsolidated version for the block. func (b *Scalar) Unconsolidated() (UnconsolidatedBlock, error) { - return nil, fmt.Errorf("unconsolidated view not implemented for scalar block, meta: %s", b.meta) + return nil, + fmt.Errorf("unconsolidated view not implemented for scalar block, meta: %s", + b.meta) } -// WithMetadata updates this blocks metadata, and the metadatas for each series. func (b *Scalar) WithMetadata( meta Metadata, _ []SeriesMeta, ) (Block, error) { return &Scalar{ meta: meta, - s: b.s, + val: b.val, }, nil } -// StepIter returns a StepIterator func (b *Scalar) StepIter() (StepIter, error) { bounds := b.meta.Bounds steps := bounds.Steps() return &scalarStepIter{ meta: b.meta, - s: b.s, + vals: []float64{b.val}, numVals: steps, idx: -1, }, nil } -// ScalarFunc determines the function to apply to generate the value at each step -type ScalarFunc func(t time.Time) float64 - -// SeriesIter returns a SeriesIterator func (b *Scalar) SeriesIter() (SeriesIter, error) { bounds := b.meta.Bounds steps := bounds.Steps() vals := make([]float64, steps) t := bounds.Start for i := range vals { - vals[i] = b.s(t) + vals[i] = b.val t = t.Add(bounds.StepSize) } @@ -97,12 +97,10 @@ func (b *Scalar) SeriesIter() (SeriesIter, error) { }, nil } -// Close closes the scalar block func (b *Scalar) Close() error { return nil } -// Value returns the value for the scalar block -func (b *Scalar) Value(t time.Time) float64 { - return b.s(t) +func (b *Scalar) Value() float64 { + return b.val } type scalarStepIter struct { @@ -110,10 +108,10 @@ type scalarStepIter struct { stepTime time.Time err error meta Metadata - s ScalarFunc + vals []float64 } -// build an empty SeriesMeta +// build an empty SeriesMetadata. func buildSeriesMeta(meta Metadata) SeriesMeta { return SeriesMeta{ Tags: models.NewTags(0, meta.Tags.Opts), @@ -150,7 +148,7 @@ func (it *scalarStepIter) Next() bool { func (it *scalarStepIter) Current() Step { t := it.stepTime return &scalarStep{ - vals: []float64{it.s(t)}, + vals: it.vals, time: t, } } diff --git a/src/query/block/scalar_test.go b/src/query/block/scalar_test.go index c6ddb4a745..73c55eabbd 100644 --- a/src/query/block/scalar_test.go +++ b/src/query/block/scalar_test.go @@ -47,13 +47,14 @@ func TestScalarBlock(t *testing.T) { }) block := NewScalar( - func(_ time.Time) float64 { return val }, + val, Metadata{ Bounds: bounds, Tags: tags, }, ) + assert.Equal(t, BlockScalar, block.Info().Type()) require.IsType(t, block, &Scalar{}) stepIter, err := block.StepIter() require.NoError(t, err) diff --git a/src/query/block/types.go b/src/query/block/types.go index 2fea02b39a..d0f003a2ed 100644 --- a/src/query/block/types.go +++ b/src/query/block/types.go @@ -29,6 +29,36 @@ import ( "github.com/m3db/m3/src/query/ts" ) +// BlockType describes a block type. +type BlockType uint8 + +const ( + // BlockM3TSZCompressed is an M3TSZ compressed block. + BlockM3TSZCompressed BlockType = iota + // BlockDecompressed is a decompressed raw data block. + BlockDecompressed + // BlockScalar is a scalar block with a single value throughout its range. + BlockScalar + // BlockTime is a block with datapoint values given by a function of their + // timestamps. + BlockTime + // BlockLazy is a wrapper for an inner block that lazily applies transforms. + BlockLazy + // BlockContainer is a block that contains multiple inner blocks that share + // common metadata. + BlockContainer + // BlockEmpty is a block with metadata but no series or values. + BlockEmpty + // + // TODO: (arnikola) do some refactoring to remove the blocks and types below, + // as they can be better handled by the above block types. + // + // BlockMultiSeries is a block containing series with common metadata. + BlockMultiSeries + // BlockConsolidated is a consolidated block. + BlockConsolidated +) + // Block represents a group of series across a time bound. type Block interface { io.Closer @@ -42,6 +72,8 @@ type Block interface { SeriesIter() (SeriesIter, error) // WithMetadata returns a block with updated meta and series metadata. WithMetadata(Metadata, []SeriesMeta) (Block, error) + // Info returns information about the block. + Info() BlockInfo } type AccumulatorBlock interface { @@ -153,10 +185,16 @@ type UnconsolidatedStep interface { // Builder builds a new block. type Builder interface { + // AddCols adds the given number of columns to the block. + AddCols(num int) error + // AppendValue adds a single value to the column at the given index. AppendValue(idx int, value float64) error + // AppendValues adds a slice of values to the column at the given index. AppendValues(idx int, values []float64) error + // Build builds the block. Build() Block - AddCols(num int) error + // BuildAsType builds the block, forcing it to the given BlockType. + BuildAsType(blockType BlockType) Block } // Result is the result from a block query. diff --git a/src/query/functions/aggregation/absent.go b/src/query/functions/aggregation/absent.go index 85a8fe8d07..c64666df14 100644 --- a/src/query/functions/aggregation/absent.go +++ b/src/query/functions/aggregation/absent.go @@ -23,7 +23,6 @@ package aggregation import ( "fmt" "math" - "time" "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/executor/transform" @@ -102,10 +101,7 @@ func (n *absentNode) ProcessBlock(queryCtx *models.QueryContext, // If no series in the input, return a scalar block with value 1. if len(seriesMetas) == 0 { - return block.NewScalar( - func(_ time.Time) float64 { return 1 }, - meta, - ), nil + return block.NewScalar(1, meta), nil } // NB: pull any common tags out into the created series. diff --git a/src/query/functions/binary/and.go b/src/query/functions/binary/and.go index c2b77c2eef..f3fae21de7 100644 --- a/src/query/functions/binary/and.go +++ b/src/query/functions/binary/and.go @@ -37,8 +37,12 @@ func makeAndBlock( queryCtx *models.QueryContext, lIter, rIter block.StepIter, controller *transform.Controller, - matching *VectorMatching, + matching VectorMatching, ) (block.Block, error) { + if !matching.Set { + return nil, errNoMatching + } + lMeta, lSeriesMetas := lIter.Meta(), lIter.SeriesMeta() lMeta, lSeriesMetas = removeNameTags(lMeta, lSeriesMetas) @@ -100,10 +104,10 @@ func makeAndBlock( // intersect returns the slice of lhs indices matching rhs indices. func andIntersect( - matching *VectorMatching, + matching VectorMatching, lhs, rhs []block.SeriesMeta, ) ([]indexMatcher, []block.SeriesMeta) { - idFunction := HashFunc(matching.On, matching.MatchingLabels...) + idFunction := hashFunc(matching.On, matching.MatchingLabels...) // The set of signatures for the right-hand side. rightSigs := make(map[uint64]int, len(rhs)) for idx, meta := range rhs { diff --git a/src/query/functions/binary/and_test.go b/src/query/functions/binary/and_test.go index 702d78d8f6..cd4b3a87cc 100644 --- a/src/query/functions/binary/and_test.go +++ b/src/query/functions/binary/and_test.go @@ -36,6 +36,10 @@ import ( "github.com/stretchr/testify/require" ) +func emptyVectorMatcherBuilder(_, _ block.Block) VectorMatching { + return VectorMatching{Set: true} +} + func TestAndWithExactValues(t *testing.T) { values, bounds := test.GenerateValuesAndBounds(nil, nil) block1 := test.NewBlockFromValues(bounds, values) @@ -44,9 +48,9 @@ func TestAndWithExactValues(t *testing.T) { op, err := NewOp( AndType, NodeParams{ - LNode: parser.NodeID(0), - RNode: parser.NodeID(1), - VectorMatching: &VectorMatching{}, + LNode: parser.NodeID(0), + RNode: parser.NodeID(1), + VectorMatcherBuilder: emptyVectorMatcherBuilder, }, ) require.NoError(t, err) @@ -76,9 +80,9 @@ func TestAndWithSomeValues(t *testing.T) { op, err := NewOp( AndType, NodeParams{ - LNode: parser.NodeID(0), - RNode: parser.NodeID(1), - VectorMatching: &VectorMatching{}, + LNode: parser.NodeID(0), + RNode: parser.NodeID(1), + VectorMatcherBuilder: emptyVectorMatcherBuilder, }, ) require.NoError(t, err) @@ -196,9 +200,9 @@ func TestAnd(t *testing.T) { op, err := NewOp( AndType, NodeParams{ - LNode: parser.NodeID(0), - RNode: parser.NodeID(1), - VectorMatching: &VectorMatching{}, + LNode: parser.NodeID(0), + RNode: parser.NodeID(1), + VectorMatcherBuilder: emptyVectorMatcherBuilder, }, ) require.NoError(t, err) diff --git a/src/query/functions/binary/arithmetic.go b/src/query/functions/binary/arithmetic.go index cfd4cb7f0d..53ece86588 100644 --- a/src/query/functions/binary/arithmetic.go +++ b/src/query/functions/binary/arithmetic.go @@ -60,7 +60,7 @@ const ( ) var ( - arithmeticFuncs = map[string]Function{ + arithmeticFuncs = map[string]binaryFunction{ PlusType: func(x, y float64) float64 { return x + y }, MinusType: func(x, y float64) float64 { return x - y }, MultiplyType: func(x, y float64) float64 { return x * y }, diff --git a/src/query/functions/binary/base.go b/src/query/functions/binary/base.go index 9d600d9920..eb79a24fed 100644 --- a/src/query/functions/binary/base.go +++ b/src/query/functions/binary/base.go @@ -37,14 +37,6 @@ type baseOp struct { params NodeParams } -// NodeParams describes the types of nodes used for binary operations. -type NodeParams struct { - LNode, RNode parser.NodeID - LIsScalar, RIsScalar bool - ReturnBool bool - VectorMatching *VectorMatching -} - // OpType for the operator. func (o baseOp) OpType() string { return o.OperatorType @@ -69,7 +61,7 @@ func (o baseOp) Node( } // ArithmeticFunction returns the arithmetic function for this operation type. -func ArithmeticFunction(opType string, returnBool bool) (Function, error) { +func ArithmeticFunction(opType string, returnBool bool) (binaryFunction, error) { if fn, ok := arithmeticFuncs[opType]; ok { return fn, nil } @@ -84,7 +76,7 @@ func ArithmeticFunction(opType string, returnBool bool) (Function, error) { return fn, nil } - return nil, errNoMatching + return nil, fmt.Errorf("no arithmetic function found for type: %s", opType) } // NewOp creates a new binary operation. @@ -92,6 +84,10 @@ func NewOp( opType string, params NodeParams, ) (parser.Params, error) { + if params.VectorMatcherBuilder == nil { + params.VectorMatcherBuilder = defaultVectorMatcherBuilder + } + fn, ok := buildLogicalFunction(opType, params) if !ok { fn, ok = buildArithmeticFunction(opType, params) diff --git a/src/query/functions/binary/binary.go b/src/query/functions/binary/binary.go index d13d584bfc..d9842f1222 100644 --- a/src/query/functions/binary/binary.go +++ b/src/query/functions/binary/binary.go @@ -21,16 +21,13 @@ package binary import ( - "time" - "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/executor/transform" "github.com/m3db/m3/src/query/functions/utils" "github.com/m3db/m3/src/query/models" ) -// Function is a function that applies on two floats. -type Function func(x, y float64) float64 +type binaryFunction func(x, y float64) float64 type singleScalarFunc func(x float64) float64 // processes two logical blocks, performing a logical operation on them. @@ -40,23 +37,22 @@ func processBinary( params NodeParams, controller *transform.Controller, isComparison bool, - fn Function, + fn binaryFunction, ) (block.Block, error) { lIter, err := lhs.StepIter() if err != nil { return nil, err } - if params.LIsScalar { + if lhs.Info().Type() == block.BlockScalar { scalarL, ok := lhs.(*block.Scalar) if !ok { return nil, errLeftScalar } - lVal := scalarL.Value(time.Time{}) - + lVal := scalarL.Value() // rhs is a series; use rhs metadata and series meta - if !params.RIsScalar { + if rhs.Info().Type() != block.BlockScalar { return processSingleBlock( queryCtx, rhs, @@ -82,20 +78,18 @@ func processBinary( } return block.NewScalar( - func(t time.Time) float64 { - return fn(lVal, scalarR.Value(t)) - }, + fn(lVal, scalarR.Value()), lIter.Meta(), ), nil } - if params.RIsScalar { + if rhs.Info().Type() == block.BlockScalar { scalarR, ok := rhs.(*block.Scalar) if !ok { return nil, errRightScalar } - rVal := scalarR.Value(time.Time{}) + rVal := scalarR.Value() // lhs is a series; use lhs metadata and series meta. return processSingleBlock( queryCtx, @@ -113,14 +107,15 @@ func processBinary( return nil, err } + matcher := params.VectorMatcherBuilder(lhs, rhs) // NB(arnikola): this is a sanity check, as functions between // two series missing vector matching should have previously // errored out during the parsing step. - if params.VectorMatching == nil { + if !matcher.Set { return nil, errNoMatching } - return processBothSeries(queryCtx, lIter, rIter, controller, params.VectorMatching, fn) + return processBothSeries(queryCtx, lIter, rIter, controller, matcher, fn) } func processSingleBlock( @@ -166,9 +161,13 @@ func processBothSeries( queryCtx *models.QueryContext, lIter, rIter block.StepIter, controller *transform.Controller, - matching *VectorMatching, - fn Function, + matching VectorMatching, + fn binaryFunction, ) (block.Block, error) { + if !matching.Set { + return nil, errNoMatching + } + if lIter.StepCount() != rIter.StepCount() { return nil, errMismatchedStepCounts } @@ -227,10 +226,10 @@ func processBothSeries( // intersect returns the slice of lhs indices that are shared with rhs, // the indices of the corresponding rhs values, and the metas for taken indices. func intersect( - matching *VectorMatching, + matching VectorMatching, lhs, rhs []block.SeriesMeta, ) ([]int, []int, []block.SeriesMeta) { - idFunction := HashFunc(matching.On, matching.MatchingLabels...) + idFunction := hashFunc(matching.On, matching.MatchingLabels...) // The set of signatures for the right-hand side. rightSigs := make(map[uint64]int, len(rhs)) for idx, meta := range rhs { diff --git a/src/query/functions/binary/binary_test.go b/src/query/functions/binary/binary_test.go index 609942a5ef..e470b0bc6b 100644 --- a/src/query/functions/binary/binary_test.go +++ b/src/query/functions/binary/binary_test.go @@ -103,12 +103,10 @@ func TestScalars(t *testing.T) { op, err := NewOp( tt.opType, NodeParams{ - LNode: parser.NodeID(0), - RNode: parser.NodeID(1), - LIsScalar: true, - RIsScalar: true, - ReturnBool: true, - VectorMatching: nil, + LNode: parser.NodeID(0), + RNode: parser.NodeID(1), + ReturnBool: true, + VectorMatcherBuilder: emptyVectorMatcherBuilder, }, ) require.NoError(t, err) @@ -119,26 +117,20 @@ func TestScalars(t *testing.T) { err = node.Process( models.NoopQueryContext(), parser.NodeID(0), - block.NewScalar( - func(_ time.Time) float64 { return tt.lVal }, - block.Metadata{ - Bounds: bounds, - Tags: models.EmptyTags(), - }, - ), + block.NewScalar(tt.lVal, block.Metadata{ + Bounds: bounds, + Tags: models.EmptyTags(), + }), ) require.NoError(t, err) err = node.Process( models.NoopQueryContext(), parser.NodeID(1), - block.NewScalar( - func(_ time.Time) float64 { return tt.rVal }, - block.Metadata{ - Bounds: bounds, - Tags: models.EmptyTags(), - }, - ), + block.NewScalar(tt.rVal, block.Metadata{ + Bounds: bounds, + Tags: models.EmptyTags(), + }), ) expected := [][]float64{{ @@ -166,12 +158,10 @@ func TestScalarsReturnBoolFalse(t *testing.T) { op, err := NewOp( tt.opType, NodeParams{ - LNode: parser.NodeID(0), - RNode: parser.NodeID(1), - LIsScalar: true, - RIsScalar: true, - ReturnBool: false, - VectorMatching: nil, + LNode: parser.NodeID(0), + RNode: parser.NodeID(1), + ReturnBool: false, + VectorMatcherBuilder: emptyVectorMatcherBuilder, }, ) require.NoError(t, err) @@ -182,26 +172,20 @@ func TestScalarsReturnBoolFalse(t *testing.T) { err = node.Process( models.NoopQueryContext(), parser.NodeID(0), - block.NewScalar( - func(_ time.Time) float64 { return tt.lVal }, - block.Metadata{ - Bounds: bounds, - Tags: models.EmptyTags(), - }, - ), + block.NewScalar(tt.lVal, block.Metadata{ + Bounds: bounds, + Tags: models.EmptyTags(), + }), ) require.NoError(t, err) err = node.Process( models.NoopQueryContext(), parser.NodeID(1), - block.NewScalar( - func(_ time.Time) float64 { return tt.rVal }, - block.Metadata{ - Bounds: bounds, - Tags: models.EmptyTags(), - }, - ), + block.NewScalar(tt.rVal, block.Metadata{ + Bounds: bounds, + Tags: models.EmptyTags(), + }), ) if tt.opType == EqType || tt.opType == NotEqType || @@ -562,12 +546,10 @@ func TestSingleSeriesReturnBool(t *testing.T) { op, err := NewOp( tt.opType, NodeParams{ - LNode: parser.NodeID(0), - RNode: parser.NodeID(1), - LIsScalar: !tt.seriesLeft, - RIsScalar: tt.seriesLeft, - ReturnBool: true, - VectorMatching: nil, + LNode: parser.NodeID(0), + RNode: parser.NodeID(1), + ReturnBool: true, + VectorMatcherBuilder: emptyVectorMatcherBuilder, }, ) require.NoError(t, err) @@ -592,13 +574,10 @@ func TestSingleSeriesReturnBool(t *testing.T) { err = node.Process( models.NoopQueryContext(), parser.NodeID(1), - block.NewScalar( - func(_ time.Time) float64 { return tt.scalarVal }, - block.Metadata{ - Bounds: bounds, - Tags: models.EmptyTags(), - }, - ), + block.NewScalar(tt.scalarVal, block.Metadata{ + Bounds: bounds, + Tags: models.EmptyTags(), + }), ) require.NoError(t, err) @@ -606,13 +585,10 @@ func TestSingleSeriesReturnBool(t *testing.T) { err = node.Process( models.NoopQueryContext(), parser.NodeID(0), - block.NewScalar( - func(_ time.Time) float64 { return tt.scalarVal }, - block.Metadata{ - Bounds: bounds, - Tags: models.EmptyTags(), - }, - ), + block.NewScalar(tt.scalarVal, block.Metadata{ + Bounds: bounds, + Tags: models.EmptyTags(), + }), ) require.NoError(t, err) @@ -638,12 +614,10 @@ func TestSingleSeriesReturnValues(t *testing.T) { op, err := NewOp( tt.opType, NodeParams{ - LNode: parser.NodeID(0), - RNode: parser.NodeID(1), - LIsScalar: !tt.seriesLeft, - RIsScalar: tt.seriesLeft, - ReturnBool: false, - VectorMatching: nil, + LNode: parser.NodeID(0), + RNode: parser.NodeID(1), + ReturnBool: false, + VectorMatcherBuilder: emptyVectorMatcherBuilder, }, ) @@ -668,13 +642,10 @@ func TestSingleSeriesReturnValues(t *testing.T) { err = node.Process( models.NoopQueryContext(), parser.NodeID(1), - block.NewScalar( - func(_ time.Time) float64 { return tt.scalarVal }, - block.Metadata{ - Bounds: bounds, - Tags: models.EmptyTags(), - }, - ), + block.NewScalar(tt.scalarVal, block.Metadata{ + Bounds: bounds, + Tags: models.EmptyTags(), + }), ) require.NoError(t, err) @@ -682,13 +653,10 @@ func TestSingleSeriesReturnValues(t *testing.T) { err = node.Process( models.NoopQueryContext(), parser.NodeID(0), - block.NewScalar( - func(_ time.Time) float64 { return tt.scalarVal }, - block.Metadata{ - Bounds: bounds, - Tags: models.EmptyTags(), - }, - ), + block.NewScalar(tt.scalarVal, block.Metadata{ + Bounds: bounds, + Tags: models.EmptyTags(), + }), ) require.NoError(t, err) @@ -927,12 +895,10 @@ func TestBothSeries(t *testing.T) { op, err := NewOp( tt.opType, NodeParams{ - LNode: parser.NodeID(0), - RNode: parser.NodeID(1), - LIsScalar: false, - RIsScalar: false, - ReturnBool: tt.returnBool, - VectorMatching: &VectorMatching{}, + LNode: parser.NodeID(0), + RNode: parser.NodeID(1), + ReturnBool: tt.returnBool, + VectorMatcherBuilder: emptyVectorMatcherBuilder, }, ) require.NoError(t, err) @@ -1004,11 +970,9 @@ func TestBinaryFunctionWithDifferentNames(t *testing.T) { op, err := NewOp( PlusType, NodeParams{ - LNode: parser.NodeID(0), - RNode: parser.NodeID(1), - LIsScalar: false, - RIsScalar: false, - VectorMatching: &VectorMatching{}, + LNode: parser.NodeID(0), + RNode: parser.NodeID(1), + VectorMatcherBuilder: emptyVectorMatcherBuilder, }, ) require.NoError(t, err) diff --git a/src/query/functions/binary/common.go b/src/query/functions/binary/common.go index 7794539cea..46fad91603 100644 --- a/src/query/functions/binary/common.go +++ b/src/query/functions/binary/common.go @@ -38,40 +38,9 @@ type indexMatcher struct { rhsIndex int } -// VectorMatchCardinality describes the cardinality relationship -// of two Vectors in a binary operation. -type VectorMatchCardinality int - -const ( - // CardOneToOne is used for one-one relationship - CardOneToOne VectorMatchCardinality = iota - // CardManyToOne is used for many-one relationship - CardManyToOne - // CardOneToMany is used for one-many relationship - CardOneToMany - // CardManyToMany is used for many-many relationship - CardManyToMany -) - -// VectorMatching describes how elements from two Vectors in a binary -// operation are supposed to be matched. -type VectorMatching struct { - // The cardinality of the two Vectors. - Card VectorMatchCardinality - // MatchingLabels contains the labels which define equality of a pair of - // elements from the Vectors. - MatchingLabels [][]byte - // On includes the given label names from matching, - // rather than excluding them. - On bool - // Include contains additional labels that should be included in - // the result from the side with the lower cardinality. - Include []string -} - -// HashFunc returns a function that calculates the signature for a metric +// hashFunc returns a function that calculates the signature for a metric // ignoring the provided labels. If on, then only the given labels are used. -func HashFunc(on bool, names ...[]byte) func(models.Tags) uint64 { +func hashFunc(on bool, names ...[]byte) func(models.Tags) uint64 { if on { return func(tags models.Tags) uint64 { return tags.TagsWithKeys(names).HashedID() @@ -107,6 +76,38 @@ func tagMap(t models.Tags) map[string]models.Tag { return m } +// Iff one of left or right is a time block, match match one to many +// against it, and match everything. +func defaultVectorMatcherBuilder(lhs, rhs block.Block) VectorMatching { + left := lhs.Info().BaseType() == block.BlockTime + right := rhs.Info().BaseType() == block.BlockTime + + if left { + if right { + return VectorMatching{ + Set: true, + Card: CardOneToOne, + } + } + + return VectorMatching{ + Set: true, + Card: CardOneToMany, + On: true, + } + } + + if right { + return VectorMatching{ + Set: true, + Card: CardManyToOne, + On: true, + } + } + + return VectorMatching{Set: false} +} + func combineMetaAndSeriesMeta( meta, otherMeta block.Metadata, seriesMeta, otherSeriesMeta []block.SeriesMeta, diff --git a/src/query/functions/binary/comparison.go b/src/query/functions/binary/comparison.go index 7f9ad17bc9..101d857d87 100644 --- a/src/query/functions/binary/comparison.go +++ b/src/query/functions/binary/comparison.go @@ -68,7 +68,7 @@ func toComparisonValue(b bool, x float64) float64 { } var ( - comparisonFuncs = map[string]Function{ + comparisonFuncs = map[string]binaryFunction{ EqType: func(x, y float64) float64 { return toComparisonValue(x == y, x) }, NotEqType: func(x, y float64) float64 { return toComparisonValue(x != y, x) }, GreaterType: func(x, y float64) float64 { return toComparisonValue(x > y, x) }, diff --git a/src/query/functions/binary/logical.go b/src/query/functions/binary/logical.go index 33e33e39b5..ddbf45b1d3 100644 --- a/src/query/functions/binary/logical.go +++ b/src/query/functions/binary/logical.go @@ -30,7 +30,7 @@ type makeBlockFn func( queryCtx *models.QueryContext, lIter, rIter block.StepIter, controller *transform.Controller, - matching *VectorMatching, + matching VectorMatching, ) (block.Block, error) // Builds a logical processing function if able. If wrong opType supplied, @@ -61,7 +61,7 @@ func createLogicalProcessingStep( return func(queryCtx *models.QueryContext, lhs, rhs block.Block, controller *transform.Controller) (block.Block, error) { return processLogical(queryCtx, lhs, rhs, controller, - params.VectorMatching, fn) + params.VectorMatcherBuilder, fn) } } @@ -69,7 +69,7 @@ func processLogical( queryCtx *models.QueryContext, lhs, rhs block.Block, controller *transform.Controller, - matching *VectorMatching, + matcherBuilder VectorMatcherBuilder, makeBlock makeBlockFn, ) (block.Block, error) { lIter, err := lhs.StepIter() @@ -86,5 +86,6 @@ func processLogical( return nil, errMismatchedStepCounts } + matching := matcherBuilder(lhs, rhs) return makeBlock(queryCtx, lIter, rIter, controller, matching) } diff --git a/src/query/functions/binary/or.go b/src/query/functions/binary/or.go index d50d9bbeee..739467b3a9 100644 --- a/src/query/functions/binary/or.go +++ b/src/query/functions/binary/or.go @@ -36,8 +36,12 @@ func makeOrBlock( queryCtx *models.QueryContext, lIter, rIter block.StepIter, controller *transform.Controller, - matching *VectorMatching, + matching VectorMatching, ) (block.Block, error) { + if !matching.Set { + return nil, errNoMatching + } + lMeta, lSeriesMetas := lIter.Meta(), lIter.SeriesMeta() lMeta, lSeriesMetas = removeNameTags(lMeta, lSeriesMetas) @@ -115,10 +119,10 @@ func makeOrBlock( // added after all lhs series have been added. // This function also combines the series metadatas for the entire block. func mergeIndices( - matching *VectorMatching, + matching VectorMatching, lhs, rhs []block.SeriesMeta, ) ([]int, []block.SeriesMeta) { - idFunction := HashFunc(matching.On, matching.MatchingLabels...) + idFunction := hashFunc(matching.On, matching.MatchingLabels...) // The set of signatures for the left-hand side. leftSigs := make(map[uint64]int, len(lhs)) for i, meta := range lhs { diff --git a/src/query/functions/binary/or_test.go b/src/query/functions/binary/or_test.go index 544e5df2d7..2dd9b2113b 100644 --- a/src/query/functions/binary/or_test.go +++ b/src/query/functions/binary/or_test.go @@ -45,9 +45,9 @@ func TestOrWithExactValues(t *testing.T) { op, err := NewOp( OrType, NodeParams{ - LNode: parser.NodeID(0), - RNode: parser.NodeID(1), - VectorMatching: &VectorMatching{}, + LNode: parser.NodeID(0), + RNode: parser.NodeID(1), + VectorMatcherBuilder: emptyVectorMatcherBuilder, }, ) require.NoError(t, err) @@ -76,9 +76,9 @@ func TestOrWithSomeValues(t *testing.T) { op, err := NewOp( OrType, NodeParams{ - LNode: parser.NodeID(0), - RNode: parser.NodeID(1), - VectorMatching: &VectorMatching{}, + LNode: parser.NodeID(0), + RNode: parser.NodeID(1), + VectorMatcherBuilder: emptyVectorMatcherBuilder, }, ) require.NoError(t, err) @@ -152,8 +152,7 @@ var indexMatchingTests = []struct { } func TestIndexMerging(t *testing.T) { - matching := &VectorMatching{} - + matching := VectorMatching{} for _, tt := range indexMatchingTests { t.Run(tt.name, func(t *testing.T) { matching, _ := mergeIndices(matching, tt.lhs, tt.rhs) @@ -263,9 +262,9 @@ func TestOrs(t *testing.T) { op, err := NewOp( OrType, NodeParams{ - LNode: parser.NodeID(0), - RNode: parser.NodeID(1), - VectorMatching: &VectorMatching{}, + LNode: parser.NodeID(0), + RNode: parser.NodeID(1), + VectorMatcherBuilder: emptyVectorMatcherBuilder, }, ) require.NoError(t, err) @@ -313,9 +312,9 @@ func TestOrsBoundsError(t *testing.T) { op, err := NewOp( OrType, NodeParams{ - LNode: parser.NodeID(0), - RNode: parser.NodeID(1), - VectorMatching: &VectorMatching{}, + LNode: parser.NodeID(0), + RNode: parser.NodeID(1), + VectorMatcherBuilder: emptyVectorMatcherBuilder, }, ) require.NoError(t, err) @@ -353,9 +352,9 @@ func TestOrCombinedMetadata(t *testing.T) { op, err := NewOp( OrType, NodeParams{ - LNode: parser.NodeID(0), - RNode: parser.NodeID(1), - VectorMatching: &VectorMatching{}, + LNode: parser.NodeID(0), + RNode: parser.NodeID(1), + VectorMatcherBuilder: emptyVectorMatcherBuilder, }, ) require.NoError(t, err) diff --git a/src/query/functions/binary/types.go b/src/query/functions/binary/types.go new file mode 100644 index 0000000000..8f91102eaa --- /dev/null +++ b/src/query/functions/binary/types.go @@ -0,0 +1,69 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package binary + +import ( + "github.com/m3db/m3/src/query/block" + "github.com/m3db/m3/src/query/parser" +) + +// VectorMatchCardinality describes the cardinality relationship +// of two Vectors in a binary operation. +type VectorMatchCardinality int + +const ( + // CardOneToOne is used for one-one relationship + CardOneToOne VectorMatchCardinality = iota + // CardManyToOne is used for many-one relationship + CardManyToOne + // CardOneToMany is used for one-many relationship + CardOneToMany + // CardManyToMany is used for many-many relationship + CardManyToMany +) + +// VectorMatching describes how elements from two Vectors in a binary +// operation are supposed to be matched. +type VectorMatching struct { + // Set determines if this has been set or is using default values. + Set bool + // Card is the cardinality of the two Vectors. + Card VectorMatchCardinality + // MatchingLabels contains the labels which define equality of a pair of + // elements from the Vectors. + MatchingLabels [][]byte + // On includes the given label names from matching, + // rather than excluding them. + On bool + // Include contains additional labels that should be included in + // the result from the side with the lower cardinality. + Include []string +} + +// VectorMatcherBuilder creates a vector matcher based on incoming block types. +type VectorMatcherBuilder func(lhs, rhs block.Block) VectorMatching + +// NodeParams describes the types of nodes used for binary operations. +type NodeParams struct { + LNode, RNode parser.NodeID + ReturnBool bool + VectorMatcherBuilder VectorMatcherBuilder +} diff --git a/src/query/functions/binary/unless.go b/src/query/functions/binary/unless.go index 35b4aa6229..62e3d80c3d 100644 --- a/src/query/functions/binary/unless.go +++ b/src/query/functions/binary/unless.go @@ -36,8 +36,12 @@ func makeUnlessBlock( queryCtx *models.QueryContext, lIter, rIter block.StepIter, controller *transform.Controller, - matching *VectorMatching, + matching VectorMatching, ) (block.Block, error) { + if !matching.Set { + return nil, errNoMatching + } + lMeta, lSeriesMetas := lIter.Meta(), lIter.SeriesMeta() lMeta, lSeriesMetas = removeNameTags(lMeta, lSeriesMetas) @@ -100,10 +104,10 @@ func makeUnlessBlock( // matchingIndices returns a slice representing which index in the lhs the rhs // series maps to. If it does not map to an existing index, this is set to -1. func matchingIndices( - matching *VectorMatching, + matching VectorMatching, lhs, rhs []block.SeriesMeta, ) []indexMatcher { - idFunction := HashFunc(matching.On, matching.MatchingLabels...) + idFunction := hashFunc(matching.On, matching.MatchingLabels...) // The set of signatures for the left-hand side. leftSigs := make(map[uint64]int, len(lhs)) for idx, meta := range lhs { diff --git a/src/query/functions/binary/unless_test.go b/src/query/functions/binary/unless_test.go index f5fd8d366a..4dbfd3ce74 100644 --- a/src/query/functions/binary/unless_test.go +++ b/src/query/functions/binary/unless_test.go @@ -105,7 +105,7 @@ var distinctLeftTests = []struct { } func TestMatchingIndices(t *testing.T) { - matching := &VectorMatching{} + matching := VectorMatching{} for _, tt := range distinctLeftTests { t.Run(tt.name, func(t *testing.T) { excluded := matchingIndices(matching, tt.lhs, tt.rhs) @@ -213,9 +213,9 @@ func TestUnless(t *testing.T) { op, err := NewOp( UnlessType, NodeParams{ - LNode: parser.NodeID(0), - RNode: parser.NodeID(1), - VectorMatching: &VectorMatching{}, + LNode: parser.NodeID(0), + RNode: parser.NodeID(1), + VectorMatcherBuilder: emptyVectorMatcherBuilder, }, ) require.NoError(t, err) diff --git a/src/query/functions/scalar/base.go b/src/query/functions/scalar/scalar.go similarity index 75% rename from src/query/functions/scalar/base.go rename to src/query/functions/scalar/scalar.go index 2faf4402fb..6634db6fff 100644 --- a/src/query/functions/scalar/base.go +++ b/src/query/functions/scalar/scalar.go @@ -21,8 +21,6 @@ package scalar import ( - "fmt" - "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/executor/transform" "github.com/m3db/m3/src/query/models" @@ -46,63 +44,56 @@ const ( TimeType = "time" ) -type baseOp struct { - fn block.ScalarFunc - tagOptions models.TagOptions - operatorType string +type scalarOp struct { + val float64 + tagOptions models.TagOptions } -func (o baseOp) OpType() string { - return o.operatorType +func (o scalarOp) OpType() string { + return ScalarType } -func (o baseOp) String() string { - return fmt.Sprintf("type: %s.", o.OpType()) +func (o scalarOp) String() string { + return "type: scalar" } -func (o baseOp) Node( +func (o scalarOp) Node( controller *transform.Controller, opts transform.Options, ) parser.Source { - return &baseNode{ + return &scalarNode{ op: o, controller: controller, opts: opts, } } -// NewScalarOp creates a new scalar op. +// NewScalarOp creates an operation that yields a scalar source. func NewScalarOp( - fn block.ScalarFunc, - opType string, + val float64, tagOptions models.TagOptions, ) (parser.Params, error) { - if opType != ScalarType && opType != TimeType { - return nil, fmt.Errorf("unknown scalar type: %s", opType) - } - - return &baseOp{ - fn: fn, - tagOptions: tagOptions, - operatorType: opType, + return &scalarOp{ + val: val, + tagOptions: tagOptions, }, nil } -// scalarNode is the execution node -type baseNode struct { - op baseOp +// scalarNode is the execution node for time source. +type scalarNode struct { + op scalarOp controller *transform.Controller opts transform.Options } -// Execute runs the scalar node operation -func (n *baseNode) Execute(queryCtx *models.QueryContext) error { +// Execute runs the scalar source's pipeline. +func (n *scalarNode) Execute(queryCtx *models.QueryContext) error { meta := block.Metadata{ Bounds: n.opts.TimeSpec().Bounds(), Tags: models.NewTags(0, n.op.tagOptions), } - block := block.NewScalar(n.op.fn, meta) + block := block.NewScalar(n.op.val, meta) if n.opts.Debug() { // Ignore any errors iter, _ := block.StepIter() diff --git a/src/query/functions/scalar/base_test.go b/src/query/functions/scalar/scalar_test.go similarity index 82% rename from src/query/functions/scalar/base_test.go rename to src/query/functions/scalar/scalar_test.go index 368646351b..f355b2891d 100644 --- a/src/query/functions/scalar/base_test.go +++ b/src/query/functions/scalar/scalar_test.go @@ -22,7 +22,6 @@ package scalar import ( "testing" - "time" "github.com/m3db/m3/src/query/executor/transform" "github.com/m3db/m3/src/query/models" @@ -35,14 +34,15 @@ import ( "github.com/stretchr/testify/require" ) -func TestScalarTime(t *testing.T) { +func TestScalar(t *testing.T) { + val := 10.0 _, bounds := test.GenerateValuesAndBounds(nil, nil) c, sink := executor.NewControllerWithSink(parser.NodeID(0)) - baseOp := baseOp{ - fn: func(t time.Time) float64 { return float64(t.Unix()) }, - operatorType: TimeType, - } + op, err := NewScalarOp(val, models.NewTagOptions()) + require.NoError(t, err) + baseOp, ok := op.(*scalarOp) + require.True(t, ok) start := bounds.Start step := bounds.StepSize node := baseOp.Node(c, transformtest.Options(t, transform.OptionsParams{ @@ -52,13 +52,14 @@ func TestScalarTime(t *testing.T) { Step: step, }, })) - err := node.Execute(models.NoopQueryContext()) + + err = node.Execute(models.NoopQueryContext()) require.NoError(t, err) - assert.Len(t, sink.Values, 1) + require.Equal(t, 1, len(sink.Values)) - for _, vals := range sink.Values { - for i, val := range vals { - assert.Equal(t, float64(start.Add(time.Duration(i)*step).Unix()), val) - } + vals := sink.Values[0] + assert.Equal(t, bounds.Steps(), len(vals)) + for _, v := range vals { + assert.Equal(t, val, v) } } diff --git a/src/query/functions/scalar/time.go b/src/query/functions/scalar/time.go new file mode 100644 index 0000000000..ab16ded7e6 --- /dev/null +++ b/src/query/functions/scalar/time.go @@ -0,0 +1,122 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package scalar + +import ( + "github.com/m3db/m3/src/query/block" + "github.com/m3db/m3/src/query/executor/transform" + "github.com/m3db/m3/src/query/models" + "github.com/m3db/m3/src/query/parser" + "github.com/m3db/m3/src/query/util/logging" + + "go.uber.org/zap" +) + +type timeOp struct { + tagOptions models.TagOptions +} + +func (o timeOp) OpType() string { + return TimeType +} + +func (o timeOp) String() string { + return "type: time" +} + +func (o timeOp) Node( + controller *transform.Controller, + opts transform.Options, +) parser.Source { + return &timeNode{ + controller: controller, + tagOptions: o.tagOptions, + opts: opts, + } +} + +// NewTimeOp creates an operation that yields a time-based source. +func NewTimeOp(tagOptions models.TagOptions) (parser.Params, error) { + return &timeOp{ + tagOptions: tagOptions, + }, nil +} + +// timeNode is the execution node for time source. +type timeNode struct { + tagOptions models.TagOptions + controller *transform.Controller + opts transform.Options +} + +// Execute runs the time source's pipeline. +func (n *timeNode) Execute(queryCtx *models.QueryContext) error { + bounds := n.opts.TimeSpec().Bounds() + meta := block.Metadata{ + Bounds: bounds, + Tags: models.NewTags(0, n.tagOptions), + } + + seriesMeta := []block.SeriesMeta{ + block.SeriesMeta{ + Tags: models.NewTags(0, n.tagOptions), + Name: []byte(TimeType), + }, + } + + builder := block.NewColumnBlockBuilder(queryCtx, meta, seriesMeta) + steps := bounds.Steps() + err := builder.AddCols(steps) + if err != nil { + return err + } + + for i := 0; i < steps; i++ { + t, err := bounds.TimeForIndex(i) + if err != nil { + return err + } + + timeVal := float64(t.Unix()) + if err := builder.AppendValue(i, timeVal); err != nil { + return err + } + } + + block := builder.BuildAsType(block.BlockTime) + if n.opts.Debug() { + // Ignore any errors + iter, _ := block.StepIter() + if iter != nil { + logging.WithContext(queryCtx.Ctx, n.opts.InstrumentOptions()). + Info("time node", zap.Any("meta", iter.Meta())) + } + } + + if err := n.controller.Process(queryCtx, block); err != nil { + block.Close() + // Fail on first error + return err + } + + block.Close() + return nil +} diff --git a/src/query/functions/scalar/time_test.go b/src/query/functions/scalar/time_test.go new file mode 100644 index 0000000000..7cfc3bc640 --- /dev/null +++ b/src/query/functions/scalar/time_test.go @@ -0,0 +1,65 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package scalar + +import ( + "testing" + "time" + + "github.com/m3db/m3/src/query/block" + "github.com/m3db/m3/src/query/executor/transform" + "github.com/m3db/m3/src/query/models" + "github.com/m3db/m3/src/query/parser" + "github.com/m3db/m3/src/query/test" + "github.com/m3db/m3/src/query/test/executor" + "github.com/m3db/m3/src/query/test/transformtest" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestTime(t *testing.T) { + _, bounds := test.GenerateValuesAndBounds(nil, nil) + c, sink := executor.NewControllerWithSink(parser.NodeID(0)) + op, err := NewTimeOp(models.NewTagOptions()) + require.NoError(t, err) + + baseOp, ok := op.(*timeOp) + require.True(t, ok) + start := bounds.Start + step := bounds.StepSize + node := baseOp.Node(c, transformtest.Options(t, transform.OptionsParams{ + TimeSpec: transform.TimeSpec{ + Start: start, + End: bounds.End(), + Step: step, + }, + })) + + err = node.Execute(models.NoopQueryContext()) + require.NoError(t, err) + assert.Len(t, sink.Values, 1) + assert.Equal(t, block.BlockTime, sink.Info.Type()) + + for i, vals := range sink.Values { + assert.Equal(t, float64(start.Add(time.Duration(i)*step).Unix()), vals[0]) + } +} diff --git a/src/query/parser/promql/parse.go b/src/query/parser/promql/parse.go index 358523e8ff..6bc9f9e28a 100644 --- a/src/query/parser/promql/parse.go +++ b/src/query/parser/promql/parse.go @@ -213,12 +213,7 @@ func (p *parseState) walk(node pql.Node) error { return err } - op, err := scalar.NewScalarOp( - func(_ time.Time) float64 { return val }, - scalar.ScalarType, - p.tagOpts, - ) - + op, err := scalar.NewScalarOp(val, p.tagOpts) if err != nil { return err } diff --git a/src/query/parser/promql/types.go b/src/query/parser/promql/types.go index dcbb1ff192..012b6eb8c7 100644 --- a/src/query/parser/promql/types.go +++ b/src/query/parser/promql/types.go @@ -22,8 +22,8 @@ package promql import ( "fmt" - "time" + "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/functions" "github.com/m3db/m3/src/query/functions/aggregation" "github.com/m3db/m3/src/query/functions/binary" @@ -145,25 +145,18 @@ func newScalarOperator( expr *promql.NumberLiteral, tagOpts models.TagOptions, ) (parser.Params, error) { - return scalar.NewScalarOp( - func(_ time.Time) float64 { return expr.Val }, - scalar.ScalarType, - tagOpts, - ) + return scalar.NewScalarOp(expr.Val, tagOpts) } // NewBinaryOperator creates a new binary operator based on the type. func NewBinaryOperator(expr *promql.BinaryExpr, lhs, rhs parser.NodeID) (parser.Params, error) { - matching := promMatchingToM3(expr.VectorMatching) - + matcherBuilder := promMatchingToM3(expr.VectorMatching) nodeParams := binary.NodeParams{ - LNode: lhs, - RNode: rhs, - LIsScalar: expr.LHS.Type() == promql.ValueTypeScalar, - RIsScalar: expr.RHS.Type() == promql.ValueTypeScalar, - ReturnBool: expr.ReturnBool, - VectorMatching: matching, + LNode: lhs, + RNode: rhs, + ReturnBool: expr.ReturnBool, + VectorMatcherBuilder: matcherBuilder, } op := getBinaryOpType(expr.Op) @@ -262,12 +255,7 @@ func NewFunctionExpr( return p, true, err case scalar.TimeType: - p, err = scalar.NewScalarOp( - func(t time.Time) float64 { return float64(t.Unix()) }, - scalar.TimeType, - tagOptions, - ) - + p, err = scalar.NewTimeOp(tagOptions) return p, true, err // NB: no-ops. @@ -406,7 +394,7 @@ func promVectorCardinalityToM3( func promMatchingToM3( vectorMatching *promql.VectorMatching, -) *binary.VectorMatching { +) binary.VectorMatcherBuilder { // vectorMatching can be nil iff at least one of the sides is a scalar. if vectorMatching == nil { return nil @@ -417,10 +405,13 @@ func promMatchingToM3( byteMatchers[i] = []byte(label) } - return &binary.VectorMatching{ - Card: promVectorCardinalityToM3(vectorMatching.Card), - MatchingLabels: byteMatchers, - On: vectorMatching.On, - Include: vectorMatching.Include, + return func(_, _ block.Block) binary.VectorMatching { + return binary.VectorMatching{ + Set: true, + Card: promVectorCardinalityToM3(vectorMatching.Card), + MatchingLabels: byteMatchers, + On: vectorMatching.On, + Include: vectorMatching.Include, + } } } diff --git a/src/query/storage/block.go b/src/query/storage/block.go index 6c713d3df6..02bd24a5b6 100644 --- a/src/query/storage/block.go +++ b/src/query/storage/block.go @@ -64,6 +64,10 @@ type multiBlockWrapper struct { unconsolidated block.UnconsolidatedBlock } +func (m *multiBlockWrapper) Info() block.BlockInfo { + return block.NewBlockInfo(block.BlockMultiSeries) +} + func (m *multiBlockWrapper) Unconsolidated() (block.UnconsolidatedBlock, error) { return m.unconsolidated, nil } diff --git a/src/query/storage/consolidated.go b/src/query/storage/consolidated.go index 196a4e0379..e46449ec3a 100644 --- a/src/query/storage/consolidated.go +++ b/src/query/storage/consolidated.go @@ -31,6 +31,10 @@ type consolidatedBlock struct { consolidationFunc block.ConsolidationFunc } +func (c *consolidatedBlock) Info() block.BlockInfo { + return block.NewBlockInfo(block.BlockConsolidated) +} + func (c *consolidatedBlock) Unconsolidated() (block.UnconsolidatedBlock, error) { return nil, errors.New("unconsolidated blocks are not supported") } diff --git a/src/query/storage/consolidated_test.go b/src/query/storage/consolidated_test.go index c32ba6e0ba..dc8fd2384e 100644 --- a/src/query/storage/consolidated_test.go +++ b/src/query/storage/consolidated_test.go @@ -26,6 +26,8 @@ import ( "testing" "time" + "github.com/m3db/m3/src/query/block" + "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/ts" @@ -174,10 +176,11 @@ func TestConsolidation(t *testing.T) { unconsolidated, err := NewMultiSeriesBlock(seriesList, fetchQuery, time.Minute) assert.NoError(t, err) - block, err := unconsolidated.Consolidate() + bl, err := unconsolidated.Consolidate() assert.NoError(t, err) - iter, err := block.StepIter() + assert.Equal(t, block.BlockConsolidated, bl.Info().Type()) + iter, err := bl.StepIter() assert.NoError(t, err) i := 0 diff --git a/src/query/test/executor/transform.go b/src/query/test/executor/transform.go index e4a84404e6..2d2a66297b 100644 --- a/src/query/test/executor/transform.go +++ b/src/query/test/executor/transform.go @@ -46,6 +46,7 @@ type SinkNode struct { Values [][]float64 Meta block.Metadata Metas []block.SeriesMeta + Info block.BlockInfo } // Process processes and stores the last block output in the sink node @@ -77,6 +78,7 @@ func (s *SinkNode) Process(_ *models.QueryContext, ID parser.NodeID, block block } s.Meta = iter.Meta() + s.Info = block.Info() return nil } diff --git a/src/query/ts/m3db/encoded_block.go b/src/query/ts/m3db/encoded_block.go index e6b6e29e5c..20bbeb36c7 100644 --- a/src/query/ts/m3db/encoded_block.go +++ b/src/query/ts/m3db/encoded_block.go @@ -141,6 +141,10 @@ func (b *encodedBlock) generateMetas() error { return nil } +func (b *encodedBlock) Info() block.BlockInfo { + return block.NewBlockInfo(block.BlockM3TSZCompressed) +} + func (b *encodedBlock) WithMetadata( meta block.Metadata, seriesMetas []block.SeriesMeta, diff --git a/src/query/ts/m3db/encoded_step_iterator_test.go b/src/query/ts/m3db/encoded_step_iterator_test.go index 5cfaf10260..10e950063a 100644 --- a/src/query/ts/m3db/encoded_step_iterator_test.go +++ b/src/query/ts/m3db/encoded_step_iterator_test.go @@ -42,6 +42,7 @@ import ( xtime "github.com/m3db/m3/src/x/time" "github.com/pkg/profile" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -151,9 +152,10 @@ func testConsolidatedStepIteratorMinuteLookback(t *testing.T, withPools bool) { blocks, bounds := generateBlocks(t, tt.stepSize, opts) j := 0 - for i, block := range blocks { - iters, err := block.StepIter() + for i, bl := range blocks { + iters, err := bl.StepIter() require.NoError(t, err) + assert.Equal(t, block.BlockM3TSZCompressed, bl.Info().Type()) require.True(t, bounds.Equals(iters.Meta().Bounds)) verifyMetas(t, i, iters.Meta(), iters.SeriesMeta())