diff --git a/src/query/executor/engine.go b/src/query/executor/engine.go index fa052b6227..8ee9377f43 100644 --- a/src/query/executor/engine.go +++ b/src/query/executor/engine.go @@ -156,7 +156,8 @@ func (e *Engine) ExecuteExpr( result := state.resultNode results <- Query{Result: result} - if err := state.Execute(ctx); err != nil { + + if err := state.Execute(models.NewQueryContext(ctx, tally.NoopScope)); err != nil { result.abort(err) } else { result.done() diff --git a/src/query/executor/engine_test.go b/src/query/executor/engine_test.go index 7d547fa120..e68c75e42b 100644 --- a/src/query/executor/engine_test.go +++ b/src/query/executor/engine_test.go @@ -35,7 +35,7 @@ import ( "github.com/uber-go/tally" ) -func TestExecute(t *testing.T) { +func TestEngine_Execute(t *testing.T) { logging.InitWithCores(nil) ctrl := gomock.NewController(t) store, session := m3.NewStorageAndSession(t, ctrl) diff --git a/src/query/executor/result.go b/src/query/executor/result.go index d0637d132f..af65be0a23 100644 --- a/src/query/executor/result.go +++ b/src/query/executor/result.go @@ -24,6 +24,7 @@ import ( "sync" "github.com/m3db/m3/src/query/block" + "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" "github.com/pkg/errors" @@ -64,7 +65,7 @@ func newResultNode() *ResultNode { } // Process the block -func (r *ResultNode) Process(ID parser.NodeID, block block.Block) error { +func (r *ResultNode) Process(queryCtx *models.QueryContext, ID parser.NodeID, block block.Block) error { if r.aborted { return errAborted } diff --git a/src/query/executor/state.go b/src/query/executor/state.go index fba81d7fb7..f3e18f2dd9 100644 --- a/src/query/executor/state.go +++ b/src/query/executor/state.go @@ -25,6 +25,7 @@ import ( "fmt" "github.com/m3db/m3/src/query/executor/transform" + "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" "github.com/m3db/m3/src/query/plan" "github.com/m3db/m3/src/query/storage" @@ -184,13 +185,16 @@ func (s *ExecutionState) createNode( } // Execute the sources in parallel and return the first error -func (s *ExecutionState) Execute(ctx context.Context) error { +func (s *ExecutionState) Execute(queryCtx *models.QueryContext) error { requests := make([]execution.Request, len(s.sources)) for idx, source := range s.sources { - requests[idx] = sourceRequest{source} + requests[idx] = sourceRequest{ + source: source, + queryCtx: queryCtx, + } } - return execution.ExecuteParallel(ctx, requests) + return execution.ExecuteParallel(queryCtx.Ctx, requests) } // String representation of the state @@ -199,9 +203,11 @@ func (s *ExecutionState) String() string { } type sourceRequest struct { - source parser.Source + source parser.Source + queryCtx *models.QueryContext } func (s sourceRequest) Process(ctx context.Context) error { - return s.source.Execute(ctx) + // make sure to propagate the new context.Context object down. + return s.source.Execute(s.queryCtx.WithContext(ctx)) } diff --git a/src/query/executor/state_test.go b/src/query/executor/state_test.go index 0f6908919c..be80009987 100644 --- a/src/query/executor/state_test.go +++ b/src/query/executor/state_test.go @@ -21,7 +21,6 @@ package executor import ( - "context" "testing" "time" @@ -61,7 +60,7 @@ func TestValidState(t *testing.T) { state, err := GenerateExecutionState(p, store) require.NoError(t, err) require.Len(t, state.sources, 1) - err = state.Execute(context.Background()) + err = state.Execute(models.NoopQueryContext()) assert.NoError(t, err) } diff --git a/src/query/executor/transform/controller.go b/src/query/executor/transform/controller.go index 8c97e6e0f1..7fdadbd8cf 100644 --- a/src/query/executor/transform/controller.go +++ b/src/query/executor/transform/controller.go @@ -22,6 +22,7 @@ package transform import ( "github.com/m3db/m3/src/query/block" + "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" ) @@ -36,10 +37,10 @@ func (t *Controller) AddTransform(node OpNode) { t.transforms = append(t.transforms, node) } -// Process performs processing on the underlying transforms. -func (t *Controller) Process(block block.Block) error { +// Process performs processing on the underlying transforms +func (t *Controller) Process(queryCtx *models.QueryContext, block block.Block) error { for _, ts := range t.transforms { - err := ts.Process(t.ID, block) + err := ts.Process(queryCtx, t.ID, block) if err != nil { return err } @@ -48,11 +49,11 @@ func (t *Controller) Process(block block.Block) error { return nil } -// BlockBuilder returns a BlockBuilder instance with associated metadata. +// BlockBuilder returns a BlockBuilder instance with associated metadata func (t *Controller) BlockBuilder( + queryCtx *models.QueryContext, blockMeta block.Metadata, - seriesMeta []block.SeriesMeta, -) (block.Builder, error) { + seriesMeta []block.SeriesMeta) (block.Builder, error) { return block.NewColumnBlockBuilder(blockMeta, seriesMeta), nil } diff --git a/src/query/executor/transform/lazy.go b/src/query/executor/transform/lazy.go index 53ebe29ae5..888fb25151 100644 --- a/src/query/executor/transform/lazy.go +++ b/src/query/executor/transform/lazy.go @@ -24,6 +24,7 @@ import ( "sync" "github.com/m3db/m3/src/query/block" + "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" ) @@ -31,7 +32,7 @@ type sinkNode struct { block block.Block } -func (s *sinkNode) Process(ID parser.NodeID, block block.Block) error { +func (s *sinkNode) Process(queryCtx *models.QueryContext, ID parser.NodeID, block block.Block) error { s.block = block return nil } @@ -44,9 +45,7 @@ type lazyNode struct { // NewLazyNode creates a new wrapper around a function fNode to make it support lazy initialization func NewLazyNode(node OpNode, controller *Controller) (OpNode, *Controller) { - c := &Controller{ - ID: controller.ID, - } + c := &Controller{ID: controller.ID} sink := &sinkNode{} controller.AddTransform(sink) @@ -58,14 +57,15 @@ func NewLazyNode(node OpNode, controller *Controller) (OpNode, *Controller) { }, c } -func (f *lazyNode) Process(ID parser.NodeID, block block.Block) error { +func (f *lazyNode) Process(queryCtx *models.QueryContext, ID parser.NodeID, block block.Block) error { b := &lazyBlock{ rawBlock: block, lazyNode: f, + queryCtx: queryCtx, ID: ID, } - return f.controller.Process(b) + return f.controller.Process(queryCtx, b) } type stepIter struct { @@ -177,9 +177,11 @@ func (s *seriesIter) Next() bool { } type lazyBlock struct { - mu sync.Mutex - rawBlock block.Block - lazyNode *lazyNode + mu sync.Mutex + rawBlock block.Block + lazyNode *lazyNode + + queryCtx *models.QueryContext ID parser.NodeID processedBlock block.Block processError error @@ -283,7 +285,7 @@ func (f *lazyBlock) Close() error { } func (f *lazyBlock) process() error { - err := f.lazyNode.fNode.Process(f.ID, f.rawBlock) + err := f.lazyNode.fNode.Process(f.queryCtx, f.ID, f.rawBlock) if err != nil { f.processError = err return err diff --git a/src/query/executor/transform/lazy_test.go b/src/query/executor/transform/lazy_test.go index abb2e06a85..96db1b9c24 100644 --- a/src/query/executor/transform/lazy_test.go +++ b/src/query/executor/transform/lazy_test.go @@ -24,6 +24,7 @@ import ( "testing" "github.com/m3db/m3/src/query/block" + "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" "github.com/m3db/m3/src/query/test" @@ -35,9 +36,9 @@ type dummyFunc struct { controller *Controller } -func (f *dummyFunc) Process(ID parser.NodeID, block block.Block) error { +func (f *dummyFunc) Process(queryCtx *models.QueryContext, ID parser.NodeID, block block.Block) error { f.processed = true - f.controller.Process(block) + f.controller.Process(queryCtx, block) return nil } @@ -50,7 +51,7 @@ func TestLazyState(t *testing.T) { downStreamController.AddTransform(sNode) values, bounds := test.GenerateValuesAndBounds(nil, nil) b := test.NewBlockFromValues(bounds, values) - err := node.Process(parser.NodeID(1), b) + err := node.Process(models.NoopQueryContext(), parser.NodeID(1), b) assert.NoError(t, err) assert.NotNil(t, sNode.block, "downstream process called with a block") assert.IsType(t, sNode.block, &lazyBlock{}) diff --git a/src/query/executor/transform/types.go b/src/query/executor/transform/types.go index eaa009cb1c..e6b6de07e2 100644 --- a/src/query/executor/transform/types.go +++ b/src/query/executor/transform/types.go @@ -37,7 +37,7 @@ type Options struct { // OpNode represents the execution node type OpNode interface { - Process(ID parser.NodeID, block block.Block) error + Process(queryCtx *models.QueryContext, ID parser.NodeID, block block.Block) error } // TimeSpec defines the time bounds for the query execution. End is exclusive diff --git a/src/query/functions/aggregation/base.go b/src/query/functions/aggregation/base.go index 77c487ad6c..4abcf0639f 100644 --- a/src/query/functions/aggregation/base.go +++ b/src/query/functions/aggregation/base.go @@ -26,6 +26,7 @@ import ( "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/executor/transform" "github.com/m3db/m3/src/query/functions/utils" + "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" ) @@ -109,7 +110,7 @@ type baseNode struct { } // Process the block -func (n *baseNode) Process(ID parser.NodeID, b block.Block) error { +func (n *baseNode) Process(queryCtx *models.QueryContext, ID parser.NodeID, b block.Block) error { stepIter, err := b.StepIter() if err != nil { return err @@ -126,7 +127,7 @@ func (n *baseNode) Process(ID parser.NodeID, b block.Block) error { ) meta.Tags, metas = utils.DedupeMetadata(metas) - builder, err := n.controller.BlockBuilder(meta, metas) + builder, err := n.controller.BlockBuilder(queryCtx, meta, metas) if err != nil { return err } @@ -152,5 +153,5 @@ func (n *baseNode) Process(ID parser.NodeID, b block.Block) error { nextBlock := builder.Build() defer nextBlock.Close() - return n.controller.Process(nextBlock) + return n.controller.Process(queryCtx, nextBlock) } diff --git a/src/query/functions/aggregation/base_test.go b/src/query/functions/aggregation/base_test.go index c2731cfae5..38642649f8 100644 --- a/src/query/functions/aggregation/base_test.go +++ b/src/query/functions/aggregation/base_test.go @@ -67,7 +67,7 @@ func processAggregationOp(t *testing.T, op parser.Params) *executor.SinkNode { bl := test.NewBlockFromValuesWithSeriesMeta(bounds, seriesMetas, v) c, sink := executor.NewControllerWithSink(parser.NodeID(1)) node := op.(baseOp).Node(c, transform.Options{}) - err := node.Process(parser.NodeID(0), bl) + err := node.Process(models.NoopQueryContext(), parser.NodeID(0), bl) require.NoError(t, err) return sink } diff --git a/src/query/functions/aggregation/count_values.go b/src/query/functions/aggregation/count_values.go index d2ecbdc355..518b11157a 100644 --- a/src/query/functions/aggregation/count_values.go +++ b/src/query/functions/aggregation/count_values.go @@ -140,7 +140,7 @@ func processBlockBucketAtColumn( } // Process the block -func (n *countValuesNode) Process(ID parser.NodeID, b block.Block) error { +func (n *countValuesNode) Process(queryCtx *models.QueryContext, ID parser.NodeID, b block.Block) error { stepIter, err := b.StepIter() if err != nil { return err @@ -210,7 +210,7 @@ func (n *countValuesNode) Process(ID parser.NodeID, b block.Block) error { metaTags, flattenedMeta := utils.DedupeMetadata(blockMetas) meta.Tags = metaTags - builder, err := n.controller.BlockBuilder(meta, flattenedMeta) + builder, err := n.controller.BlockBuilder(queryCtx, meta, flattenedMeta) if err != nil { return err } @@ -231,7 +231,7 @@ func (n *countValuesNode) Process(ID parser.NodeID, b block.Block) error { nextBlock := builder.Build() defer nextBlock.Close() - return n.controller.Process(nextBlock) + return n.controller.Process(queryCtx, nextBlock) } // pads vals with enough NaNs to match size diff --git a/src/query/functions/aggregation/count_values_test.go b/src/query/functions/aggregation/count_values_test.go index b74bcd1a8e..75f9a1277b 100644 --- a/src/query/functions/aggregation/count_values_test.go +++ b/src/query/functions/aggregation/count_values_test.go @@ -83,7 +83,7 @@ func processCountValuesOp( bl := test.NewBlockFromValuesWithSeriesMeta(bounds, metas, vals) c, sink := executor.NewControllerWithSink(parser.NodeID(1)) node := op.(countValuesOp).Node(c, transform.Options{}) - err := node.Process(parser.NodeID(0), bl) + err := node.Process(models.NoopQueryContext(), parser.NodeID(0), bl) require.NoError(t, err) return sink } diff --git a/src/query/functions/aggregation/take.go b/src/query/functions/aggregation/take.go index e8e735db83..49700b58dd 100644 --- a/src/query/functions/aggregation/take.go +++ b/src/query/functions/aggregation/take.go @@ -27,6 +27,7 @@ import ( "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/executor/transform" "github.com/m3db/m3/src/query/functions/utils" + "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" "github.com/m3db/m3/src/query/ts" ) @@ -111,7 +112,7 @@ type takeNode struct { } // Process the block -func (n *takeNode) Process(ID parser.NodeID, b block.Block) error { +func (n *takeNode) Process(queryCtx *models.QueryContext, ID parser.NodeID, b block.Block) error { stepIter, err := b.StepIter() if err != nil { return err @@ -128,7 +129,7 @@ func (n *takeNode) Process(ID parser.NodeID, b block.Block) error { ) // retain original metadatas - builder, err := n.controller.BlockBuilder(meta, stepIter.SeriesMeta()) + builder, err := n.controller.BlockBuilder(queryCtx, meta, stepIter.SeriesMeta()) if err != nil { return err } @@ -150,7 +151,7 @@ func (n *takeNode) Process(ID parser.NodeID, b block.Block) error { nextBlock := builder.Build() defer nextBlock.Close() - return n.controller.Process(nextBlock) + return n.controller.Process(queryCtx, nextBlock) } // shortcut to return empty when taking <= 0 values diff --git a/src/query/functions/aggregation/take_test.go b/src/query/functions/aggregation/take_test.go index 801cee10a4..ddad9ff8d3 100644 --- a/src/query/functions/aggregation/take_test.go +++ b/src/query/functions/aggregation/take_test.go @@ -26,6 +26,7 @@ import ( "github.com/m3db/m3/src/query/executor/transform" "github.com/m3db/m3/src/query/functions/utils" + "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" "github.com/m3db/m3/src/query/test" "github.com/m3db/m3/src/query/test/executor" @@ -62,7 +63,7 @@ func processTakeOp(t *testing.T, op parser.Params) *executor.SinkNode { bl := test.NewBlockFromValuesWithSeriesMeta(bounds, seriesMetas, v) c, sink := executor.NewControllerWithSink(parser.NodeID(1)) node := op.(takeOp).Node(c, transform.Options{}) - err := node.Process(parser.NodeID(0), bl) + err := node.Process(models.NoopQueryContext(), parser.NodeID(0), bl) require.NoError(t, err) return sink } diff --git a/src/query/functions/binary/and.go b/src/query/functions/binary/and.go index d62981d771..e14c823ebb 100644 --- a/src/query/functions/binary/and.go +++ b/src/query/functions/binary/and.go @@ -25,6 +25,7 @@ import ( "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/executor/transform" + "github.com/m3db/m3/src/query/models" ) // AndType uses values from left hand side for which there is a value in right hand side with exactly matching label sets. @@ -32,13 +33,14 @@ import ( const AndType = "and" func makeAndBlock( + queryCtx *models.QueryContext, lIter, rIter block.StepIter, controller *transform.Controller, matching *VectorMatching, ) (block.Block, error) { lMeta, rSeriesMeta := lIter.Meta(), rIter.SeriesMeta() - builder, err := controller.BlockBuilder(lMeta, rSeriesMeta) + builder, err := controller.BlockBuilder(queryCtx, lMeta, rSeriesMeta) if err != nil { return nil, err } diff --git a/src/query/functions/binary/and_test.go b/src/query/functions/binary/and_test.go index 43d010780d..f4bf33fbda 100644 --- a/src/query/functions/binary/and_test.go +++ b/src/query/functions/binary/and_test.go @@ -25,6 +25,7 @@ import ( "testing" "github.com/m3db/m3/src/query/executor/transform" + "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" "github.com/m3db/m3/src/query/test" "github.com/m3db/m3/src/query/test/executor" @@ -51,9 +52,9 @@ func TestAndWithExactValues(t *testing.T) { c, sink := executor.NewControllerWithSink(parser.NodeID(2)) node := op.(baseOp).Node(c, transform.Options{}) - err = node.Process(parser.NodeID(1), block2) + err = node.Process(models.NoopQueryContext(), parser.NodeID(1), block2) require.NoError(t, err) - err = node.Process(parser.NodeID(0), block1) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block1) require.NoError(t, err) assert.Equal(t, values, sink.Values) } @@ -83,9 +84,9 @@ func TestAndWithSomeValues(t *testing.T) { c, sink := executor.NewControllerWithSink(parser.NodeID(2)) node := op.(baseOp).Node(c, transform.Options{}) - err = node.Process(parser.NodeID(1), block2) + err = node.Process(models.NoopQueryContext(), parser.NodeID(1), block2) require.NoError(t, err) - err = node.Process(parser.NodeID(0), block1) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block1) require.NoError(t, err) // Most values same as lhs expected := values1 diff --git a/src/query/functions/binary/arithmetic.go b/src/query/functions/binary/arithmetic.go index 7bf4e88cc5..c020e64808 100644 --- a/src/query/functions/binary/arithmetic.go +++ b/src/query/functions/binary/arithmetic.go @@ -25,6 +25,7 @@ import ( "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/executor/transform" + "github.com/m3db/m3/src/query/models" ) const ( @@ -81,7 +82,16 @@ func buildArithmeticFunction( } // Build the binary processing step - return func(lhs, rhs block.Block, controller *transform.Controller) (block.Block, error) { - return processBinary(lhs, rhs, params, controller, false, fn) + return func( + queryCtx *models.QueryContext, + lhs, rhs block.Block, + controller *transform.Controller) (block.Block, error) { + return processBinary( + queryCtx, + lhs, rhs, + params, + controller, + false, + fn) }, true } diff --git a/src/query/functions/binary/base.go b/src/query/functions/binary/base.go index 3a06ae87b7..f8b36f537d 100644 --- a/src/query/functions/binary/base.go +++ b/src/query/functions/binary/base.go @@ -26,6 +26,7 @@ import ( "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/executor/transform" + "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" ) @@ -114,10 +115,10 @@ type baseNode struct { mu sync.Mutex } -type processFunc func(block.Block, block.Block, *transform.Controller) (block.Block, error) +type processFunc func(*models.QueryContext, block.Block, block.Block, *transform.Controller) (block.Block, error) // Process processes a block -func (n *baseNode) Process(ID parser.NodeID, b block.Block) error { +func (n *baseNode) Process(queryCtx *models.QueryContext, ID parser.NodeID, b block.Block) error { lhs, rhs, err := n.computeOrCache(ID, b) if err != nil { // Clean up any blocks from cache @@ -132,13 +133,13 @@ func (n *baseNode) Process(ID parser.NodeID, b block.Block) error { n.cleanup() - nextBlock, err := n.process(lhs, rhs, n.controller) + nextBlock, err := n.process(queryCtx, lhs, rhs, n.controller) if err != nil { return err } defer nextBlock.Close() - return n.controller.Process(nextBlock) + return n.controller.Process(queryCtx, nextBlock) } // computeOrCache figures out if both lhs and rhs are available, if not then it caches the incoming block diff --git a/src/query/functions/binary/binary.go b/src/query/functions/binary/binary.go index 67d427191d..65573623df 100644 --- a/src/query/functions/binary/binary.go +++ b/src/query/functions/binary/binary.go @@ -26,6 +26,7 @@ import ( "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/executor/transform" "github.com/m3db/m3/src/query/functions/utils" + "github.com/m3db/m3/src/query/models" ) // Function is a function that applies on two floats @@ -34,6 +35,7 @@ type singleScalarFunc func(x float64) float64 // processes two logical blocks, performing a logical operation on them func processBinary( + queryCtx *models.QueryContext, lhs, rhs block.Block, params NodeParams, controller *transform.Controller, @@ -56,6 +58,7 @@ func processBinary( // rhs is a series; use rhs metadata and series meta if !params.RIsScalar { return processSingleBlock( + queryCtx, rhs, controller, func(x float64) float64 { @@ -95,6 +98,7 @@ func processBinary( rVal := scalarR.Value(time.Time{}) // lhs is a series; use lhs metadata and series meta return processSingleBlock( + queryCtx, lhs, controller, func(x float64) float64 { @@ -116,10 +120,11 @@ func processBinary( return nil, errNoMatching } - return processBothSeries(lIter, rIter, controller, params.VectorMatching, fn) + return processBothSeries(queryCtx, lIter, rIter, controller, params.VectorMatching, fn) } func processSingleBlock( + queryCtx *models.QueryContext, block block.Block, controller *transform.Controller, fn singleScalarFunc, @@ -129,7 +134,7 @@ func processSingleBlock( return nil, err } - builder, err := controller.BlockBuilder(it.Meta(), it.SeriesMeta()) + builder, err := controller.BlockBuilder(queryCtx, it.Meta(), it.SeriesMeta()) if err != nil { return nil, err } @@ -154,6 +159,7 @@ func processSingleBlock( } func processBothSeries( + queryCtx *models.QueryContext, lIter, rIter block.StepIter, controller *transform.Controller, matching *VectorMatching, @@ -173,7 +179,7 @@ func processBothSeries( lMeta.Tags, lSeriesMeta = utils.DedupeMetadata(lSeriesMeta) // Use metas from only taken left series - builder, err := controller.BlockBuilder(lMeta, lSeriesMeta) + builder, err := controller.BlockBuilder(queryCtx, lMeta, lSeriesMeta) if err != nil { return nil, err } diff --git a/src/query/functions/binary/binary_test.go b/src/query/functions/binary/binary_test.go index 1ee5a783cc..81737f3cdc 100644 --- a/src/query/functions/binary/binary_test.go +++ b/src/query/functions/binary/binary_test.go @@ -116,10 +116,10 @@ func TestScalars(t *testing.T) { c, sink := executor.NewControllerWithSink(parser.NodeID(2)) node := op.(baseOp).Node(c, transform.Options{}) - err = node.Process(parser.NodeID(0), block.NewScalar(func(_ time.Time) float64 { return tt.lVal }, bounds)) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block.NewScalar(func(_ time.Time) float64 { return tt.lVal }, bounds)) require.NoError(t, err) - err = node.Process(parser.NodeID(1), block.NewScalar(func(_ time.Time) float64 { return tt.rVal }, bounds)) + err = node.Process(models.NoopQueryContext(), parser.NodeID(1), block.NewScalar(func(_ time.Time) float64 { return tt.rVal }, bounds)) require.NoError(t, err) expected := [][]float64{{ @@ -160,10 +160,10 @@ func TestScalarsReturnBoolFalse(t *testing.T) { c, sink := executor.NewControllerWithSink(parser.NodeID(2)) node := op.(baseOp).Node(c, transform.Options{}) - err = node.Process(parser.NodeID(0), block.NewScalar(func(_ time.Time) float64 { return tt.lVal }, bounds)) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block.NewScalar(func(_ time.Time) float64 { return tt.lVal }, bounds)) require.NoError(t, err) - err = node.Process(parser.NodeID(1), block.NewScalar(func(_ time.Time) float64 { return tt.rVal }, bounds)) + err = node.Process(models.NoopQueryContext(), parser.NodeID(1), block.NewScalar(func(_ time.Time) float64 { return tt.rVal }, bounds)) if tt.opType == EqType || tt.opType == NotEqType || tt.opType == GreaterType || tt.opType == LesserType || @@ -510,16 +510,16 @@ func TestSingleSeriesReturnBool(t *testing.T) { series := test.NewBlockFromValuesWithSeriesMeta(bounds, metas, seriesValues) // Set the series and scalar blocks on the correct sides if tt.seriesLeft { - err = node.Process(parser.NodeID(0), series) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), series) require.NoError(t, err) - err = node.Process(parser.NodeID(1), block.NewScalar(func(_ time.Time) float64 { return tt.scalarVal }, bounds)) + err = node.Process(models.NoopQueryContext(), parser.NodeID(1), block.NewScalar(func(_ time.Time) float64 { return tt.scalarVal }, bounds)) require.NoError(t, err) } else { - err = node.Process(parser.NodeID(0), block.NewScalar(func(_ time.Time) float64 { return tt.scalarVal }, bounds)) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block.NewScalar(func(_ time.Time) float64 { return tt.scalarVal }, bounds)) require.NoError(t, err) - err = node.Process(parser.NodeID(1), series) + err = node.Process(models.NoopQueryContext(), parser.NodeID(1), series) require.NoError(t, err) } @@ -565,16 +565,16 @@ func TestSingleSeriesReturnValues(t *testing.T) { series := test.NewBlockFromValuesWithSeriesMeta(bounds, metas, seriesValues) // Set the series and scalar blocks on the correct sides if tt.seriesLeft { - err = node.Process(parser.NodeID(0), series) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), series) require.NoError(t, err) - err = node.Process(parser.NodeID(1), block.NewScalar(func(_ time.Time) float64 { return tt.scalarVal }, bounds)) + err = node.Process(models.NoopQueryContext(), parser.NodeID(1), block.NewScalar(func(_ time.Time) float64 { return tt.scalarVal }, bounds)) require.NoError(t, err) } else { - err = node.Process(parser.NodeID(0), block.NewScalar(func(_ time.Time) float64 { return tt.scalarVal }, bounds)) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block.NewScalar(func(_ time.Time) float64 { return tt.scalarVal }, bounds)) require.NoError(t, err) - err = node.Process(parser.NodeID(1), series) + err = node.Process(models.NoopQueryContext(), parser.NodeID(1), series) require.NoError(t, err) } @@ -827,10 +827,10 @@ func TestBothSeries(t *testing.T) { StepSize: time.Minute, } - err = node.Process(parser.NodeID(0), test.NewBlockFromValuesWithSeriesMeta(bounds, tt.lhsMeta, tt.lhs)) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), test.NewBlockFromValuesWithSeriesMeta(bounds, tt.lhsMeta, tt.lhs)) require.NoError(t, err) - err = node.Process(parser.NodeID(1), test.NewBlockFromValuesWithSeriesMeta(bounds, tt.rhsMeta, tt.rhs)) + err = node.Process(models.NoopQueryContext(), parser.NodeID(1), test.NewBlockFromValuesWithSeriesMeta(bounds, tt.rhsMeta, tt.rhs)) require.NoError(t, err) test.EqualsWithNans(t, tt.expected, sink.Values) diff --git a/src/query/functions/binary/comparison.go b/src/query/functions/binary/comparison.go index f2400e99cc..b351b6aa9f 100644 --- a/src/query/functions/binary/comparison.go +++ b/src/query/functions/binary/comparison.go @@ -25,6 +25,7 @@ import ( "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/executor/transform" + "github.com/m3db/m3/src/query/models" ) const ( @@ -98,7 +99,16 @@ func buildComparisonFunction( return nil, false } - return func(lhs, rhs block.Block, controller *transform.Controller) (block.Block, error) { - return processBinary(lhs, rhs, params, controller, true, fn) + return func( + queryCtx *models.QueryContext, + lhs, rhs block.Block, + controller *transform.Controller) (block.Block, error) { + return processBinary( + queryCtx, + lhs, rhs, + params, + controller, + true, + fn) }, true } diff --git a/src/query/functions/binary/logical.go b/src/query/functions/binary/logical.go index fae12a2873..02b3616357 100644 --- a/src/query/functions/binary/logical.go +++ b/src/query/functions/binary/logical.go @@ -23,9 +23,11 @@ package binary import ( "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/executor/transform" + "github.com/m3db/m3/src/query/models" ) type makeBlockFn func( + queryCtx *models.QueryContext, lIter, rIter block.StepIter, controller *transform.Controller, matching *VectorMatching, @@ -55,13 +57,14 @@ func buildLogicalFunction( func createLogicalProcessingStep( params NodeParams, fn makeBlockFn, -) func(block.Block, block.Block, *transform.Controller) (block.Block, error) { - return func(lhs, rhs block.Block, controller *transform.Controller) (block.Block, error) { - return processLogical(lhs, rhs, controller, params.VectorMatching, fn) +) processFunc { + return func(queryCtx *models.QueryContext, lhs, rhs block.Block, controller *transform.Controller) (block.Block, error) { + return processLogical(queryCtx, lhs, rhs, controller, params.VectorMatching, fn) } } func processLogical( + queryCtx *models.QueryContext, lhs, rhs block.Block, controller *transform.Controller, matching *VectorMatching, @@ -81,5 +84,5 @@ func processLogical( return nil, errMismatchedStepCounts } - return makeBlock(lIter, rIter, controller, matching) + return makeBlock(queryCtx, lIter, rIter, controller, matching) } diff --git a/src/query/functions/binary/or.go b/src/query/functions/binary/or.go index 4db009e153..ab38e88820 100644 --- a/src/query/functions/binary/or.go +++ b/src/query/functions/binary/or.go @@ -23,6 +23,7 @@ package binary import ( "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/executor/transform" + "github.com/m3db/m3/src/query/models" ) // OrType uses all values from left hand side, and appends values from the right hand side which do @@ -30,6 +31,7 @@ import ( const OrType = "or" func makeOrBlock( + queryCtx *models.QueryContext, lIter, rIter block.StepIter, controller *transform.Controller, matching *VectorMatching, @@ -50,7 +52,7 @@ func makeOrBlock( rSeriesMetas, ) - builder, err := controller.BlockBuilder(meta, combinedSeriesMeta) + builder, err := controller.BlockBuilder(queryCtx, meta, combinedSeriesMeta) if err != nil { return nil, err } diff --git a/src/query/functions/binary/or_test.go b/src/query/functions/binary/or_test.go index 623eb6059d..ff43c7921b 100644 --- a/src/query/functions/binary/or_test.go +++ b/src/query/functions/binary/or_test.go @@ -55,9 +55,9 @@ func TestOrWithExactValues(t *testing.T) { c, sink := executor.NewControllerWithSink(parser.NodeID(2)) node := op.(baseOp).Node(c, transform.Options{}) - err = node.Process(parser.NodeID(1), block2) + err = node.Process(models.NoopQueryContext(), parser.NodeID(1), block2) require.NoError(t, err) - err = node.Process(parser.NodeID(0), block1) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block1) require.NoError(t, err) assert.Equal(t, values, sink.Values) } @@ -86,9 +86,9 @@ func TestOrWithSomeValues(t *testing.T) { c, sink := executor.NewControllerWithSink(parser.NodeID(2)) node := op.(baseOp).Node(c, transform.Options{}) - err = node.Process(parser.NodeID(1), block2) + err = node.Process(models.NoopQueryContext(), parser.NodeID(1), block2) require.NoError(t, err) - err = node.Process(parser.NodeID(0), block1) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block1) require.NoError(t, err) // NAN values should be filled expected := values1 @@ -230,7 +230,7 @@ func TestOrs(t *testing.T) { } lhs := test.NewBlockFromValuesWithSeriesMeta(bounds, tt.lhsMeta, tt.lhs) - err = node.Process(parser.NodeID(0), lhs) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), lhs) require.NoError(t, err) bounds = models.Bounds{ @@ -240,7 +240,7 @@ func TestOrs(t *testing.T) { } rhs := test.NewBlockFromValuesWithSeriesMeta(bounds, tt.rhsMeta, tt.rhs) - err = node.Process(parser.NodeID(1), rhs) + err = node.Process(models.NoopQueryContext(), parser.NodeID(1), rhs) if tt.err != nil { require.EqualError(t, err, tt.err.Error()) return @@ -275,7 +275,7 @@ func TestOrsBoundsError(t *testing.T) { node := op.(baseOp).Node(c, transform.Options{}) lhs := test.NewBlockFromValuesWithSeriesMeta(bounds, tt.lhsMeta, tt.lhs) - err = node.Process(parser.NodeID(0), lhs) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), lhs) require.NoError(t, err) differentBounds := models.Bounds{ @@ -284,7 +284,7 @@ func TestOrsBoundsError(t *testing.T) { StepSize: bounds.StepSize, } rhs := test.NewBlockFromValuesWithSeriesMeta(differentBounds, tt.rhsMeta, tt.rhs) - err = node.Process(parser.NodeID(1), rhs) + err = node.Process(models.NoopQueryContext(), parser.NodeID(1), rhs) require.EqualError(t, err, errMismatchedBounds.Error()) } @@ -327,7 +327,7 @@ func TestOrCombinedMetadata(t *testing.T) { lSeriesMeta, [][]float64{{1, 2}, {10, 20}}) - err = node.Process(parser.NodeID(0), lhs) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), lhs) require.NoError(t, err) strTags = test.StringTags{{"a", "b"}, {"c", "*d"}, {"g", "h"}} @@ -345,7 +345,7 @@ func TestOrCombinedMetadata(t *testing.T) { rSeriesMeta, [][]float64{{3, 4}, {30, 40}}) - err = node.Process(parser.NodeID(1), rhs) + err = node.Process(models.NoopQueryContext(), parser.NodeID(1), rhs) require.NoError(t, err) test.EqualsWithNans(t, [][]float64{{1, 2}, {10, 20}, {3, 4}, {30, 40}}, sink.Values) diff --git a/src/query/functions/binary/unless.go b/src/query/functions/binary/unless.go index b8a5e983fe..008b9096c0 100644 --- a/src/query/functions/binary/unless.go +++ b/src/query/functions/binary/unless.go @@ -26,12 +26,14 @@ import ( "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/executor/transform" "github.com/m3db/m3/src/query/functions/utils" + "github.com/m3db/m3/src/query/models" ) // UnlessType uses all values from lhs which do not exist in rhs const UnlessType = "unless" func makeUnlessBlock( + queryCtx *models.QueryContext, lIter, rIter block.StepIter, controller *transform.Controller, matching *VectorMatching, @@ -52,7 +54,7 @@ func makeUnlessBlock( meta := lIter.Meta() commonTags, dedupedSeriesMetas := utils.DedupeMetadata(distinctSeriesMeta) meta.Tags = commonTags - builder, err := controller.BlockBuilder(meta, dedupedSeriesMetas) + builder, err := controller.BlockBuilder(queryCtx, meta, dedupedSeriesMetas) if err != nil { return nil, err } diff --git a/src/query/functions/binary/unless_test.go b/src/query/functions/binary/unless_test.go index 75a713181c..b120156f82 100644 --- a/src/query/functions/binary/unless_test.go +++ b/src/query/functions/binary/unless_test.go @@ -222,7 +222,7 @@ func TestUnless(t *testing.T) { } lhs := test.NewBlockFromValuesWithSeriesMeta(bounds, tt.lhsMeta, tt.lhs) - err = node.Process(parser.NodeID(0), lhs) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), lhs) require.NoError(t, err) bounds = models.Bounds{ Start: now, @@ -231,7 +231,7 @@ func TestUnless(t *testing.T) { } rhs := test.NewBlockFromValuesWithSeriesMeta(bounds, tt.rhsMeta, tt.rhs) - err = node.Process(parser.NodeID(1), rhs) + err = node.Process(models.NoopQueryContext(), parser.NodeID(1), rhs) if tt.err != nil { require.EqualError(t, err, tt.err.Error()) return diff --git a/src/query/functions/fetch.go b/src/query/functions/fetch.go index cf79afa40f..b5f04ca796 100644 --- a/src/query/functions/fetch.go +++ b/src/query/functions/fetch.go @@ -21,7 +21,6 @@ package functions import ( - "context" "fmt" "time" @@ -88,13 +87,16 @@ func (o FetchOp) Node(controller *transform.Controller, storage storage.Storage, } // Execute runs the fetch node operation -func (n *FetchNode) Execute(ctx context.Context) error { +func (n *FetchNode) Execute(queryCtx *models.QueryContext) error { timeSpec := n.timespec // No need to adjust start and ends since physical plan already considers the offset, range startTime := timeSpec.Start endTime := timeSpec.End opts := storage.NewFetchOptions() opts.BlockType = n.blockType + + ctx := queryCtx.Ctx + blockResult, err := n.storage.FetchBlocks(ctx, &storage.FetchQuery{ Start: startTime, End: endTime, @@ -114,7 +116,7 @@ func (n *FetchNode) Execute(ctx context.Context) error { } } - if err := n.controller.Process(block); err != nil { + if err := n.controller.Process(queryCtx, block); err != nil { block.Close() // Fail on first error return err diff --git a/src/query/functions/fetch_test.go b/src/query/functions/fetch_test.go index 805a5925ca..df40626744 100644 --- a/src/query/functions/fetch_test.go +++ b/src/query/functions/fetch_test.go @@ -21,11 +21,11 @@ package functions import ( - "context" "testing" "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/executor/transform" + "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" "github.com/m3db/m3/src/query/storage/mock" "github.com/m3db/m3/src/query/test" @@ -42,7 +42,7 @@ func TestFetch(t *testing.T) { mockStorage := mock.NewMockStorage() mockStorage.SetFetchBlocksResult(block.Result{Blocks: []block.Block{b}}, nil) source := (&FetchOp{}).Node(c, mockStorage, transform.Options{}) - err := source.Execute(context.TODO()) + err := source.Execute(models.NoopQueryContext()) require.NoError(t, err) expected := values assert.Len(t, sink.Values, 2) diff --git a/src/query/functions/linear/absent_test.go b/src/query/functions/linear/absent_test.go index 623bf2b16d..530087d237 100644 --- a/src/query/functions/linear/absent_test.go +++ b/src/query/functions/linear/absent_test.go @@ -25,6 +25,7 @@ import ( "testing" "github.com/m3db/m3/src/query/executor/transform" + "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" "github.com/m3db/m3/src/query/test" "github.com/m3db/m3/src/query/test/executor" @@ -42,7 +43,7 @@ func TestAbsentWithValues(t *testing.T) { block := test.NewBlockFromValues(bounds, values) c, sink := executor.NewControllerWithSink(parser.NodeID(1)) node := NewAbsentOp().Node(c, transform.Options{}) - err := node.Process(parser.NodeID(0), block) + err := node.Process(models.NoopQueryContext(), parser.NodeID(0), block) require.NoError(t, err) assert.Len(t, sink.Values, 2) expected := [][]float64{ @@ -62,7 +63,7 @@ func TestAbsentWithNoValues(t *testing.T) { block := test.NewBlockFromValues(bounds, values) c, sink := executor.NewControllerWithSink(parser.NodeID(1)) node := NewAbsentOp().Node(c, transform.Options{}) - err := node.Process(parser.NodeID(0), block) + err := node.Process(models.NoopQueryContext(), parser.NodeID(0), block) require.NoError(t, err) assert.Len(t, sink.Values, 2) assert.Equal(t, [][]float64{{1, 1, 1, 1, 1}, {1, 1, 1, 1, 1}}, sink.Values) diff --git a/src/query/functions/linear/base.go b/src/query/functions/linear/base.go index b07196ab18..437adecea5 100644 --- a/src/query/functions/linear/base.go +++ b/src/query/functions/linear/base.go @@ -25,6 +25,7 @@ import ( "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/executor/transform" + "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" ) @@ -80,13 +81,13 @@ func (c *baseNode) ProcessSeries(series block.Series) (block.Series, error) { } // Process the block -func (c *baseNode) Process(ID parser.NodeID, b block.Block) error { +func (c *baseNode) Process(queryCtx *models.QueryContext, ID parser.NodeID, b block.Block) error { stepIter, err := b.StepIter() if err != nil { return err } - builder, err := c.controller.BlockBuilder(stepIter.Meta(), stepIter.SeriesMeta()) + builder, err := c.controller.BlockBuilder(queryCtx, stepIter.Meta(), stepIter.SeriesMeta()) if err != nil { return err } @@ -109,7 +110,7 @@ func (c *baseNode) Process(ID parser.NodeID, b block.Block) error { nextBlock := builder.Build() defer nextBlock.Close() - return c.controller.Process(nextBlock) + return c.controller.Process(queryCtx, nextBlock) } // Meta returns the metadata for the block diff --git a/src/query/functions/linear/clamp_test.go b/src/query/functions/linear/clamp_test.go index 351ead30b9..088b1cea63 100644 --- a/src/query/functions/linear/clamp_test.go +++ b/src/query/functions/linear/clamp_test.go @@ -25,6 +25,7 @@ import ( "testing" "github.com/m3db/m3/src/query/executor/transform" + "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" "github.com/m3db/m3/src/query/test" "github.com/m3db/m3/src/query/test/executor" @@ -56,7 +57,7 @@ func TestClampMin(t *testing.T) { op, err := NewClampOp([]interface{}{3.0}, ClampMinType) require.NoError(t, err) node := op.Node(c, transform.Options{}) - err = node.Process(parser.NodeID(0), block) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block) require.NoError(t, err) expected := expectedClampVals(values, 3.0, math.Max) assert.Len(t, sink.Values, 2) @@ -72,7 +73,7 @@ func TestClampMax(t *testing.T) { op, err := NewClampOp([]interface{}{3.0}, ClampMaxType) require.NoError(t, err) node := op.Node(c, transform.Options{}) - err = node.Process(parser.NodeID(0), block) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block) require.NoError(t, err) expected := expectedClampVals(values, 3.0, math.Min) assert.Len(t, sink.Values, 2) diff --git a/src/query/functions/linear/datetime_test.go b/src/query/functions/linear/datetime_test.go index a8b4edca53..7c3e6a9471 100644 --- a/src/query/functions/linear/datetime_test.go +++ b/src/query/functions/linear/datetime_test.go @@ -26,6 +26,7 @@ import ( "time" "github.com/m3db/m3/src/query/executor/transform" + "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" "github.com/m3db/m3/src/query/test" "github.com/m3db/m3/src/query/test/executor" @@ -64,7 +65,7 @@ func TestDayOfMonth(t *testing.T) { op, err := NewDateOp(DayOfMonthType) require.NoError(t, err) node := op.Node(c, transform.Options{}) - err = node.Process(parser.NodeID(0), block) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block) require.NoError(t, err) expected := expectedDateVals(values, datetimeFuncs[DayOfMonthType]) assert.Len(t, sink.Values, 2) @@ -83,7 +84,7 @@ func TestDayOfWeek(t *testing.T) { op, err := NewDateOp(DayOfWeekType) require.NoError(t, err) node := op.Node(c, transform.Options{}) - err = node.Process(parser.NodeID(0), block) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block) require.NoError(t, err) expected := expectedDateVals(values, datetimeFuncs[DayOfWeekType]) assert.Len(t, sink.Values, 2) @@ -102,7 +103,7 @@ func TestDaysInMonth(t *testing.T) { op, err := NewDateOp(DaysInMonthType) require.NoError(t, err) node := op.Node(c, transform.Options{}) - err = node.Process(parser.NodeID(0), block) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block) require.NoError(t, err) expected := expectedDateVals(values, datetimeFuncs[DaysInMonthType]) assert.Len(t, sink.Values, 2) @@ -121,7 +122,7 @@ func TestHour(t *testing.T) { op, err := NewDateOp(HourType) require.NoError(t, err) node := op.Node(c, transform.Options{}) - err = node.Process(parser.NodeID(0), block) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block) require.NoError(t, err) expected := expectedDateVals(values, datetimeFuncs[HourType]) assert.Len(t, sink.Values, 2) @@ -140,7 +141,7 @@ func TestMinute(t *testing.T) { op, err := NewDateOp(MinuteType) require.NoError(t, err) node := op.Node(c, transform.Options{}) - err = node.Process(parser.NodeID(0), block) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block) require.NoError(t, err) expected := expectedDateVals(values, datetimeFuncs[MinuteType]) assert.Len(t, sink.Values, 2) @@ -159,7 +160,7 @@ func TestMonth(t *testing.T) { op, err := NewDateOp(MonthType) require.NoError(t, err) node := op.Node(c, transform.Options{}) - err = node.Process(parser.NodeID(0), block) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block) require.NoError(t, err) expected := expectedDateVals(values, datetimeFuncs[MonthType]) assert.Len(t, sink.Values, 2) @@ -178,7 +179,7 @@ func TestYear(t *testing.T) { op, err := NewDateOp(YearType) require.NoError(t, err) node := op.Node(c, transform.Options{}) - err = node.Process(parser.NodeID(0), block) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block) require.NoError(t, err) expected := expectedDateVals(values, datetimeFuncs[YearType]) assert.Len(t, sink.Values, 2) diff --git a/src/query/functions/linear/histogram_quantile.go b/src/query/functions/linear/histogram_quantile.go index 377c9365fc..813adfea7b 100644 --- a/src/query/functions/linear/histogram_quantile.go +++ b/src/query/functions/linear/histogram_quantile.go @@ -233,7 +233,7 @@ func bucketQuantile(q float64, buckets []bucketValue) float64 { } // Process the block -func (n *histogramQuantileNode) Process(ID parser.NodeID, b block.Block) error { +func (n *histogramQuantileNode) Process(queryCtx *models.QueryContext, ID parser.NodeID, b block.Block) error { stepIter, err := b.StepIter() if err != nil { return err @@ -245,13 +245,14 @@ func (n *histogramQuantileNode) Process(ID parser.NodeID, b block.Block) error { q := n.op.q if q < 0 || q > 1 { - return processInvalidQuantile(q, bucketedSeries, meta, stepIter, n.controller) + return processInvalidQuantile(queryCtx, q, bucketedSeries, meta, stepIter, n.controller) } - return processValidQuantile(q, bucketedSeries, meta, stepIter, n.controller) + return processValidQuantile(queryCtx, q, bucketedSeries, meta, stepIter, n.controller) } func setupBuilder( + queryCtx *models.QueryContext, bucketedSeries bucketedSeries, meta block.Metadata, stepIter block.StepIter, @@ -268,7 +269,7 @@ func setupBuilder( } meta.Tags, metas = utils.DedupeMetadata(metas) - builder, err := controller.BlockBuilder(meta, metas) + builder, err := controller.BlockBuilder(queryCtx, meta, metas) if err != nil { return nil, err } @@ -281,6 +282,7 @@ func setupBuilder( } func processValidQuantile( + queryCtx *models.QueryContext, q float64, bucketedSeries bucketedSeries, meta block.Metadata, @@ -289,7 +291,7 @@ func processValidQuantile( ) error { sanitizeBuckets(bucketedSeries) - builder, err := setupBuilder(bucketedSeries, meta, stepIter, controller) + builder, err := setupBuilder(queryCtx, bucketedSeries, meta, stepIter, controller) if err != nil { return err } @@ -331,17 +333,18 @@ func processValidQuantile( nextBlock := builder.Build() defer nextBlock.Close() - return controller.Process(nextBlock) + return controller.Process(queryCtx, nextBlock) } func processInvalidQuantile( + queryCtx *models.QueryContext, q float64, bucketedSeries bucketedSeries, meta block.Metadata, stepIter block.StepIter, controller *transform.Controller, ) error { - builder, err := setupBuilder(bucketedSeries, meta, stepIter, controller) + builder, err := setupBuilder(queryCtx, bucketedSeries, meta, stepIter, controller) if err != nil { return err } @@ -366,5 +369,5 @@ func processInvalidQuantile( nextBlock := builder.Build() defer nextBlock.Close() - return controller.Process(nextBlock) + return controller.Process(queryCtx, nextBlock) } diff --git a/src/query/functions/linear/histogram_quantile_test.go b/src/query/functions/linear/histogram_quantile_test.go index 714623b3cf..99a3e0695c 100644 --- a/src/query/functions/linear/histogram_quantile_test.go +++ b/src/query/functions/linear/histogram_quantile_test.go @@ -275,7 +275,7 @@ func testQuantileFunctionWithQ(t *testing.T, q float64) [][]float64 { bl := test.NewBlockFromValuesWithSeriesMeta(bounds, seriesMetas, v) c, sink := executor.NewControllerWithSink(parser.NodeID(1)) node := op.(histogramQuantileOp).Node(c, transform.Options{}) - err = node.Process(parser.NodeID(0), bl) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), bl) require.NoError(t, err) return sink.Values diff --git a/src/query/functions/linear/math_test.go b/src/query/functions/linear/math_test.go index 49f24d11ea..a5a7285118 100644 --- a/src/query/functions/linear/math_test.go +++ b/src/query/functions/linear/math_test.go @@ -25,6 +25,7 @@ import ( "testing" "github.com/m3db/m3/src/query/executor/transform" + "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" "github.com/m3db/m3/src/query/test" "github.com/m3db/m3/src/query/test/executor" @@ -56,7 +57,7 @@ func TestAbsWithAllValues(t *testing.T) { op, err := NewMathOp(AbsType) require.NoError(t, err) node := op.Node(c, transform.Options{}) - err = node.Process(parser.NodeID(0), block) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block) require.NoError(t, err) expected := expectedMathVals(values, math.Abs) assert.Len(t, sink.Values, 2) @@ -75,7 +76,7 @@ func TestAbsWithSomeValues(t *testing.T) { op, err := NewMathOp(AbsType) require.NoError(t, err) node := op.Node(c, transform.Options{}) - err = node.Process(parser.NodeID(0), block) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block) require.NoError(t, err) expected := expectedMathVals(values, math.Abs) assert.Len(t, sink.Values, 2) @@ -90,7 +91,7 @@ func TestLn(t *testing.T) { op, err := NewMathOp(LnType) require.NoError(t, err) node := op.Node(c, transform.Options{}) - err = node.Process(parser.NodeID(0), block) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block) require.NoError(t, err) expected := expectedMathVals(values, math.Log) assert.Len(t, sink.Values, 2) @@ -109,7 +110,7 @@ func TestLog10WithNoValues(t *testing.T) { op, err := NewMathOp(Log10Type) require.NoError(t, err) node := op.Node(c, transform.Options{}) - err = node.Process(parser.NodeID(0), block) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block) require.NoError(t, err) expected := expectedMathVals(values, math.Log10) assert.Len(t, sink.Values, 2) @@ -128,7 +129,7 @@ func TestLog2WithSomeValues(t *testing.T) { op, err := NewMathOp(Log2Type) require.NoError(t, err) node := op.Node(c, transform.Options{}) - err = node.Process(parser.NodeID(0), block) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block) require.NoError(t, err) expected := expectedMathVals(values, math.Log2) assert.Len(t, sink.Values, 2) @@ -147,7 +148,7 @@ func TestFloorWithSomeValues(t *testing.T) { op, err := NewMathOp(FloorType) require.NoError(t, err) node := op.Node(c, transform.Options{}) - err = node.Process(parser.NodeID(0), block) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block) require.NoError(t, err) expected := expectedMathVals(values, math.Floor) assert.Len(t, sink.Values, 2) @@ -166,7 +167,7 @@ func TestCeilWithSomeValues(t *testing.T) { op, err := NewMathOp(CeilType) require.NoError(t, err) node := op.Node(c, transform.Options{}) - err = node.Process(parser.NodeID(0), block) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block) require.NoError(t, err) expected := expectedMathVals(values, math.Ceil) assert.Len(t, sink.Values, 2) @@ -184,7 +185,7 @@ func TestExpWithSomeValues(t *testing.T) { op, err := NewMathOp(ExpType) require.NoError(t, err) node := op.Node(c, transform.Options{}) - err = node.Process(parser.NodeID(0), block) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block) require.NoError(t, err) expected := expectedMathVals(values, math.Exp) assert.Len(t, sink.Values, 2) @@ -203,7 +204,7 @@ func TestSqrtWithSomeValues(t *testing.T) { op, err := NewMathOp(SqrtType) require.NoError(t, err) node := op.Node(c, transform.Options{}) - err = node.Process(parser.NodeID(0), block) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block) require.NoError(t, err) expected := expectedMathVals(values, math.Sqrt) assert.Len(t, sink.Values, 2) diff --git a/src/query/functions/linear/round_test.go b/src/query/functions/linear/round_test.go index 97b14833a0..8b6068413c 100644 --- a/src/query/functions/linear/round_test.go +++ b/src/query/functions/linear/round_test.go @@ -24,6 +24,7 @@ import ( "testing" "github.com/m3db/m3/src/query/executor/transform" + "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" "github.com/m3db/m3/src/query/test" "github.com/m3db/m3/src/query/test/executor" @@ -44,7 +45,7 @@ func TestRoundWithSomeValues(t *testing.T) { op, err := NewRoundOp([]interface{}{10.0}) require.NoError(t, err) node := op.Node(c, transform.Options{}) - err = node.Process(parser.NodeID(0), block) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block) require.NoError(t, err) expected := [][]float64{ diff --git a/src/query/functions/scalar/base.go b/src/query/functions/scalar/base.go index e39a67f7e3..e95ea807a7 100644 --- a/src/query/functions/scalar/base.go +++ b/src/query/functions/scalar/base.go @@ -21,11 +21,11 @@ package scalar import ( - "context" "fmt" "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/executor/transform" + "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" "github.com/m3db/m3/src/query/util/logging" @@ -86,7 +86,7 @@ type baseNode struct { } // Execute runs the scalar node operation -func (n *baseNode) Execute(ctx context.Context) error { +func (n *baseNode) Execute(queryCtx *models.QueryContext) error { bounds := n.timespec.Bounds() block := block.NewScalar(n.op.fn, bounds) @@ -94,11 +94,11 @@ func (n *baseNode) Execute(ctx context.Context) error { // Ignore any errors iter, _ := block.StepIter() if iter != nil { - logging.WithContext(ctx).Info("scalar node", zap.Any("meta", iter.Meta())) + logging.WithContext(queryCtx.Ctx).Info("scalar node", zap.Any("meta", iter.Meta())) } } - if err := n.controller.Process(block); err != nil { + if err := n.controller.Process(queryCtx, block); err != nil { block.Close() // Fail on first error return err diff --git a/src/query/functions/scalar/base_test.go b/src/query/functions/scalar/base_test.go index c3c1d8c004..3b5c670ce7 100644 --- a/src/query/functions/scalar/base_test.go +++ b/src/query/functions/scalar/base_test.go @@ -21,11 +21,11 @@ package scalar import ( - "context" "testing" "time" "github.com/m3db/m3/src/query/executor/transform" + "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" "github.com/m3db/m3/src/query/test" "github.com/m3db/m3/src/query/test/executor" @@ -51,7 +51,7 @@ func TestScalarTime(t *testing.T) { Step: step, }, }) - err := node.Execute(context.Background()) + err := node.Execute(models.NoopQueryContext()) require.NoError(t, err) assert.Len(t, sink.Values, 1) diff --git a/src/query/functions/tag/base.go b/src/query/functions/tag/base.go index 881cbdb328..3cde9c0140 100644 --- a/src/query/functions/tag/base.go +++ b/src/query/functions/tag/base.go @@ -25,6 +25,7 @@ import ( "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/executor/transform" + "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" ) @@ -97,7 +98,7 @@ type baseNode struct { } // Process the block. -func (n *baseNode) Process(ID parser.NodeID, b block.Block) error { +func (n *baseNode) Process(queryCtx *models.QueryContext, ID parser.NodeID, b block.Block) error { it, err := b.StepIter() if err != nil { return err @@ -113,5 +114,5 @@ func (n *baseNode) Process(ID parser.NodeID, b block.Block) error { } defer bl.Close() - return n.controller.Process(bl) + return n.controller.Process(queryCtx, bl) } diff --git a/src/query/functions/tag/join_test.go b/src/query/functions/tag/join_test.go index eca49124d5..d6982e39f4 100644 --- a/src/query/functions/tag/join_test.go +++ b/src/query/functions/tag/join_test.go @@ -187,7 +187,7 @@ func TestTagJoinOp(t *testing.T) { bl := block.NewColumnBlockBuilder(meta, seriesMeta).Build() c, sink := executor.NewControllerWithSink(parser.NodeID(1)) node := op.(baseOp).Node(c, transform.Options{}) - err = node.Process(parser.NodeID(0), bl) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), bl) require.NoError(t, err) assert.Equal(t, test.StringTagsToTags(tt.expectedMetaTags), sink.Meta.Tags) diff --git a/src/query/functions/tag/replace_test.go b/src/query/functions/tag/replace_test.go index 4c45fee2dc..b82d76cdf0 100644 --- a/src/query/functions/tag/replace_test.go +++ b/src/query/functions/tag/replace_test.go @@ -26,6 +26,7 @@ import ( "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/executor/transform" + "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" "github.com/m3db/m3/src/query/test" "github.com/m3db/m3/src/query/test/executor" @@ -164,7 +165,7 @@ func TestTagReplaceOp(t *testing.T) { bl := block.NewColumnBlockBuilder(meta, seriesMeta).Build() c, sink := executor.NewControllerWithSink(parser.NodeID(1)) node := op.(baseOp).Node(c, transform.Options{}) - err = node.Process(parser.NodeID(0), bl) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), bl) require.NoError(t, err) assert.Equal(t, test.StringTagsToTags(tt.expectedMetaTags), sink.Meta.Tags) diff --git a/src/query/functions/temporal/aggregation_test.go b/src/query/functions/temporal/aggregation_test.go index 544f258ba3..1421e6fc3c 100644 --- a/src/query/functions/temporal/aggregation_test.go +++ b/src/query/functions/temporal/aggregation_test.go @@ -290,7 +290,7 @@ func testAggregation(t *testing.T, testCases []testCase, vals [][]float64) { }, }) bNode := node.(*baseNode) - err = node.Process(parser.NodeID(0), block3) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block3) require.NoError(t, err) assert.Len(t, sink.Values, 0, "nothing processed yet") b, exists := bNode.cache.get(boundStart) @@ -307,7 +307,7 @@ func testAggregation(t *testing.T, testCases []testCase, vals [][]float64) { }, values) values[0][0] = original - err = node.Process(parser.NodeID(0), block1) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block1) require.NoError(t, err) assert.Len(t, sink.Values, 2, "output from first block only") test.EqualsWithNansWithDelta(t, tt.afterBlockOne[0], sink.Values[0], 0.0001) @@ -323,7 +323,7 @@ func testAggregation(t *testing.T, testCases []testCase, vals [][]float64) { StepSize: bounds.StepSize, }, values) - err = node.Process(parser.NodeID(0), block2) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block2) require.NoError(t, err) assert.Len(t, sink.Values, 6, "output from all 3 blocks") test.EqualsWithNansWithDelta(t, tt.afterBlockOne[0], sink.Values[0], 0.0001) diff --git a/src/query/functions/temporal/base.go b/src/query/functions/temporal/base.go index e0c59cdeeb..a78611b32f 100644 --- a/src/query/functions/temporal/base.go +++ b/src/query/functions/temporal/base.go @@ -93,7 +93,7 @@ type baseNode struct { // 4. Process all valid blocks from #3, #4 and mark them as processed // 5. Run a sweep phase to free up blocks which are no longer needed to be cached // TODO: Figure out if something else needs to be locked -func (c *baseNode) Process(ID parser.NodeID, b block.Block) error { +func (c *baseNode) Process(queryCtx *models.QueryContext, ID parser.NodeID, b block.Block) error { unconsolidatedBlock, err := b.Unconsolidated() if err != nil { return err @@ -152,7 +152,12 @@ func (c *baseNode) Process(ID parser.NodeID, b block.Block) error { processRequests := make([]processRequest, 0, len(leftBlks)) // If we have all blocks for the left range in the cache, then process the current block if !emptyLeftBlocks { - processRequests = append(processRequests, processRequest{blk: unconsolidatedBlock, deps: leftBlks, bounds: bounds}) + processRequests = append(processRequests, processRequest{ + blk: unconsolidatedBlock, + deps: leftBlks, + bounds: bounds, + queryCtx: queryCtx, + }) } leftBlks = append(leftBlks, unconsolidatedBlock) @@ -171,7 +176,13 @@ func (c *baseNode) Process(ID parser.NodeID, b block.Block) error { deps := leftBlks[len(leftBlks)-lStart:] deps = append(deps, rightBlks[:i]...) - processRequests = append(processRequests, processRequest{blk: rightBlks[i], deps: deps, bounds: bounds.Next(i + 1)}) + processRequests = append( + processRequests, + processRequest{ + blk: rightBlks[i], + deps: deps, + bounds: bounds.Next(i + 1), + queryCtx: queryCtx}) } // If either the left range or right range wasn't fully processed then cache the current block @@ -255,7 +266,7 @@ func (c *baseNode) processSingleRequest(request processRequest) error { } } - builder, err := c.controller.BlockBuilder(seriesIter.Meta(), resultSeriesMeta) + builder, err := c.controller.BlockBuilder(request.queryCtx, seriesIter.Meta(), resultSeriesMeta) if err != nil { return err } @@ -319,7 +330,7 @@ func (c *baseNode) processSingleRequest(request processRequest) error { nextBlock := builder.Build() defer nextBlock.Close() - return c.controller.Process(nextBlock) + return c.controller.Process(request.queryCtx, nextBlock) } func (c *baseNode) sweep(processedKeys []bool, maxBlocks int) { @@ -359,9 +370,10 @@ type MakeProcessor interface { } type processRequest struct { - blk block.UnconsolidatedBlock - bounds models.Bounds - deps []block.UnconsolidatedBlock + queryCtx *models.QueryContext + blk block.UnconsolidatedBlock + bounds models.Bounds + deps []block.UnconsolidatedBlock } // blockCache keeps track of blocks from the same parent across time diff --git a/src/query/functions/temporal/base_test.go b/src/query/functions/temporal/base_test.go index c936850271..785187cb05 100644 --- a/src/query/functions/temporal/base_test.go +++ b/src/query/functions/temporal/base_test.go @@ -81,7 +81,7 @@ func TestBaseWithB0(t *testing.T) { Step: time.Second, }, }) - err := node.Process(parser.NodeID(0), block) + err := node.Process(models.NoopQueryContext(), parser.NodeID(0), block) require.NoError(t, err) assert.Len(t, sink.Values, 2) require.IsType(t, node, &baseNode{}) @@ -98,7 +98,7 @@ func TestBaseWithB0(t *testing.T) { }, }) - err = node.Process(parser.NodeID(0), block) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block) require.NoError(t, err) bNode = node.(*baseNode) _, exists = bNode.cache.get(boundStart) @@ -113,7 +113,7 @@ func TestBaseWithB0(t *testing.T) { }, }) - err = node.Process(parser.NodeID(0), block) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block) require.NoError(t, err) bNode = node.(*baseNode) _, exists = bNode.cache.get(boundStart) @@ -123,12 +123,12 @@ func TestBaseWithB0(t *testing.T) { func TestBaseWithB1B0(t *testing.T) { tc := setup(2, 5*time.Minute, 1) - err := tc.Node.Process(parser.NodeID(0), tc.Blocks[1]) + err := tc.Node.Process(models.NoopQueryContext(), parser.NodeID(0), tc.Blocks[1]) require.NoError(t, err) assert.Len(t, tc.Sink.Values, 0, "nothing processed yet") compareCacheState(t, tc.Node, tc.Bounds, []bool{false, true}, "B1 cached") - err = tc.Node.Process(parser.NodeID(0), tc.Blocks[0]) + err = tc.Node.Process(models.NoopQueryContext(), parser.NodeID(0), tc.Blocks[0]) require.NoError(t, err) assert.Len(t, tc.Sink.Values, 4, "output from both blocks") compareCacheState(t, tc.Node, tc.Bounds, []bool{false, false}, "everything removed from cache") @@ -140,13 +140,12 @@ func TestBaseWithB1B0(t *testing.T) { func TestBaseWithB0B1(t *testing.T) { tc := setup(2, 5*time.Minute, 1) - err := tc.Node.Process(parser.NodeID(0), tc.Blocks[0]) - + err := tc.Node.Process(models.NoopQueryContext(), parser.NodeID(0), tc.Blocks[0]) require.NoError(t, err) assert.Len(t, tc.Sink.Values, 2, "B0 processed") compareCacheState(t, tc.Node, tc.Bounds, []bool{true, false}, "B0 cached for future") - err = tc.Node.Process(parser.NodeID(0), tc.Blocks[1]) + err = tc.Node.Process(models.NoopQueryContext(), parser.NodeID(0), tc.Blocks[1]) require.NoError(t, err) assert.Len(t, tc.Sink.Values, 4, "output from both blocks") compareCacheState(t, tc.Node, tc.Bounds, []bool{false, false}, "B0 removed from cache, B1 not cached") @@ -159,19 +158,19 @@ func TestBaseWithB0B1B2(t *testing.T) { tc := setup(3, 5*time.Minute, 2) // B0 arrives - err := tc.Node.Process(parser.NodeID(0), tc.Blocks[0]) + err := tc.Node.Process(models.NoopQueryContext(), parser.NodeID(0), tc.Blocks[0]) require.NoError(t, err) assert.Len(t, tc.Sink.Values, 2, "B0 processed") compareCacheState(t, tc.Node, tc.Bounds, []bool{true, false, false}, "B0 cached for future") // B1 arrives - err = tc.Node.Process(parser.NodeID(0), tc.Blocks[1]) + err = tc.Node.Process(models.NoopQueryContext(), parser.NodeID(0), tc.Blocks[1]) require.NoError(t, err) assert.Len(t, tc.Sink.Values, 4, "output from B0, B1") compareCacheState(t, tc.Node, tc.Bounds, []bool{false, true, false}, "B0 removed from cache, B1 cached") // B2 arrives - err = tc.Node.Process(parser.NodeID(0), tc.Blocks[2]) + err = tc.Node.Process(models.NoopQueryContext(), parser.NodeID(0), tc.Blocks[2]) require.NoError(t, err) assert.Len(t, tc.Sink.Values, 6, "output from all blocks") compareCacheState(t, tc.Node, tc.Bounds, []bool{false, false, false}, "nothing cached") @@ -181,19 +180,19 @@ func TestBaseWithB0B2B1(t *testing.T) { tc := setup(3, 5*time.Minute, 2) // B0 arrives - err := tc.Node.Process(parser.NodeID(0), tc.Blocks[0]) + err := tc.Node.Process(models.NoopQueryContext(), parser.NodeID(0), tc.Blocks[0]) require.NoError(t, err) assert.Len(t, tc.Sink.Values, 2, "B0 processed") compareCacheState(t, tc.Node, tc.Bounds, []bool{true, false, false}, "B0 cached for future") // B2 arrives - err = tc.Node.Process(parser.NodeID(0), tc.Blocks[2]) + err = tc.Node.Process(models.NoopQueryContext(), parser.NodeID(0), tc.Blocks[2]) require.NoError(t, err) assert.Len(t, tc.Sink.Values, 2, "Only B0 processed") compareCacheState(t, tc.Node, tc.Bounds, []bool{true, false, true}, "B0, B2 cached") // B1 arrives - err = tc.Node.Process(parser.NodeID(0), tc.Blocks[1]) + err = tc.Node.Process(models.NoopQueryContext(), parser.NodeID(0), tc.Blocks[1]) require.NoError(t, err) assert.Len(t, tc.Sink.Values, 6, "output from all blocks") compareCacheState(t, tc.Node, tc.Bounds, []bool{false, false, false}, "nothing cached") @@ -203,19 +202,19 @@ func TestBaseWithB1B0B2(t *testing.T) { tc := setup(3, 5*time.Minute, 2) // B1 arrives - err := tc.Node.Process(parser.NodeID(0), tc.Blocks[1]) + err := tc.Node.Process(models.NoopQueryContext(), parser.NodeID(0), tc.Blocks[1]) require.NoError(t, err) assert.Len(t, tc.Sink.Values, 0, "Nothing processed") compareCacheState(t, tc.Node, tc.Bounds, []bool{false, true, false}, "B1 cached for future") // B0 arrives - err = tc.Node.Process(parser.NodeID(0), tc.Blocks[0]) + err = tc.Node.Process(models.NoopQueryContext(), parser.NodeID(0), tc.Blocks[0]) require.NoError(t, err) assert.Len(t, tc.Sink.Values, 4, "B0, B1 processed") compareCacheState(t, tc.Node, tc.Bounds, []bool{false, true, false}, "B1 still cached, B0 not cached") // B2 arrives - err = tc.Node.Process(parser.NodeID(0), tc.Blocks[2]) + err = tc.Node.Process(models.NoopQueryContext(), parser.NodeID(0), tc.Blocks[2]) require.NoError(t, err) assert.Len(t, tc.Sink.Values, 6, "output from all blocks") compareCacheState(t, tc.Node, tc.Bounds, []bool{false, false, false}, "nothing cached") @@ -225,19 +224,19 @@ func TestBaseWithB1B2B0(t *testing.T) { tc := setup(3, 5*time.Minute, 2) // B1 arrives - err := tc.Node.Process(parser.NodeID(0), tc.Blocks[1]) + err := tc.Node.Process(models.NoopQueryContext(), parser.NodeID(0), tc.Blocks[1]) require.NoError(t, err) assert.Len(t, tc.Sink.Values, 0, "Nothing processed") compareCacheState(t, tc.Node, tc.Bounds, []bool{false, true, false}, "B1 cached for future") // B2 arrives - err = tc.Node.Process(parser.NodeID(0), tc.Blocks[2]) + err = tc.Node.Process(models.NoopQueryContext(), parser.NodeID(0), tc.Blocks[2]) require.NoError(t, err) assert.Len(t, tc.Sink.Values, 2, "B1 processed") compareCacheState(t, tc.Node, tc.Bounds, []bool{false, true, false}, "B1 still cached, B2 not cached") // B0 arrives - err = tc.Node.Process(parser.NodeID(0), tc.Blocks[0]) + err = tc.Node.Process(models.NoopQueryContext(), parser.NodeID(0), tc.Blocks[0]) require.NoError(t, err) assert.Len(t, tc.Sink.Values, 6, "output from all blocks") compareCacheState(t, tc.Node, tc.Bounds, []bool{false, false, false}, "nothing cached") @@ -247,19 +246,19 @@ func TestBaseWithB2B0B1(t *testing.T) { tc := setup(3, 5*time.Minute, 2) // B2 arrives - err := tc.Node.Process(parser.NodeID(0), tc.Blocks[2]) + err := tc.Node.Process(models.NoopQueryContext(), parser.NodeID(0), tc.Blocks[2]) require.NoError(t, err) assert.Len(t, tc.Sink.Values, 0, "Nothing processed") compareCacheState(t, tc.Node, tc.Bounds, []bool{false, false, true}, "B2 cached for future") // B0 arrives - err = tc.Node.Process(parser.NodeID(0), tc.Blocks[0]) + err = tc.Node.Process(models.NoopQueryContext(), parser.NodeID(0), tc.Blocks[0]) require.NoError(t, err) assert.Len(t, tc.Sink.Values, 2, "B0 processed") compareCacheState(t, tc.Node, tc.Bounds, []bool{true, false, true}, "B0, B2 cached") // B1 arrives - err = tc.Node.Process(parser.NodeID(0), tc.Blocks[1]) + err = tc.Node.Process(models.NoopQueryContext(), parser.NodeID(0), tc.Blocks[1]) require.NoError(t, err) assert.Len(t, tc.Sink.Values, 6, "output from all blocks") compareCacheState(t, tc.Node, tc.Bounds, []bool{false, false, false}, "nothing cached") @@ -269,19 +268,19 @@ func TestBaseWithB2B1B0(t *testing.T) { tc := setup(3, 5*time.Minute, 2) // B2 arrives - err := tc.Node.Process(parser.NodeID(0), tc.Blocks[2]) + err := tc.Node.Process(models.NoopQueryContext(), parser.NodeID(0), tc.Blocks[2]) require.NoError(t, err) assert.Len(t, tc.Sink.Values, 0, "Nothing processed") compareCacheState(t, tc.Node, tc.Bounds, []bool{false, false, true}, "B2 cached for future") // B1 arrives - err = tc.Node.Process(parser.NodeID(0), tc.Blocks[1]) + err = tc.Node.Process(models.NoopQueryContext(), parser.NodeID(0), tc.Blocks[1]) require.NoError(t, err) assert.Len(t, tc.Sink.Values, 2, "B0 processed") compareCacheState(t, tc.Node, tc.Bounds, []bool{false, true, false}, "B1 cached, B2 removed") // B0 arrives - err = tc.Node.Process(parser.NodeID(0), tc.Blocks[0]) + err = tc.Node.Process(models.NoopQueryContext(), parser.NodeID(0), tc.Blocks[0]) require.NoError(t, err) assert.Len(t, tc.Sink.Values, 6, "output from all blocks") compareCacheState(t, tc.Node, tc.Bounds, []bool{false, false, false}, "nothing cached") @@ -291,31 +290,31 @@ func TestBaseWithSize3B0B1B2B3B4(t *testing.T) { tc := setup(5, 15*time.Minute, 4) // B0 arrives - err := tc.Node.Process(parser.NodeID(0), tc.Blocks[0]) + err := tc.Node.Process(models.NoopQueryContext(), parser.NodeID(0), tc.Blocks[0]) require.NoError(t, err) assert.Len(t, tc.Sink.Values, 2, "B0 processed") compareCacheState(t, tc.Node, tc.Bounds, []bool{true, false, false, false, false}, "B0 cached for future") // B1 arrives - err = tc.Node.Process(parser.NodeID(0), tc.Blocks[1]) + err = tc.Node.Process(models.NoopQueryContext(), parser.NodeID(0), tc.Blocks[1]) require.NoError(t, err) assert.Len(t, tc.Sink.Values, 4, "B0, B1 processed") compareCacheState(t, tc.Node, tc.Bounds, []bool{true, true, false, false, false}, "B0, B1 cached") // B2 arrives - err = tc.Node.Process(parser.NodeID(0), tc.Blocks[2]) + err = tc.Node.Process(models.NoopQueryContext(), parser.NodeID(0), tc.Blocks[2]) require.NoError(t, err) assert.Len(t, tc.Sink.Values, 6, "B0, B1, B2 processed") compareCacheState(t, tc.Node, tc.Bounds, []bool{true, true, true, false, false}, "B0, B1, B2 cached") // B3 arrives - err = tc.Node.Process(parser.NodeID(0), tc.Blocks[3]) + err = tc.Node.Process(models.NoopQueryContext(), parser.NodeID(0), tc.Blocks[3]) require.NoError(t, err) assert.Len(t, tc.Sink.Values, 8, "B0, B1, B2, B3 processed") compareCacheState(t, tc.Node, tc.Bounds, []bool{false, true, true, true, false}, "B0 removed, B1, B2, B3 cached") // B4 arrives - err = tc.Node.Process(parser.NodeID(0), tc.Blocks[4]) + err = tc.Node.Process(models.NoopQueryContext(), parser.NodeID(0), tc.Blocks[4]) require.NoError(t, err) assert.Len(t, tc.Sink.Values, 10, "all 5 blocks processed") compareCacheState(t, tc.Node, tc.Bounds, []bool{false, false, false, false, false}, "nothing cached") @@ -397,7 +396,12 @@ func TestSingleProcessRequest(t *testing.T) { }, }) bNode := node.(*baseNode) - err := bNode.processSingleRequest(processRequest{blk: block2, bounds: bounds, deps: []block.UnconsolidatedBlock{block1}}) + err := bNode.processSingleRequest(processRequest{ + blk: block2, + bounds: bounds, + deps: []block.UnconsolidatedBlock{block1}, + queryCtx: models.NoopQueryContext(), + }) assert.NoError(t, err) assert.Len(t, sink.Values, 2, "block processed") // Current Block: 0 1 2 3 4 5 diff --git a/src/query/functions/temporal/functions_test.go b/src/query/functions/temporal/functions_test.go index d59dfd43ff..46192302cf 100644 --- a/src/query/functions/temporal/functions_test.go +++ b/src/query/functions/temporal/functions_test.go @@ -157,7 +157,7 @@ func testTemporalFunc(t *testing.T, testCases []testCase) { }, }) bNode := node.(*baseNode) - err = node.Process(parser.NodeID(0), block3) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block3) require.NoError(t, err) assert.Len(t, sink.Values, 0, "nothing processed yet") b, exists := bNode.cache.get(boundStart) @@ -174,7 +174,7 @@ func testTemporalFunc(t *testing.T, testCases []testCase) { }, values) values[0][0] = original - err = node.Process(parser.NodeID(0), block1) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block1) require.NoError(t, err) assert.Len(t, sink.Values, 2, "output from first block only") test.EqualsWithNansWithDelta(t, tt.afterBlockOne[0], sink.Values[0], 0.0001) @@ -190,7 +190,7 @@ func testTemporalFunc(t *testing.T, testCases []testCase) { StepSize: bounds.StepSize, }, values) - err = node.Process(parser.NodeID(0), block2) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block2) require.NoError(t, err) assert.Len(t, sink.Values, 6, "output from all 3 blocks") test.EqualsWithNansWithDelta(t, tt.afterBlockOne[0], sink.Values[0], 0.0001) diff --git a/src/query/functions/temporal/holt_winters_test.go b/src/query/functions/temporal/holt_winters_test.go index 480f267ebd..b85964305b 100644 --- a/src/query/functions/temporal/holt_winters_test.go +++ b/src/query/functions/temporal/holt_winters_test.go @@ -96,7 +96,7 @@ func testHoltWinters(t *testing.T, testCases []testCase, vals [][]float64) { }, }) bNode := node.(*baseNode) - err = node.Process(parser.NodeID(0), block3) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block3) require.NoError(t, err) assert.Len(t, sink.Values, 0, "nothing processed yet") b, exists := bNode.cache.get(boundStart) @@ -113,7 +113,7 @@ func testHoltWinters(t *testing.T, testCases []testCase, vals [][]float64) { }, values) values[0][0] = original - err = node.Process(parser.NodeID(0), block1) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block1) require.NoError(t, err) assert.Len(t, sink.Values, 2, "output from first block only") test.EqualsWithNansWithDelta(t, tt.afterBlockOne[0], sink.Values[0], 0.0001) @@ -129,7 +129,7 @@ func testHoltWinters(t *testing.T, testCases []testCase, vals [][]float64) { StepSize: bounds.StepSize, }, values) - err = node.Process(parser.NodeID(0), block2) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block2) require.NoError(t, err) assert.Len(t, sink.Values, 6, "output from all 3 blocks") test.EqualsWithNansWithDelta(t, tt.afterBlockOne[0], sink.Values[0], 0.0001) diff --git a/src/query/functions/temporal/linear_regression_test.go b/src/query/functions/temporal/linear_regression_test.go index 028f65cac5..fd2bc25426 100644 --- a/src/query/functions/temporal/linear_regression_test.go +++ b/src/query/functions/temporal/linear_regression_test.go @@ -170,7 +170,7 @@ func testLinearRegression(t *testing.T, testCases []testCase, vals [][]float64) }, }) bNode := node.(*baseNode) - err = node.Process(parser.NodeID(0), block3) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block3) require.NoError(t, err) assert.Len(t, sink.Values, 0, "nothing processed yet") b, exists := bNode.cache.get(boundStart) @@ -187,7 +187,7 @@ func testLinearRegression(t *testing.T, testCases []testCase, vals [][]float64) }, values) values[0][0] = original - err = node.Process(parser.NodeID(0), block1) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block1) require.NoError(t, err) assert.Len(t, sink.Values, 2, "output from first block only") test.EqualsWithNansWithDelta(t, tt.afterBlockOne[0], sink.Values[0], 0.0001) @@ -203,7 +203,7 @@ func testLinearRegression(t *testing.T, testCases []testCase, vals [][]float64) StepSize: bounds.StepSize, }, values) - err = node.Process(parser.NodeID(0), block2) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block2) require.NoError(t, err) assert.Len(t, sink.Values, 6, "output from all 3 blocks") test.EqualsWithNansWithDelta(t, tt.afterBlockOne[0], sink.Values[0], 0.0001) diff --git a/src/query/functions/temporal/rate_test.go b/src/query/functions/temporal/rate_test.go index fe7c3552eb..59e7d8c411 100644 --- a/src/query/functions/temporal/rate_test.go +++ b/src/query/functions/temporal/rate_test.go @@ -323,7 +323,7 @@ func testRate(t *testing.T, testCases []testRateCase) { }, }) bNode := node.(*baseNode) - err = node.Process(parser.NodeID(0), block3) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block3) require.NoError(t, err) assert.Len(t, sink.Values, 0, "nothing processed yet") b, exists := bNode.cache.get(boundStart) @@ -340,7 +340,7 @@ func testRate(t *testing.T, testCases []testRateCase) { }, values) values[0][0] = original - err = node.Process(parser.NodeID(0), block1) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block1) require.NoError(t, err) assert.Len(t, sink.Values, 2, "output from first block only") test.EqualsWithNansWithDelta(t, tt.afterBlockOne[0], sink.Values[0], 0.0001) @@ -356,7 +356,7 @@ func testRate(t *testing.T, testCases []testRateCase) { StepSize: bounds.StepSize, }, values) - err = node.Process(parser.NodeID(0), block2) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block2) require.NoError(t, err) assert.Len(t, sink.Values, 6, "output from all 3 blocks") test.EqualsWithNansWithDelta(t, tt.afterBlockOne[0], sink.Values[0], 0.0001) diff --git a/src/query/functions/unconsolidated/timestamp.go b/src/query/functions/unconsolidated/timestamp.go index 359e5926ed..052bc47385 100644 --- a/src/query/functions/unconsolidated/timestamp.go +++ b/src/query/functions/unconsolidated/timestamp.go @@ -26,6 +26,7 @@ import ( "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/executor/transform" + "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" "github.com/m3db/m3/src/query/ts" ) @@ -83,7 +84,7 @@ type timestampNode struct { } // Process the block -func (n *timestampNode) Process(ID parser.NodeID, b block.Block) error { +func (n *timestampNode) Process(queryCtx *models.QueryContext, ID parser.NodeID, b block.Block) error { unconsolidatedBlock, err := b.Unconsolidated() if err != nil { return err @@ -94,7 +95,7 @@ func (n *timestampNode) Process(ID parser.NodeID, b block.Block) error { return err } - builder, err := n.controller.BlockBuilder(iter.Meta(), iter.SeriesMeta()) + builder, err := n.controller.BlockBuilder(queryCtx, iter.Meta(), iter.SeriesMeta()) if err != nil { return err } @@ -127,5 +128,5 @@ func (n *timestampNode) Process(ID parser.NodeID, b block.Block) error { nextBlock := builder.Build() defer nextBlock.Close() - return n.controller.Process(nextBlock) + return n.controller.Process(queryCtx, nextBlock) } diff --git a/src/query/functions/unconsolidated/timestamp_test.go b/src/query/functions/unconsolidated/timestamp_test.go index 08f1666c6a..0c3d536fc6 100644 --- a/src/query/functions/unconsolidated/timestamp_test.go +++ b/src/query/functions/unconsolidated/timestamp_test.go @@ -25,6 +25,7 @@ import ( "time" "github.com/m3db/m3/src/query/executor/transform" + "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" "github.com/m3db/m3/src/query/test" "github.com/m3db/m3/src/query/test/executor" @@ -38,7 +39,7 @@ func TestTimestamp(t *testing.T) { block := test.NewUnconsolidatedBlockFromDatapoints(bounds, values) c, sink := executor.NewControllerWithSink(parser.NodeID(1)) node := newTimestampOp(TimestampType).Node(c, transform.Options{}) - err := node.Process(parser.NodeID(0), block) + err := node.Process(models.NoopQueryContext(), parser.NodeID(0), block) require.NoError(t, err) assert.Len(t, sink.Values, 2) diff --git a/src/query/models/query_context.go b/src/query/models/query_context.go new file mode 100644 index 0000000000..5da6e50dd7 --- /dev/null +++ b/src/query/models/query_context.go @@ -0,0 +1,65 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package models + +import ( + "context" + + "github.com/uber-go/tally" +) + +// QueryContext provides all external state needed to execute and track a query. +// It acts as a hook back into the execution engine for things like +// cost accounting. +type QueryContext struct { + Ctx context.Context + Scope tally.Scope +} + +// NewQueryContext constructs a QueryContext using the given Enforcer to +// enforce per query limits. +func NewQueryContext(ctx context.Context, scope tally.Scope) *QueryContext { + return &QueryContext{ + Ctx: ctx, + Scope: scope, + } +} + +// NoopQueryContext returns a query context with no active components. +func NoopQueryContext() *QueryContext { + return NewQueryContext(context.Background(), tally.NoopScope) +} + +// WithContext creates a shallow copy of this QueryContext using the new context. +// Sample usage: +// +// ctx, cancel := context.WithTimeout(qc.Ctx, 5*time.Second) +// defer cancel() +// qc = qc.WithContext(ctx) +func (qc *QueryContext) WithContext(ctx context.Context) *QueryContext { + if qc == nil { + return nil + } + + clone := *qc + clone.Ctx = ctx + return &clone +} diff --git a/src/query/models/query_context_test.go b/src/query/models/query_context_test.go new file mode 100644 index 0000000000..5cea7f659b --- /dev/null +++ b/src/query/models/query_context_test.go @@ -0,0 +1,48 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package models + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestQueryContext_WithContext(t *testing.T) { + t.Run("passes along new context without modifying old", func(t *testing.T) { + qc := NoopQueryContext() + + testKey := struct{}{} + + newCtx := context.WithValue(qc.Ctx, testKey, "bar") + + newQc := qc.WithContext(newCtx) + + assert.Equal(t, "bar", newQc.Ctx.Value(testKey), "new context should be present") + assert.Equal(t, nil, qc.Ctx.Value(testKey), "old context should be the same") + }) + + t.Run("returns nil on nil", func(t *testing.T) { + var qc *QueryContext + assert.Nil(t, qc.WithContext(context.TODO())) + }) +} diff --git a/src/query/parser/interface.go b/src/query/parser/interface.go index 39b95a1906..5e7852e174 100644 --- a/src/query/parser/interface.go +++ b/src/query/parser/interface.go @@ -21,8 +21,9 @@ package parser import ( - "context" "fmt" + + "github.com/m3db/m3/src/query/models" ) // Parser consists of the language specific representation of AST and can convert into a common DAG @@ -77,5 +78,5 @@ func NewTransformFromOperation(Op Params, nextID int) Node { // Source represents data sources which are handled differently than other transforms as they are always independent and can always be parallelized type Source interface { - Execute(ctx context.Context) error + Execute(queryCtx *models.QueryContext) error } diff --git a/src/query/test/executor/transform.go b/src/query/test/executor/transform.go index 402606c246..e4a84404e6 100644 --- a/src/query/test/executor/transform.go +++ b/src/query/test/executor/transform.go @@ -23,6 +23,7 @@ package executor import ( "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/executor/transform" + "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" ) @@ -48,7 +49,7 @@ type SinkNode struct { } // Process processes and stores the last block output in the sink node -func (s *SinkNode) Process(ID parser.NodeID, block block.Block) error { +func (s *SinkNode) Process(_ *models.QueryContext, ID parser.NodeID, block block.Block) error { iter, err := block.SeriesIter() if err != nil { return err