From 1012004714f2b5a695bd188b434bf0a82d496b4b Mon Sep 17 00:00:00 2001 From: Andrew Mains Date: Thu, 21 Feb 2019 13:22:25 -0500 Subject: [PATCH] Query context without cost accounting (#1396) Adds a `queryContext` argument to `OpNode.Process` to hold any per query state. I use this in both my [cost accounting](https://github.com/m3db/m3/pull/1207) and [tracing](https://github.com/m3db/m3/pull/1321) PR's. At the time, I based my tracing branch off of the cost accounting branch. Tracing is closer to landing though, so I've now factored out the common changes, and rebased them both against this branch. --- src/query/executor/engine.go | 3 +- src/query/executor/engine_test.go | 2 +- src/query/executor/result.go | 3 +- src/query/executor/state.go | 16 +++-- src/query/executor/state_test.go | 3 +- src/query/executor/transform/controller.go | 13 ++-- src/query/executor/transform/lazy.go | 22 +++--- src/query/executor/transform/lazy_test.go | 7 +- src/query/executor/transform/types.go | 2 +- src/query/functions/aggregation/base.go | 7 +- src/query/functions/aggregation/base_test.go | 2 +- .../functions/aggregation/count_values.go | 6 +- .../aggregation/count_values_test.go | 2 +- src/query/functions/aggregation/take.go | 7 +- src/query/functions/aggregation/take_test.go | 3 +- src/query/functions/binary/and.go | 4 +- src/query/functions/binary/and_test.go | 9 +-- src/query/functions/binary/arithmetic.go | 14 +++- src/query/functions/binary/base.go | 9 +-- src/query/functions/binary/binary.go | 12 +++- src/query/functions/binary/binary_test.go | 28 ++++---- src/query/functions/binary/comparison.go | 14 +++- src/query/functions/binary/logical.go | 11 +-- src/query/functions/binary/or.go | 4 +- src/query/functions/binary/or_test.go | 20 +++--- src/query/functions/binary/unless.go | 4 +- src/query/functions/binary/unless_test.go | 4 +- src/query/functions/fetch.go | 8 ++- src/query/functions/fetch_test.go | 4 +- src/query/functions/linear/absent_test.go | 5 +- src/query/functions/linear/base.go | 7 +- src/query/functions/linear/clamp_test.go | 5 +- src/query/functions/linear/datetime_test.go | 15 ++-- .../functions/linear/histogram_quantile.go | 19 +++--- .../linear/histogram_quantile_test.go | 2 +- src/query/functions/linear/math_test.go | 19 +++--- src/query/functions/linear/round_test.go | 3 +- src/query/functions/scalar/base.go | 8 +-- src/query/functions/scalar/base_test.go | 4 +- src/query/functions/tag/base.go | 5 +- src/query/functions/tag/join_test.go | 2 +- src/query/functions/tag/replace_test.go | 3 +- .../functions/temporal/aggregation_test.go | 6 +- src/query/functions/temporal/base.go | 28 +++++--- src/query/functions/temporal/base_test.go | 68 ++++++++++--------- .../functions/temporal/functions_test.go | 6 +- .../functions/temporal/holt_winters_test.go | 6 +- .../temporal/linear_regression_test.go | 6 +- src/query/functions/temporal/rate_test.go | 6 +- .../functions/unconsolidated/timestamp.go | 7 +- .../unconsolidated/timestamp_test.go | 3 +- src/query/models/query_context.go | 65 ++++++++++++++++++ src/query/models/query_context_test.go | 48 +++++++++++++ src/query/parser/interface.go | 5 +- src/query/test/executor/transform.go | 3 +- 55 files changed, 397 insertions(+), 200 deletions(-) create mode 100644 src/query/models/query_context.go create mode 100644 src/query/models/query_context_test.go diff --git a/src/query/executor/engine.go b/src/query/executor/engine.go index fa052b6227..8ee9377f43 100644 --- a/src/query/executor/engine.go +++ b/src/query/executor/engine.go @@ -156,7 +156,8 @@ func (e *Engine) ExecuteExpr( result := state.resultNode results <- Query{Result: result} - if err := state.Execute(ctx); err != nil { + + if err := state.Execute(models.NewQueryContext(ctx, tally.NoopScope)); err != nil { result.abort(err) } else { result.done() diff --git a/src/query/executor/engine_test.go b/src/query/executor/engine_test.go index 7d547fa120..e68c75e42b 100644 --- a/src/query/executor/engine_test.go +++ b/src/query/executor/engine_test.go @@ -35,7 +35,7 @@ import ( "github.com/uber-go/tally" ) -func TestExecute(t *testing.T) { +func TestEngine_Execute(t *testing.T) { logging.InitWithCores(nil) ctrl := gomock.NewController(t) store, session := m3.NewStorageAndSession(t, ctrl) diff --git a/src/query/executor/result.go b/src/query/executor/result.go index d0637d132f..af65be0a23 100644 --- a/src/query/executor/result.go +++ b/src/query/executor/result.go @@ -24,6 +24,7 @@ import ( "sync" "github.com/m3db/m3/src/query/block" + "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" "github.com/pkg/errors" @@ -64,7 +65,7 @@ func newResultNode() *ResultNode { } // Process the block -func (r *ResultNode) Process(ID parser.NodeID, block block.Block) error { +func (r *ResultNode) Process(queryCtx *models.QueryContext, ID parser.NodeID, block block.Block) error { if r.aborted { return errAborted } diff --git a/src/query/executor/state.go b/src/query/executor/state.go index fba81d7fb7..f3e18f2dd9 100644 --- a/src/query/executor/state.go +++ b/src/query/executor/state.go @@ -25,6 +25,7 @@ import ( "fmt" "github.com/m3db/m3/src/query/executor/transform" + "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" "github.com/m3db/m3/src/query/plan" "github.com/m3db/m3/src/query/storage" @@ -184,13 +185,16 @@ func (s *ExecutionState) createNode( } // Execute the sources in parallel and return the first error -func (s *ExecutionState) Execute(ctx context.Context) error { +func (s *ExecutionState) Execute(queryCtx *models.QueryContext) error { requests := make([]execution.Request, len(s.sources)) for idx, source := range s.sources { - requests[idx] = sourceRequest{source} + requests[idx] = sourceRequest{ + source: source, + queryCtx: queryCtx, + } } - return execution.ExecuteParallel(ctx, requests) + return execution.ExecuteParallel(queryCtx.Ctx, requests) } // String representation of the state @@ -199,9 +203,11 @@ func (s *ExecutionState) String() string { } type sourceRequest struct { - source parser.Source + source parser.Source + queryCtx *models.QueryContext } func (s sourceRequest) Process(ctx context.Context) error { - return s.source.Execute(ctx) + // make sure to propagate the new context.Context object down. + return s.source.Execute(s.queryCtx.WithContext(ctx)) } diff --git a/src/query/executor/state_test.go b/src/query/executor/state_test.go index 0f6908919c..be80009987 100644 --- a/src/query/executor/state_test.go +++ b/src/query/executor/state_test.go @@ -21,7 +21,6 @@ package executor import ( - "context" "testing" "time" @@ -61,7 +60,7 @@ func TestValidState(t *testing.T) { state, err := GenerateExecutionState(p, store) require.NoError(t, err) require.Len(t, state.sources, 1) - err = state.Execute(context.Background()) + err = state.Execute(models.NoopQueryContext()) assert.NoError(t, err) } diff --git a/src/query/executor/transform/controller.go b/src/query/executor/transform/controller.go index 8c97e6e0f1..7fdadbd8cf 100644 --- a/src/query/executor/transform/controller.go +++ b/src/query/executor/transform/controller.go @@ -22,6 +22,7 @@ package transform import ( "github.com/m3db/m3/src/query/block" + "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" ) @@ -36,10 +37,10 @@ func (t *Controller) AddTransform(node OpNode) { t.transforms = append(t.transforms, node) } -// Process performs processing on the underlying transforms. -func (t *Controller) Process(block block.Block) error { +// Process performs processing on the underlying transforms +func (t *Controller) Process(queryCtx *models.QueryContext, block block.Block) error { for _, ts := range t.transforms { - err := ts.Process(t.ID, block) + err := ts.Process(queryCtx, t.ID, block) if err != nil { return err } @@ -48,11 +49,11 @@ func (t *Controller) Process(block block.Block) error { return nil } -// BlockBuilder returns a BlockBuilder instance with associated metadata. +// BlockBuilder returns a BlockBuilder instance with associated metadata func (t *Controller) BlockBuilder( + queryCtx *models.QueryContext, blockMeta block.Metadata, - seriesMeta []block.SeriesMeta, -) (block.Builder, error) { + seriesMeta []block.SeriesMeta) (block.Builder, error) { return block.NewColumnBlockBuilder(blockMeta, seriesMeta), nil } diff --git a/src/query/executor/transform/lazy.go b/src/query/executor/transform/lazy.go index 53ebe29ae5..888fb25151 100644 --- a/src/query/executor/transform/lazy.go +++ b/src/query/executor/transform/lazy.go @@ -24,6 +24,7 @@ import ( "sync" "github.com/m3db/m3/src/query/block" + "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" ) @@ -31,7 +32,7 @@ type sinkNode struct { block block.Block } -func (s *sinkNode) Process(ID parser.NodeID, block block.Block) error { +func (s *sinkNode) Process(queryCtx *models.QueryContext, ID parser.NodeID, block block.Block) error { s.block = block return nil } @@ -44,9 +45,7 @@ type lazyNode struct { // NewLazyNode creates a new wrapper around a function fNode to make it support lazy initialization func NewLazyNode(node OpNode, controller *Controller) (OpNode, *Controller) { - c := &Controller{ - ID: controller.ID, - } + c := &Controller{ID: controller.ID} sink := &sinkNode{} controller.AddTransform(sink) @@ -58,14 +57,15 @@ func NewLazyNode(node OpNode, controller *Controller) (OpNode, *Controller) { }, c } -func (f *lazyNode) Process(ID parser.NodeID, block block.Block) error { +func (f *lazyNode) Process(queryCtx *models.QueryContext, ID parser.NodeID, block block.Block) error { b := &lazyBlock{ rawBlock: block, lazyNode: f, + queryCtx: queryCtx, ID: ID, } - return f.controller.Process(b) + return f.controller.Process(queryCtx, b) } type stepIter struct { @@ -177,9 +177,11 @@ func (s *seriesIter) Next() bool { } type lazyBlock struct { - mu sync.Mutex - rawBlock block.Block - lazyNode *lazyNode + mu sync.Mutex + rawBlock block.Block + lazyNode *lazyNode + + queryCtx *models.QueryContext ID parser.NodeID processedBlock block.Block processError error @@ -283,7 +285,7 @@ func (f *lazyBlock) Close() error { } func (f *lazyBlock) process() error { - err := f.lazyNode.fNode.Process(f.ID, f.rawBlock) + err := f.lazyNode.fNode.Process(f.queryCtx, f.ID, f.rawBlock) if err != nil { f.processError = err return err diff --git a/src/query/executor/transform/lazy_test.go b/src/query/executor/transform/lazy_test.go index abb2e06a85..96db1b9c24 100644 --- a/src/query/executor/transform/lazy_test.go +++ b/src/query/executor/transform/lazy_test.go @@ -24,6 +24,7 @@ import ( "testing" "github.com/m3db/m3/src/query/block" + "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" "github.com/m3db/m3/src/query/test" @@ -35,9 +36,9 @@ type dummyFunc struct { controller *Controller } -func (f *dummyFunc) Process(ID parser.NodeID, block block.Block) error { +func (f *dummyFunc) Process(queryCtx *models.QueryContext, ID parser.NodeID, block block.Block) error { f.processed = true - f.controller.Process(block) + f.controller.Process(queryCtx, block) return nil } @@ -50,7 +51,7 @@ func TestLazyState(t *testing.T) { downStreamController.AddTransform(sNode) values, bounds := test.GenerateValuesAndBounds(nil, nil) b := test.NewBlockFromValues(bounds, values) - err := node.Process(parser.NodeID(1), b) + err := node.Process(models.NoopQueryContext(), parser.NodeID(1), b) assert.NoError(t, err) assert.NotNil(t, sNode.block, "downstream process called with a block") assert.IsType(t, sNode.block, &lazyBlock{}) diff --git a/src/query/executor/transform/types.go b/src/query/executor/transform/types.go index eaa009cb1c..e6b6de07e2 100644 --- a/src/query/executor/transform/types.go +++ b/src/query/executor/transform/types.go @@ -37,7 +37,7 @@ type Options struct { // OpNode represents the execution node type OpNode interface { - Process(ID parser.NodeID, block block.Block) error + Process(queryCtx *models.QueryContext, ID parser.NodeID, block block.Block) error } // TimeSpec defines the time bounds for the query execution. End is exclusive diff --git a/src/query/functions/aggregation/base.go b/src/query/functions/aggregation/base.go index 77c487ad6c..4abcf0639f 100644 --- a/src/query/functions/aggregation/base.go +++ b/src/query/functions/aggregation/base.go @@ -26,6 +26,7 @@ import ( "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/executor/transform" "github.com/m3db/m3/src/query/functions/utils" + "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" ) @@ -109,7 +110,7 @@ type baseNode struct { } // Process the block -func (n *baseNode) Process(ID parser.NodeID, b block.Block) error { +func (n *baseNode) Process(queryCtx *models.QueryContext, ID parser.NodeID, b block.Block) error { stepIter, err := b.StepIter() if err != nil { return err @@ -126,7 +127,7 @@ func (n *baseNode) Process(ID parser.NodeID, b block.Block) error { ) meta.Tags, metas = utils.DedupeMetadata(metas) - builder, err := n.controller.BlockBuilder(meta, metas) + builder, err := n.controller.BlockBuilder(queryCtx, meta, metas) if err != nil { return err } @@ -152,5 +153,5 @@ func (n *baseNode) Process(ID parser.NodeID, b block.Block) error { nextBlock := builder.Build() defer nextBlock.Close() - return n.controller.Process(nextBlock) + return n.controller.Process(queryCtx, nextBlock) } diff --git a/src/query/functions/aggregation/base_test.go b/src/query/functions/aggregation/base_test.go index c2731cfae5..38642649f8 100644 --- a/src/query/functions/aggregation/base_test.go +++ b/src/query/functions/aggregation/base_test.go @@ -67,7 +67,7 @@ func processAggregationOp(t *testing.T, op parser.Params) *executor.SinkNode { bl := test.NewBlockFromValuesWithSeriesMeta(bounds, seriesMetas, v) c, sink := executor.NewControllerWithSink(parser.NodeID(1)) node := op.(baseOp).Node(c, transform.Options{}) - err := node.Process(parser.NodeID(0), bl) + err := node.Process(models.NoopQueryContext(), parser.NodeID(0), bl) require.NoError(t, err) return sink } diff --git a/src/query/functions/aggregation/count_values.go b/src/query/functions/aggregation/count_values.go index d2ecbdc355..518b11157a 100644 --- a/src/query/functions/aggregation/count_values.go +++ b/src/query/functions/aggregation/count_values.go @@ -140,7 +140,7 @@ func processBlockBucketAtColumn( } // Process the block -func (n *countValuesNode) Process(ID parser.NodeID, b block.Block) error { +func (n *countValuesNode) Process(queryCtx *models.QueryContext, ID parser.NodeID, b block.Block) error { stepIter, err := b.StepIter() if err != nil { return err @@ -210,7 +210,7 @@ func (n *countValuesNode) Process(ID parser.NodeID, b block.Block) error { metaTags, flattenedMeta := utils.DedupeMetadata(blockMetas) meta.Tags = metaTags - builder, err := n.controller.BlockBuilder(meta, flattenedMeta) + builder, err := n.controller.BlockBuilder(queryCtx, meta, flattenedMeta) if err != nil { return err } @@ -231,7 +231,7 @@ func (n *countValuesNode) Process(ID parser.NodeID, b block.Block) error { nextBlock := builder.Build() defer nextBlock.Close() - return n.controller.Process(nextBlock) + return n.controller.Process(queryCtx, nextBlock) } // pads vals with enough NaNs to match size diff --git a/src/query/functions/aggregation/count_values_test.go b/src/query/functions/aggregation/count_values_test.go index b74bcd1a8e..75f9a1277b 100644 --- a/src/query/functions/aggregation/count_values_test.go +++ b/src/query/functions/aggregation/count_values_test.go @@ -83,7 +83,7 @@ func processCountValuesOp( bl := test.NewBlockFromValuesWithSeriesMeta(bounds, metas, vals) c, sink := executor.NewControllerWithSink(parser.NodeID(1)) node := op.(countValuesOp).Node(c, transform.Options{}) - err := node.Process(parser.NodeID(0), bl) + err := node.Process(models.NoopQueryContext(), parser.NodeID(0), bl) require.NoError(t, err) return sink } diff --git a/src/query/functions/aggregation/take.go b/src/query/functions/aggregation/take.go index e8e735db83..49700b58dd 100644 --- a/src/query/functions/aggregation/take.go +++ b/src/query/functions/aggregation/take.go @@ -27,6 +27,7 @@ import ( "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/executor/transform" "github.com/m3db/m3/src/query/functions/utils" + "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" "github.com/m3db/m3/src/query/ts" ) @@ -111,7 +112,7 @@ type takeNode struct { } // Process the block -func (n *takeNode) Process(ID parser.NodeID, b block.Block) error { +func (n *takeNode) Process(queryCtx *models.QueryContext, ID parser.NodeID, b block.Block) error { stepIter, err := b.StepIter() if err != nil { return err @@ -128,7 +129,7 @@ func (n *takeNode) Process(ID parser.NodeID, b block.Block) error { ) // retain original metadatas - builder, err := n.controller.BlockBuilder(meta, stepIter.SeriesMeta()) + builder, err := n.controller.BlockBuilder(queryCtx, meta, stepIter.SeriesMeta()) if err != nil { return err } @@ -150,7 +151,7 @@ func (n *takeNode) Process(ID parser.NodeID, b block.Block) error { nextBlock := builder.Build() defer nextBlock.Close() - return n.controller.Process(nextBlock) + return n.controller.Process(queryCtx, nextBlock) } // shortcut to return empty when taking <= 0 values diff --git a/src/query/functions/aggregation/take_test.go b/src/query/functions/aggregation/take_test.go index 801cee10a4..ddad9ff8d3 100644 --- a/src/query/functions/aggregation/take_test.go +++ b/src/query/functions/aggregation/take_test.go @@ -26,6 +26,7 @@ import ( "github.com/m3db/m3/src/query/executor/transform" "github.com/m3db/m3/src/query/functions/utils" + "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" "github.com/m3db/m3/src/query/test" "github.com/m3db/m3/src/query/test/executor" @@ -62,7 +63,7 @@ func processTakeOp(t *testing.T, op parser.Params) *executor.SinkNode { bl := test.NewBlockFromValuesWithSeriesMeta(bounds, seriesMetas, v) c, sink := executor.NewControllerWithSink(parser.NodeID(1)) node := op.(takeOp).Node(c, transform.Options{}) - err := node.Process(parser.NodeID(0), bl) + err := node.Process(models.NoopQueryContext(), parser.NodeID(0), bl) require.NoError(t, err) return sink } diff --git a/src/query/functions/binary/and.go b/src/query/functions/binary/and.go index d62981d771..e14c823ebb 100644 --- a/src/query/functions/binary/and.go +++ b/src/query/functions/binary/and.go @@ -25,6 +25,7 @@ import ( "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/executor/transform" + "github.com/m3db/m3/src/query/models" ) // AndType uses values from left hand side for which there is a value in right hand side with exactly matching label sets. @@ -32,13 +33,14 @@ import ( const AndType = "and" func makeAndBlock( + queryCtx *models.QueryContext, lIter, rIter block.StepIter, controller *transform.Controller, matching *VectorMatching, ) (block.Block, error) { lMeta, rSeriesMeta := lIter.Meta(), rIter.SeriesMeta() - builder, err := controller.BlockBuilder(lMeta, rSeriesMeta) + builder, err := controller.BlockBuilder(queryCtx, lMeta, rSeriesMeta) if err != nil { return nil, err } diff --git a/src/query/functions/binary/and_test.go b/src/query/functions/binary/and_test.go index 43d010780d..f4bf33fbda 100644 --- a/src/query/functions/binary/and_test.go +++ b/src/query/functions/binary/and_test.go @@ -25,6 +25,7 @@ import ( "testing" "github.com/m3db/m3/src/query/executor/transform" + "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" "github.com/m3db/m3/src/query/test" "github.com/m3db/m3/src/query/test/executor" @@ -51,9 +52,9 @@ func TestAndWithExactValues(t *testing.T) { c, sink := executor.NewControllerWithSink(parser.NodeID(2)) node := op.(baseOp).Node(c, transform.Options{}) - err = node.Process(parser.NodeID(1), block2) + err = node.Process(models.NoopQueryContext(), parser.NodeID(1), block2) require.NoError(t, err) - err = node.Process(parser.NodeID(0), block1) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block1) require.NoError(t, err) assert.Equal(t, values, sink.Values) } @@ -83,9 +84,9 @@ func TestAndWithSomeValues(t *testing.T) { c, sink := executor.NewControllerWithSink(parser.NodeID(2)) node := op.(baseOp).Node(c, transform.Options{}) - err = node.Process(parser.NodeID(1), block2) + err = node.Process(models.NoopQueryContext(), parser.NodeID(1), block2) require.NoError(t, err) - err = node.Process(parser.NodeID(0), block1) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block1) require.NoError(t, err) // Most values same as lhs expected := values1 diff --git a/src/query/functions/binary/arithmetic.go b/src/query/functions/binary/arithmetic.go index 7bf4e88cc5..c020e64808 100644 --- a/src/query/functions/binary/arithmetic.go +++ b/src/query/functions/binary/arithmetic.go @@ -25,6 +25,7 @@ import ( "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/executor/transform" + "github.com/m3db/m3/src/query/models" ) const ( @@ -81,7 +82,16 @@ func buildArithmeticFunction( } // Build the binary processing step - return func(lhs, rhs block.Block, controller *transform.Controller) (block.Block, error) { - return processBinary(lhs, rhs, params, controller, false, fn) + return func( + queryCtx *models.QueryContext, + lhs, rhs block.Block, + controller *transform.Controller) (block.Block, error) { + return processBinary( + queryCtx, + lhs, rhs, + params, + controller, + false, + fn) }, true } diff --git a/src/query/functions/binary/base.go b/src/query/functions/binary/base.go index 3a06ae87b7..f8b36f537d 100644 --- a/src/query/functions/binary/base.go +++ b/src/query/functions/binary/base.go @@ -26,6 +26,7 @@ import ( "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/executor/transform" + "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" ) @@ -114,10 +115,10 @@ type baseNode struct { mu sync.Mutex } -type processFunc func(block.Block, block.Block, *transform.Controller) (block.Block, error) +type processFunc func(*models.QueryContext, block.Block, block.Block, *transform.Controller) (block.Block, error) // Process processes a block -func (n *baseNode) Process(ID parser.NodeID, b block.Block) error { +func (n *baseNode) Process(queryCtx *models.QueryContext, ID parser.NodeID, b block.Block) error { lhs, rhs, err := n.computeOrCache(ID, b) if err != nil { // Clean up any blocks from cache @@ -132,13 +133,13 @@ func (n *baseNode) Process(ID parser.NodeID, b block.Block) error { n.cleanup() - nextBlock, err := n.process(lhs, rhs, n.controller) + nextBlock, err := n.process(queryCtx, lhs, rhs, n.controller) if err != nil { return err } defer nextBlock.Close() - return n.controller.Process(nextBlock) + return n.controller.Process(queryCtx, nextBlock) } // computeOrCache figures out if both lhs and rhs are available, if not then it caches the incoming block diff --git a/src/query/functions/binary/binary.go b/src/query/functions/binary/binary.go index 67d427191d..65573623df 100644 --- a/src/query/functions/binary/binary.go +++ b/src/query/functions/binary/binary.go @@ -26,6 +26,7 @@ import ( "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/executor/transform" "github.com/m3db/m3/src/query/functions/utils" + "github.com/m3db/m3/src/query/models" ) // Function is a function that applies on two floats @@ -34,6 +35,7 @@ type singleScalarFunc func(x float64) float64 // processes two logical blocks, performing a logical operation on them func processBinary( + queryCtx *models.QueryContext, lhs, rhs block.Block, params NodeParams, controller *transform.Controller, @@ -56,6 +58,7 @@ func processBinary( // rhs is a series; use rhs metadata and series meta if !params.RIsScalar { return processSingleBlock( + queryCtx, rhs, controller, func(x float64) float64 { @@ -95,6 +98,7 @@ func processBinary( rVal := scalarR.Value(time.Time{}) // lhs is a series; use lhs metadata and series meta return processSingleBlock( + queryCtx, lhs, controller, func(x float64) float64 { @@ -116,10 +120,11 @@ func processBinary( return nil, errNoMatching } - return processBothSeries(lIter, rIter, controller, params.VectorMatching, fn) + return processBothSeries(queryCtx, lIter, rIter, controller, params.VectorMatching, fn) } func processSingleBlock( + queryCtx *models.QueryContext, block block.Block, controller *transform.Controller, fn singleScalarFunc, @@ -129,7 +134,7 @@ func processSingleBlock( return nil, err } - builder, err := controller.BlockBuilder(it.Meta(), it.SeriesMeta()) + builder, err := controller.BlockBuilder(queryCtx, it.Meta(), it.SeriesMeta()) if err != nil { return nil, err } @@ -154,6 +159,7 @@ func processSingleBlock( } func processBothSeries( + queryCtx *models.QueryContext, lIter, rIter block.StepIter, controller *transform.Controller, matching *VectorMatching, @@ -173,7 +179,7 @@ func processBothSeries( lMeta.Tags, lSeriesMeta = utils.DedupeMetadata(lSeriesMeta) // Use metas from only taken left series - builder, err := controller.BlockBuilder(lMeta, lSeriesMeta) + builder, err := controller.BlockBuilder(queryCtx, lMeta, lSeriesMeta) if err != nil { return nil, err } diff --git a/src/query/functions/binary/binary_test.go b/src/query/functions/binary/binary_test.go index 1ee5a783cc..81737f3cdc 100644 --- a/src/query/functions/binary/binary_test.go +++ b/src/query/functions/binary/binary_test.go @@ -116,10 +116,10 @@ func TestScalars(t *testing.T) { c, sink := executor.NewControllerWithSink(parser.NodeID(2)) node := op.(baseOp).Node(c, transform.Options{}) - err = node.Process(parser.NodeID(0), block.NewScalar(func(_ time.Time) float64 { return tt.lVal }, bounds)) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block.NewScalar(func(_ time.Time) float64 { return tt.lVal }, bounds)) require.NoError(t, err) - err = node.Process(parser.NodeID(1), block.NewScalar(func(_ time.Time) float64 { return tt.rVal }, bounds)) + err = node.Process(models.NoopQueryContext(), parser.NodeID(1), block.NewScalar(func(_ time.Time) float64 { return tt.rVal }, bounds)) require.NoError(t, err) expected := [][]float64{{ @@ -160,10 +160,10 @@ func TestScalarsReturnBoolFalse(t *testing.T) { c, sink := executor.NewControllerWithSink(parser.NodeID(2)) node := op.(baseOp).Node(c, transform.Options{}) - err = node.Process(parser.NodeID(0), block.NewScalar(func(_ time.Time) float64 { return tt.lVal }, bounds)) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block.NewScalar(func(_ time.Time) float64 { return tt.lVal }, bounds)) require.NoError(t, err) - err = node.Process(parser.NodeID(1), block.NewScalar(func(_ time.Time) float64 { return tt.rVal }, bounds)) + err = node.Process(models.NoopQueryContext(), parser.NodeID(1), block.NewScalar(func(_ time.Time) float64 { return tt.rVal }, bounds)) if tt.opType == EqType || tt.opType == NotEqType || tt.opType == GreaterType || tt.opType == LesserType || @@ -510,16 +510,16 @@ func TestSingleSeriesReturnBool(t *testing.T) { series := test.NewBlockFromValuesWithSeriesMeta(bounds, metas, seriesValues) // Set the series and scalar blocks on the correct sides if tt.seriesLeft { - err = node.Process(parser.NodeID(0), series) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), series) require.NoError(t, err) - err = node.Process(parser.NodeID(1), block.NewScalar(func(_ time.Time) float64 { return tt.scalarVal }, bounds)) + err = node.Process(models.NoopQueryContext(), parser.NodeID(1), block.NewScalar(func(_ time.Time) float64 { return tt.scalarVal }, bounds)) require.NoError(t, err) } else { - err = node.Process(parser.NodeID(0), block.NewScalar(func(_ time.Time) float64 { return tt.scalarVal }, bounds)) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block.NewScalar(func(_ time.Time) float64 { return tt.scalarVal }, bounds)) require.NoError(t, err) - err = node.Process(parser.NodeID(1), series) + err = node.Process(models.NoopQueryContext(), parser.NodeID(1), series) require.NoError(t, err) } @@ -565,16 +565,16 @@ func TestSingleSeriesReturnValues(t *testing.T) { series := test.NewBlockFromValuesWithSeriesMeta(bounds, metas, seriesValues) // Set the series and scalar blocks on the correct sides if tt.seriesLeft { - err = node.Process(parser.NodeID(0), series) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), series) require.NoError(t, err) - err = node.Process(parser.NodeID(1), block.NewScalar(func(_ time.Time) float64 { return tt.scalarVal }, bounds)) + err = node.Process(models.NoopQueryContext(), parser.NodeID(1), block.NewScalar(func(_ time.Time) float64 { return tt.scalarVal }, bounds)) require.NoError(t, err) } else { - err = node.Process(parser.NodeID(0), block.NewScalar(func(_ time.Time) float64 { return tt.scalarVal }, bounds)) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block.NewScalar(func(_ time.Time) float64 { return tt.scalarVal }, bounds)) require.NoError(t, err) - err = node.Process(parser.NodeID(1), series) + err = node.Process(models.NoopQueryContext(), parser.NodeID(1), series) require.NoError(t, err) } @@ -827,10 +827,10 @@ func TestBothSeries(t *testing.T) { StepSize: time.Minute, } - err = node.Process(parser.NodeID(0), test.NewBlockFromValuesWithSeriesMeta(bounds, tt.lhsMeta, tt.lhs)) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), test.NewBlockFromValuesWithSeriesMeta(bounds, tt.lhsMeta, tt.lhs)) require.NoError(t, err) - err = node.Process(parser.NodeID(1), test.NewBlockFromValuesWithSeriesMeta(bounds, tt.rhsMeta, tt.rhs)) + err = node.Process(models.NoopQueryContext(), parser.NodeID(1), test.NewBlockFromValuesWithSeriesMeta(bounds, tt.rhsMeta, tt.rhs)) require.NoError(t, err) test.EqualsWithNans(t, tt.expected, sink.Values) diff --git a/src/query/functions/binary/comparison.go b/src/query/functions/binary/comparison.go index f2400e99cc..b351b6aa9f 100644 --- a/src/query/functions/binary/comparison.go +++ b/src/query/functions/binary/comparison.go @@ -25,6 +25,7 @@ import ( "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/executor/transform" + "github.com/m3db/m3/src/query/models" ) const ( @@ -98,7 +99,16 @@ func buildComparisonFunction( return nil, false } - return func(lhs, rhs block.Block, controller *transform.Controller) (block.Block, error) { - return processBinary(lhs, rhs, params, controller, true, fn) + return func( + queryCtx *models.QueryContext, + lhs, rhs block.Block, + controller *transform.Controller) (block.Block, error) { + return processBinary( + queryCtx, + lhs, rhs, + params, + controller, + true, + fn) }, true } diff --git a/src/query/functions/binary/logical.go b/src/query/functions/binary/logical.go index fae12a2873..02b3616357 100644 --- a/src/query/functions/binary/logical.go +++ b/src/query/functions/binary/logical.go @@ -23,9 +23,11 @@ package binary import ( "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/executor/transform" + "github.com/m3db/m3/src/query/models" ) type makeBlockFn func( + queryCtx *models.QueryContext, lIter, rIter block.StepIter, controller *transform.Controller, matching *VectorMatching, @@ -55,13 +57,14 @@ func buildLogicalFunction( func createLogicalProcessingStep( params NodeParams, fn makeBlockFn, -) func(block.Block, block.Block, *transform.Controller) (block.Block, error) { - return func(lhs, rhs block.Block, controller *transform.Controller) (block.Block, error) { - return processLogical(lhs, rhs, controller, params.VectorMatching, fn) +) processFunc { + return func(queryCtx *models.QueryContext, lhs, rhs block.Block, controller *transform.Controller) (block.Block, error) { + return processLogical(queryCtx, lhs, rhs, controller, params.VectorMatching, fn) } } func processLogical( + queryCtx *models.QueryContext, lhs, rhs block.Block, controller *transform.Controller, matching *VectorMatching, @@ -81,5 +84,5 @@ func processLogical( return nil, errMismatchedStepCounts } - return makeBlock(lIter, rIter, controller, matching) + return makeBlock(queryCtx, lIter, rIter, controller, matching) } diff --git a/src/query/functions/binary/or.go b/src/query/functions/binary/or.go index 4db009e153..ab38e88820 100644 --- a/src/query/functions/binary/or.go +++ b/src/query/functions/binary/or.go @@ -23,6 +23,7 @@ package binary import ( "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/executor/transform" + "github.com/m3db/m3/src/query/models" ) // OrType uses all values from left hand side, and appends values from the right hand side which do @@ -30,6 +31,7 @@ import ( const OrType = "or" func makeOrBlock( + queryCtx *models.QueryContext, lIter, rIter block.StepIter, controller *transform.Controller, matching *VectorMatching, @@ -50,7 +52,7 @@ func makeOrBlock( rSeriesMetas, ) - builder, err := controller.BlockBuilder(meta, combinedSeriesMeta) + builder, err := controller.BlockBuilder(queryCtx, meta, combinedSeriesMeta) if err != nil { return nil, err } diff --git a/src/query/functions/binary/or_test.go b/src/query/functions/binary/or_test.go index 623eb6059d..ff43c7921b 100644 --- a/src/query/functions/binary/or_test.go +++ b/src/query/functions/binary/or_test.go @@ -55,9 +55,9 @@ func TestOrWithExactValues(t *testing.T) { c, sink := executor.NewControllerWithSink(parser.NodeID(2)) node := op.(baseOp).Node(c, transform.Options{}) - err = node.Process(parser.NodeID(1), block2) + err = node.Process(models.NoopQueryContext(), parser.NodeID(1), block2) require.NoError(t, err) - err = node.Process(parser.NodeID(0), block1) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block1) require.NoError(t, err) assert.Equal(t, values, sink.Values) } @@ -86,9 +86,9 @@ func TestOrWithSomeValues(t *testing.T) { c, sink := executor.NewControllerWithSink(parser.NodeID(2)) node := op.(baseOp).Node(c, transform.Options{}) - err = node.Process(parser.NodeID(1), block2) + err = node.Process(models.NoopQueryContext(), parser.NodeID(1), block2) require.NoError(t, err) - err = node.Process(parser.NodeID(0), block1) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block1) require.NoError(t, err) // NAN values should be filled expected := values1 @@ -230,7 +230,7 @@ func TestOrs(t *testing.T) { } lhs := test.NewBlockFromValuesWithSeriesMeta(bounds, tt.lhsMeta, tt.lhs) - err = node.Process(parser.NodeID(0), lhs) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), lhs) require.NoError(t, err) bounds = models.Bounds{ @@ -240,7 +240,7 @@ func TestOrs(t *testing.T) { } rhs := test.NewBlockFromValuesWithSeriesMeta(bounds, tt.rhsMeta, tt.rhs) - err = node.Process(parser.NodeID(1), rhs) + err = node.Process(models.NoopQueryContext(), parser.NodeID(1), rhs) if tt.err != nil { require.EqualError(t, err, tt.err.Error()) return @@ -275,7 +275,7 @@ func TestOrsBoundsError(t *testing.T) { node := op.(baseOp).Node(c, transform.Options{}) lhs := test.NewBlockFromValuesWithSeriesMeta(bounds, tt.lhsMeta, tt.lhs) - err = node.Process(parser.NodeID(0), lhs) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), lhs) require.NoError(t, err) differentBounds := models.Bounds{ @@ -284,7 +284,7 @@ func TestOrsBoundsError(t *testing.T) { StepSize: bounds.StepSize, } rhs := test.NewBlockFromValuesWithSeriesMeta(differentBounds, tt.rhsMeta, tt.rhs) - err = node.Process(parser.NodeID(1), rhs) + err = node.Process(models.NoopQueryContext(), parser.NodeID(1), rhs) require.EqualError(t, err, errMismatchedBounds.Error()) } @@ -327,7 +327,7 @@ func TestOrCombinedMetadata(t *testing.T) { lSeriesMeta, [][]float64{{1, 2}, {10, 20}}) - err = node.Process(parser.NodeID(0), lhs) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), lhs) require.NoError(t, err) strTags = test.StringTags{{"a", "b"}, {"c", "*d"}, {"g", "h"}} @@ -345,7 +345,7 @@ func TestOrCombinedMetadata(t *testing.T) { rSeriesMeta, [][]float64{{3, 4}, {30, 40}}) - err = node.Process(parser.NodeID(1), rhs) + err = node.Process(models.NoopQueryContext(), parser.NodeID(1), rhs) require.NoError(t, err) test.EqualsWithNans(t, [][]float64{{1, 2}, {10, 20}, {3, 4}, {30, 40}}, sink.Values) diff --git a/src/query/functions/binary/unless.go b/src/query/functions/binary/unless.go index b8a5e983fe..008b9096c0 100644 --- a/src/query/functions/binary/unless.go +++ b/src/query/functions/binary/unless.go @@ -26,12 +26,14 @@ import ( "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/executor/transform" "github.com/m3db/m3/src/query/functions/utils" + "github.com/m3db/m3/src/query/models" ) // UnlessType uses all values from lhs which do not exist in rhs const UnlessType = "unless" func makeUnlessBlock( + queryCtx *models.QueryContext, lIter, rIter block.StepIter, controller *transform.Controller, matching *VectorMatching, @@ -52,7 +54,7 @@ func makeUnlessBlock( meta := lIter.Meta() commonTags, dedupedSeriesMetas := utils.DedupeMetadata(distinctSeriesMeta) meta.Tags = commonTags - builder, err := controller.BlockBuilder(meta, dedupedSeriesMetas) + builder, err := controller.BlockBuilder(queryCtx, meta, dedupedSeriesMetas) if err != nil { return nil, err } diff --git a/src/query/functions/binary/unless_test.go b/src/query/functions/binary/unless_test.go index 75a713181c..b120156f82 100644 --- a/src/query/functions/binary/unless_test.go +++ b/src/query/functions/binary/unless_test.go @@ -222,7 +222,7 @@ func TestUnless(t *testing.T) { } lhs := test.NewBlockFromValuesWithSeriesMeta(bounds, tt.lhsMeta, tt.lhs) - err = node.Process(parser.NodeID(0), lhs) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), lhs) require.NoError(t, err) bounds = models.Bounds{ Start: now, @@ -231,7 +231,7 @@ func TestUnless(t *testing.T) { } rhs := test.NewBlockFromValuesWithSeriesMeta(bounds, tt.rhsMeta, tt.rhs) - err = node.Process(parser.NodeID(1), rhs) + err = node.Process(models.NoopQueryContext(), parser.NodeID(1), rhs) if tt.err != nil { require.EqualError(t, err, tt.err.Error()) return diff --git a/src/query/functions/fetch.go b/src/query/functions/fetch.go index cf79afa40f..b5f04ca796 100644 --- a/src/query/functions/fetch.go +++ b/src/query/functions/fetch.go @@ -21,7 +21,6 @@ package functions import ( - "context" "fmt" "time" @@ -88,13 +87,16 @@ func (o FetchOp) Node(controller *transform.Controller, storage storage.Storage, } // Execute runs the fetch node operation -func (n *FetchNode) Execute(ctx context.Context) error { +func (n *FetchNode) Execute(queryCtx *models.QueryContext) error { timeSpec := n.timespec // No need to adjust start and ends since physical plan already considers the offset, range startTime := timeSpec.Start endTime := timeSpec.End opts := storage.NewFetchOptions() opts.BlockType = n.blockType + + ctx := queryCtx.Ctx + blockResult, err := n.storage.FetchBlocks(ctx, &storage.FetchQuery{ Start: startTime, End: endTime, @@ -114,7 +116,7 @@ func (n *FetchNode) Execute(ctx context.Context) error { } } - if err := n.controller.Process(block); err != nil { + if err := n.controller.Process(queryCtx, block); err != nil { block.Close() // Fail on first error return err diff --git a/src/query/functions/fetch_test.go b/src/query/functions/fetch_test.go index 805a5925ca..df40626744 100644 --- a/src/query/functions/fetch_test.go +++ b/src/query/functions/fetch_test.go @@ -21,11 +21,11 @@ package functions import ( - "context" "testing" "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/executor/transform" + "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" "github.com/m3db/m3/src/query/storage/mock" "github.com/m3db/m3/src/query/test" @@ -42,7 +42,7 @@ func TestFetch(t *testing.T) { mockStorage := mock.NewMockStorage() mockStorage.SetFetchBlocksResult(block.Result{Blocks: []block.Block{b}}, nil) source := (&FetchOp{}).Node(c, mockStorage, transform.Options{}) - err := source.Execute(context.TODO()) + err := source.Execute(models.NoopQueryContext()) require.NoError(t, err) expected := values assert.Len(t, sink.Values, 2) diff --git a/src/query/functions/linear/absent_test.go b/src/query/functions/linear/absent_test.go index 623bf2b16d..530087d237 100644 --- a/src/query/functions/linear/absent_test.go +++ b/src/query/functions/linear/absent_test.go @@ -25,6 +25,7 @@ import ( "testing" "github.com/m3db/m3/src/query/executor/transform" + "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" "github.com/m3db/m3/src/query/test" "github.com/m3db/m3/src/query/test/executor" @@ -42,7 +43,7 @@ func TestAbsentWithValues(t *testing.T) { block := test.NewBlockFromValues(bounds, values) c, sink := executor.NewControllerWithSink(parser.NodeID(1)) node := NewAbsentOp().Node(c, transform.Options{}) - err := node.Process(parser.NodeID(0), block) + err := node.Process(models.NoopQueryContext(), parser.NodeID(0), block) require.NoError(t, err) assert.Len(t, sink.Values, 2) expected := [][]float64{ @@ -62,7 +63,7 @@ func TestAbsentWithNoValues(t *testing.T) { block := test.NewBlockFromValues(bounds, values) c, sink := executor.NewControllerWithSink(parser.NodeID(1)) node := NewAbsentOp().Node(c, transform.Options{}) - err := node.Process(parser.NodeID(0), block) + err := node.Process(models.NoopQueryContext(), parser.NodeID(0), block) require.NoError(t, err) assert.Len(t, sink.Values, 2) assert.Equal(t, [][]float64{{1, 1, 1, 1, 1}, {1, 1, 1, 1, 1}}, sink.Values) diff --git a/src/query/functions/linear/base.go b/src/query/functions/linear/base.go index b07196ab18..437adecea5 100644 --- a/src/query/functions/linear/base.go +++ b/src/query/functions/linear/base.go @@ -25,6 +25,7 @@ import ( "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/executor/transform" + "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" ) @@ -80,13 +81,13 @@ func (c *baseNode) ProcessSeries(series block.Series) (block.Series, error) { } // Process the block -func (c *baseNode) Process(ID parser.NodeID, b block.Block) error { +func (c *baseNode) Process(queryCtx *models.QueryContext, ID parser.NodeID, b block.Block) error { stepIter, err := b.StepIter() if err != nil { return err } - builder, err := c.controller.BlockBuilder(stepIter.Meta(), stepIter.SeriesMeta()) + builder, err := c.controller.BlockBuilder(queryCtx, stepIter.Meta(), stepIter.SeriesMeta()) if err != nil { return err } @@ -109,7 +110,7 @@ func (c *baseNode) Process(ID parser.NodeID, b block.Block) error { nextBlock := builder.Build() defer nextBlock.Close() - return c.controller.Process(nextBlock) + return c.controller.Process(queryCtx, nextBlock) } // Meta returns the metadata for the block diff --git a/src/query/functions/linear/clamp_test.go b/src/query/functions/linear/clamp_test.go index 351ead30b9..088b1cea63 100644 --- a/src/query/functions/linear/clamp_test.go +++ b/src/query/functions/linear/clamp_test.go @@ -25,6 +25,7 @@ import ( "testing" "github.com/m3db/m3/src/query/executor/transform" + "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" "github.com/m3db/m3/src/query/test" "github.com/m3db/m3/src/query/test/executor" @@ -56,7 +57,7 @@ func TestClampMin(t *testing.T) { op, err := NewClampOp([]interface{}{3.0}, ClampMinType) require.NoError(t, err) node := op.Node(c, transform.Options{}) - err = node.Process(parser.NodeID(0), block) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block) require.NoError(t, err) expected := expectedClampVals(values, 3.0, math.Max) assert.Len(t, sink.Values, 2) @@ -72,7 +73,7 @@ func TestClampMax(t *testing.T) { op, err := NewClampOp([]interface{}{3.0}, ClampMaxType) require.NoError(t, err) node := op.Node(c, transform.Options{}) - err = node.Process(parser.NodeID(0), block) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block) require.NoError(t, err) expected := expectedClampVals(values, 3.0, math.Min) assert.Len(t, sink.Values, 2) diff --git a/src/query/functions/linear/datetime_test.go b/src/query/functions/linear/datetime_test.go index a8b4edca53..7c3e6a9471 100644 --- a/src/query/functions/linear/datetime_test.go +++ b/src/query/functions/linear/datetime_test.go @@ -26,6 +26,7 @@ import ( "time" "github.com/m3db/m3/src/query/executor/transform" + "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" "github.com/m3db/m3/src/query/test" "github.com/m3db/m3/src/query/test/executor" @@ -64,7 +65,7 @@ func TestDayOfMonth(t *testing.T) { op, err := NewDateOp(DayOfMonthType) require.NoError(t, err) node := op.Node(c, transform.Options{}) - err = node.Process(parser.NodeID(0), block) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block) require.NoError(t, err) expected := expectedDateVals(values, datetimeFuncs[DayOfMonthType]) assert.Len(t, sink.Values, 2) @@ -83,7 +84,7 @@ func TestDayOfWeek(t *testing.T) { op, err := NewDateOp(DayOfWeekType) require.NoError(t, err) node := op.Node(c, transform.Options{}) - err = node.Process(parser.NodeID(0), block) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block) require.NoError(t, err) expected := expectedDateVals(values, datetimeFuncs[DayOfWeekType]) assert.Len(t, sink.Values, 2) @@ -102,7 +103,7 @@ func TestDaysInMonth(t *testing.T) { op, err := NewDateOp(DaysInMonthType) require.NoError(t, err) node := op.Node(c, transform.Options{}) - err = node.Process(parser.NodeID(0), block) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block) require.NoError(t, err) expected := expectedDateVals(values, datetimeFuncs[DaysInMonthType]) assert.Len(t, sink.Values, 2) @@ -121,7 +122,7 @@ func TestHour(t *testing.T) { op, err := NewDateOp(HourType) require.NoError(t, err) node := op.Node(c, transform.Options{}) - err = node.Process(parser.NodeID(0), block) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block) require.NoError(t, err) expected := expectedDateVals(values, datetimeFuncs[HourType]) assert.Len(t, sink.Values, 2) @@ -140,7 +141,7 @@ func TestMinute(t *testing.T) { op, err := NewDateOp(MinuteType) require.NoError(t, err) node := op.Node(c, transform.Options{}) - err = node.Process(parser.NodeID(0), block) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block) require.NoError(t, err) expected := expectedDateVals(values, datetimeFuncs[MinuteType]) assert.Len(t, sink.Values, 2) @@ -159,7 +160,7 @@ func TestMonth(t *testing.T) { op, err := NewDateOp(MonthType) require.NoError(t, err) node := op.Node(c, transform.Options{}) - err = node.Process(parser.NodeID(0), block) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block) require.NoError(t, err) expected := expectedDateVals(values, datetimeFuncs[MonthType]) assert.Len(t, sink.Values, 2) @@ -178,7 +179,7 @@ func TestYear(t *testing.T) { op, err := NewDateOp(YearType) require.NoError(t, err) node := op.Node(c, transform.Options{}) - err = node.Process(parser.NodeID(0), block) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block) require.NoError(t, err) expected := expectedDateVals(values, datetimeFuncs[YearType]) assert.Len(t, sink.Values, 2) diff --git a/src/query/functions/linear/histogram_quantile.go b/src/query/functions/linear/histogram_quantile.go index 377c9365fc..813adfea7b 100644 --- a/src/query/functions/linear/histogram_quantile.go +++ b/src/query/functions/linear/histogram_quantile.go @@ -233,7 +233,7 @@ func bucketQuantile(q float64, buckets []bucketValue) float64 { } // Process the block -func (n *histogramQuantileNode) Process(ID parser.NodeID, b block.Block) error { +func (n *histogramQuantileNode) Process(queryCtx *models.QueryContext, ID parser.NodeID, b block.Block) error { stepIter, err := b.StepIter() if err != nil { return err @@ -245,13 +245,14 @@ func (n *histogramQuantileNode) Process(ID parser.NodeID, b block.Block) error { q := n.op.q if q < 0 || q > 1 { - return processInvalidQuantile(q, bucketedSeries, meta, stepIter, n.controller) + return processInvalidQuantile(queryCtx, q, bucketedSeries, meta, stepIter, n.controller) } - return processValidQuantile(q, bucketedSeries, meta, stepIter, n.controller) + return processValidQuantile(queryCtx, q, bucketedSeries, meta, stepIter, n.controller) } func setupBuilder( + queryCtx *models.QueryContext, bucketedSeries bucketedSeries, meta block.Metadata, stepIter block.StepIter, @@ -268,7 +269,7 @@ func setupBuilder( } meta.Tags, metas = utils.DedupeMetadata(metas) - builder, err := controller.BlockBuilder(meta, metas) + builder, err := controller.BlockBuilder(queryCtx, meta, metas) if err != nil { return nil, err } @@ -281,6 +282,7 @@ func setupBuilder( } func processValidQuantile( + queryCtx *models.QueryContext, q float64, bucketedSeries bucketedSeries, meta block.Metadata, @@ -289,7 +291,7 @@ func processValidQuantile( ) error { sanitizeBuckets(bucketedSeries) - builder, err := setupBuilder(bucketedSeries, meta, stepIter, controller) + builder, err := setupBuilder(queryCtx, bucketedSeries, meta, stepIter, controller) if err != nil { return err } @@ -331,17 +333,18 @@ func processValidQuantile( nextBlock := builder.Build() defer nextBlock.Close() - return controller.Process(nextBlock) + return controller.Process(queryCtx, nextBlock) } func processInvalidQuantile( + queryCtx *models.QueryContext, q float64, bucketedSeries bucketedSeries, meta block.Metadata, stepIter block.StepIter, controller *transform.Controller, ) error { - builder, err := setupBuilder(bucketedSeries, meta, stepIter, controller) + builder, err := setupBuilder(queryCtx, bucketedSeries, meta, stepIter, controller) if err != nil { return err } @@ -366,5 +369,5 @@ func processInvalidQuantile( nextBlock := builder.Build() defer nextBlock.Close() - return controller.Process(nextBlock) + return controller.Process(queryCtx, nextBlock) } diff --git a/src/query/functions/linear/histogram_quantile_test.go b/src/query/functions/linear/histogram_quantile_test.go index 714623b3cf..99a3e0695c 100644 --- a/src/query/functions/linear/histogram_quantile_test.go +++ b/src/query/functions/linear/histogram_quantile_test.go @@ -275,7 +275,7 @@ func testQuantileFunctionWithQ(t *testing.T, q float64) [][]float64 { bl := test.NewBlockFromValuesWithSeriesMeta(bounds, seriesMetas, v) c, sink := executor.NewControllerWithSink(parser.NodeID(1)) node := op.(histogramQuantileOp).Node(c, transform.Options{}) - err = node.Process(parser.NodeID(0), bl) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), bl) require.NoError(t, err) return sink.Values diff --git a/src/query/functions/linear/math_test.go b/src/query/functions/linear/math_test.go index 49f24d11ea..a5a7285118 100644 --- a/src/query/functions/linear/math_test.go +++ b/src/query/functions/linear/math_test.go @@ -25,6 +25,7 @@ import ( "testing" "github.com/m3db/m3/src/query/executor/transform" + "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" "github.com/m3db/m3/src/query/test" "github.com/m3db/m3/src/query/test/executor" @@ -56,7 +57,7 @@ func TestAbsWithAllValues(t *testing.T) { op, err := NewMathOp(AbsType) require.NoError(t, err) node := op.Node(c, transform.Options{}) - err = node.Process(parser.NodeID(0), block) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block) require.NoError(t, err) expected := expectedMathVals(values, math.Abs) assert.Len(t, sink.Values, 2) @@ -75,7 +76,7 @@ func TestAbsWithSomeValues(t *testing.T) { op, err := NewMathOp(AbsType) require.NoError(t, err) node := op.Node(c, transform.Options{}) - err = node.Process(parser.NodeID(0), block) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block) require.NoError(t, err) expected := expectedMathVals(values, math.Abs) assert.Len(t, sink.Values, 2) @@ -90,7 +91,7 @@ func TestLn(t *testing.T) { op, err := NewMathOp(LnType) require.NoError(t, err) node := op.Node(c, transform.Options{}) - err = node.Process(parser.NodeID(0), block) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block) require.NoError(t, err) expected := expectedMathVals(values, math.Log) assert.Len(t, sink.Values, 2) @@ -109,7 +110,7 @@ func TestLog10WithNoValues(t *testing.T) { op, err := NewMathOp(Log10Type) require.NoError(t, err) node := op.Node(c, transform.Options{}) - err = node.Process(parser.NodeID(0), block) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block) require.NoError(t, err) expected := expectedMathVals(values, math.Log10) assert.Len(t, sink.Values, 2) @@ -128,7 +129,7 @@ func TestLog2WithSomeValues(t *testing.T) { op, err := NewMathOp(Log2Type) require.NoError(t, err) node := op.Node(c, transform.Options{}) - err = node.Process(parser.NodeID(0), block) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block) require.NoError(t, err) expected := expectedMathVals(values, math.Log2) assert.Len(t, sink.Values, 2) @@ -147,7 +148,7 @@ func TestFloorWithSomeValues(t *testing.T) { op, err := NewMathOp(FloorType) require.NoError(t, err) node := op.Node(c, transform.Options{}) - err = node.Process(parser.NodeID(0), block) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block) require.NoError(t, err) expected := expectedMathVals(values, math.Floor) assert.Len(t, sink.Values, 2) @@ -166,7 +167,7 @@ func TestCeilWithSomeValues(t *testing.T) { op, err := NewMathOp(CeilType) require.NoError(t, err) node := op.Node(c, transform.Options{}) - err = node.Process(parser.NodeID(0), block) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block) require.NoError(t, err) expected := expectedMathVals(values, math.Ceil) assert.Len(t, sink.Values, 2) @@ -184,7 +185,7 @@ func TestExpWithSomeValues(t *testing.T) { op, err := NewMathOp(ExpType) require.NoError(t, err) node := op.Node(c, transform.Options{}) - err = node.Process(parser.NodeID(0), block) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block) require.NoError(t, err) expected := expectedMathVals(values, math.Exp) assert.Len(t, sink.Values, 2) @@ -203,7 +204,7 @@ func TestSqrtWithSomeValues(t *testing.T) { op, err := NewMathOp(SqrtType) require.NoError(t, err) node := op.Node(c, transform.Options{}) - err = node.Process(parser.NodeID(0), block) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block) require.NoError(t, err) expected := expectedMathVals(values, math.Sqrt) assert.Len(t, sink.Values, 2) diff --git a/src/query/functions/linear/round_test.go b/src/query/functions/linear/round_test.go index 97b14833a0..8b6068413c 100644 --- a/src/query/functions/linear/round_test.go +++ b/src/query/functions/linear/round_test.go @@ -24,6 +24,7 @@ import ( "testing" "github.com/m3db/m3/src/query/executor/transform" + "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" "github.com/m3db/m3/src/query/test" "github.com/m3db/m3/src/query/test/executor" @@ -44,7 +45,7 @@ func TestRoundWithSomeValues(t *testing.T) { op, err := NewRoundOp([]interface{}{10.0}) require.NoError(t, err) node := op.Node(c, transform.Options{}) - err = node.Process(parser.NodeID(0), block) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block) require.NoError(t, err) expected := [][]float64{ diff --git a/src/query/functions/scalar/base.go b/src/query/functions/scalar/base.go index e39a67f7e3..e95ea807a7 100644 --- a/src/query/functions/scalar/base.go +++ b/src/query/functions/scalar/base.go @@ -21,11 +21,11 @@ package scalar import ( - "context" "fmt" "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/executor/transform" + "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" "github.com/m3db/m3/src/query/util/logging" @@ -86,7 +86,7 @@ type baseNode struct { } // Execute runs the scalar node operation -func (n *baseNode) Execute(ctx context.Context) error { +func (n *baseNode) Execute(queryCtx *models.QueryContext) error { bounds := n.timespec.Bounds() block := block.NewScalar(n.op.fn, bounds) @@ -94,11 +94,11 @@ func (n *baseNode) Execute(ctx context.Context) error { // Ignore any errors iter, _ := block.StepIter() if iter != nil { - logging.WithContext(ctx).Info("scalar node", zap.Any("meta", iter.Meta())) + logging.WithContext(queryCtx.Ctx).Info("scalar node", zap.Any("meta", iter.Meta())) } } - if err := n.controller.Process(block); err != nil { + if err := n.controller.Process(queryCtx, block); err != nil { block.Close() // Fail on first error return err diff --git a/src/query/functions/scalar/base_test.go b/src/query/functions/scalar/base_test.go index c3c1d8c004..3b5c670ce7 100644 --- a/src/query/functions/scalar/base_test.go +++ b/src/query/functions/scalar/base_test.go @@ -21,11 +21,11 @@ package scalar import ( - "context" "testing" "time" "github.com/m3db/m3/src/query/executor/transform" + "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" "github.com/m3db/m3/src/query/test" "github.com/m3db/m3/src/query/test/executor" @@ -51,7 +51,7 @@ func TestScalarTime(t *testing.T) { Step: step, }, }) - err := node.Execute(context.Background()) + err := node.Execute(models.NoopQueryContext()) require.NoError(t, err) assert.Len(t, sink.Values, 1) diff --git a/src/query/functions/tag/base.go b/src/query/functions/tag/base.go index 881cbdb328..3cde9c0140 100644 --- a/src/query/functions/tag/base.go +++ b/src/query/functions/tag/base.go @@ -25,6 +25,7 @@ import ( "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/executor/transform" + "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" ) @@ -97,7 +98,7 @@ type baseNode struct { } // Process the block. -func (n *baseNode) Process(ID parser.NodeID, b block.Block) error { +func (n *baseNode) Process(queryCtx *models.QueryContext, ID parser.NodeID, b block.Block) error { it, err := b.StepIter() if err != nil { return err @@ -113,5 +114,5 @@ func (n *baseNode) Process(ID parser.NodeID, b block.Block) error { } defer bl.Close() - return n.controller.Process(bl) + return n.controller.Process(queryCtx, bl) } diff --git a/src/query/functions/tag/join_test.go b/src/query/functions/tag/join_test.go index eca49124d5..d6982e39f4 100644 --- a/src/query/functions/tag/join_test.go +++ b/src/query/functions/tag/join_test.go @@ -187,7 +187,7 @@ func TestTagJoinOp(t *testing.T) { bl := block.NewColumnBlockBuilder(meta, seriesMeta).Build() c, sink := executor.NewControllerWithSink(parser.NodeID(1)) node := op.(baseOp).Node(c, transform.Options{}) - err = node.Process(parser.NodeID(0), bl) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), bl) require.NoError(t, err) assert.Equal(t, test.StringTagsToTags(tt.expectedMetaTags), sink.Meta.Tags) diff --git a/src/query/functions/tag/replace_test.go b/src/query/functions/tag/replace_test.go index 4c45fee2dc..b82d76cdf0 100644 --- a/src/query/functions/tag/replace_test.go +++ b/src/query/functions/tag/replace_test.go @@ -26,6 +26,7 @@ import ( "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/executor/transform" + "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" "github.com/m3db/m3/src/query/test" "github.com/m3db/m3/src/query/test/executor" @@ -164,7 +165,7 @@ func TestTagReplaceOp(t *testing.T) { bl := block.NewColumnBlockBuilder(meta, seriesMeta).Build() c, sink := executor.NewControllerWithSink(parser.NodeID(1)) node := op.(baseOp).Node(c, transform.Options{}) - err = node.Process(parser.NodeID(0), bl) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), bl) require.NoError(t, err) assert.Equal(t, test.StringTagsToTags(tt.expectedMetaTags), sink.Meta.Tags) diff --git a/src/query/functions/temporal/aggregation_test.go b/src/query/functions/temporal/aggregation_test.go index 544f258ba3..1421e6fc3c 100644 --- a/src/query/functions/temporal/aggregation_test.go +++ b/src/query/functions/temporal/aggregation_test.go @@ -290,7 +290,7 @@ func testAggregation(t *testing.T, testCases []testCase, vals [][]float64) { }, }) bNode := node.(*baseNode) - err = node.Process(parser.NodeID(0), block3) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block3) require.NoError(t, err) assert.Len(t, sink.Values, 0, "nothing processed yet") b, exists := bNode.cache.get(boundStart) @@ -307,7 +307,7 @@ func testAggregation(t *testing.T, testCases []testCase, vals [][]float64) { }, values) values[0][0] = original - err = node.Process(parser.NodeID(0), block1) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block1) require.NoError(t, err) assert.Len(t, sink.Values, 2, "output from first block only") test.EqualsWithNansWithDelta(t, tt.afterBlockOne[0], sink.Values[0], 0.0001) @@ -323,7 +323,7 @@ func testAggregation(t *testing.T, testCases []testCase, vals [][]float64) { StepSize: bounds.StepSize, }, values) - err = node.Process(parser.NodeID(0), block2) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block2) require.NoError(t, err) assert.Len(t, sink.Values, 6, "output from all 3 blocks") test.EqualsWithNansWithDelta(t, tt.afterBlockOne[0], sink.Values[0], 0.0001) diff --git a/src/query/functions/temporal/base.go b/src/query/functions/temporal/base.go index e0c59cdeeb..a78611b32f 100644 --- a/src/query/functions/temporal/base.go +++ b/src/query/functions/temporal/base.go @@ -93,7 +93,7 @@ type baseNode struct { // 4. Process all valid blocks from #3, #4 and mark them as processed // 5. Run a sweep phase to free up blocks which are no longer needed to be cached // TODO: Figure out if something else needs to be locked -func (c *baseNode) Process(ID parser.NodeID, b block.Block) error { +func (c *baseNode) Process(queryCtx *models.QueryContext, ID parser.NodeID, b block.Block) error { unconsolidatedBlock, err := b.Unconsolidated() if err != nil { return err @@ -152,7 +152,12 @@ func (c *baseNode) Process(ID parser.NodeID, b block.Block) error { processRequests := make([]processRequest, 0, len(leftBlks)) // If we have all blocks for the left range in the cache, then process the current block if !emptyLeftBlocks { - processRequests = append(processRequests, processRequest{blk: unconsolidatedBlock, deps: leftBlks, bounds: bounds}) + processRequests = append(processRequests, processRequest{ + blk: unconsolidatedBlock, + deps: leftBlks, + bounds: bounds, + queryCtx: queryCtx, + }) } leftBlks = append(leftBlks, unconsolidatedBlock) @@ -171,7 +176,13 @@ func (c *baseNode) Process(ID parser.NodeID, b block.Block) error { deps := leftBlks[len(leftBlks)-lStart:] deps = append(deps, rightBlks[:i]...) - processRequests = append(processRequests, processRequest{blk: rightBlks[i], deps: deps, bounds: bounds.Next(i + 1)}) + processRequests = append( + processRequests, + processRequest{ + blk: rightBlks[i], + deps: deps, + bounds: bounds.Next(i + 1), + queryCtx: queryCtx}) } // If either the left range or right range wasn't fully processed then cache the current block @@ -255,7 +266,7 @@ func (c *baseNode) processSingleRequest(request processRequest) error { } } - builder, err := c.controller.BlockBuilder(seriesIter.Meta(), resultSeriesMeta) + builder, err := c.controller.BlockBuilder(request.queryCtx, seriesIter.Meta(), resultSeriesMeta) if err != nil { return err } @@ -319,7 +330,7 @@ func (c *baseNode) processSingleRequest(request processRequest) error { nextBlock := builder.Build() defer nextBlock.Close() - return c.controller.Process(nextBlock) + return c.controller.Process(request.queryCtx, nextBlock) } func (c *baseNode) sweep(processedKeys []bool, maxBlocks int) { @@ -359,9 +370,10 @@ type MakeProcessor interface { } type processRequest struct { - blk block.UnconsolidatedBlock - bounds models.Bounds - deps []block.UnconsolidatedBlock + queryCtx *models.QueryContext + blk block.UnconsolidatedBlock + bounds models.Bounds + deps []block.UnconsolidatedBlock } // blockCache keeps track of blocks from the same parent across time diff --git a/src/query/functions/temporal/base_test.go b/src/query/functions/temporal/base_test.go index c936850271..785187cb05 100644 --- a/src/query/functions/temporal/base_test.go +++ b/src/query/functions/temporal/base_test.go @@ -81,7 +81,7 @@ func TestBaseWithB0(t *testing.T) { Step: time.Second, }, }) - err := node.Process(parser.NodeID(0), block) + err := node.Process(models.NoopQueryContext(), parser.NodeID(0), block) require.NoError(t, err) assert.Len(t, sink.Values, 2) require.IsType(t, node, &baseNode{}) @@ -98,7 +98,7 @@ func TestBaseWithB0(t *testing.T) { }, }) - err = node.Process(parser.NodeID(0), block) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block) require.NoError(t, err) bNode = node.(*baseNode) _, exists = bNode.cache.get(boundStart) @@ -113,7 +113,7 @@ func TestBaseWithB0(t *testing.T) { }, }) - err = node.Process(parser.NodeID(0), block) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block) require.NoError(t, err) bNode = node.(*baseNode) _, exists = bNode.cache.get(boundStart) @@ -123,12 +123,12 @@ func TestBaseWithB0(t *testing.T) { func TestBaseWithB1B0(t *testing.T) { tc := setup(2, 5*time.Minute, 1) - err := tc.Node.Process(parser.NodeID(0), tc.Blocks[1]) + err := tc.Node.Process(models.NoopQueryContext(), parser.NodeID(0), tc.Blocks[1]) require.NoError(t, err) assert.Len(t, tc.Sink.Values, 0, "nothing processed yet") compareCacheState(t, tc.Node, tc.Bounds, []bool{false, true}, "B1 cached") - err = tc.Node.Process(parser.NodeID(0), tc.Blocks[0]) + err = tc.Node.Process(models.NoopQueryContext(), parser.NodeID(0), tc.Blocks[0]) require.NoError(t, err) assert.Len(t, tc.Sink.Values, 4, "output from both blocks") compareCacheState(t, tc.Node, tc.Bounds, []bool{false, false}, "everything removed from cache") @@ -140,13 +140,12 @@ func TestBaseWithB1B0(t *testing.T) { func TestBaseWithB0B1(t *testing.T) { tc := setup(2, 5*time.Minute, 1) - err := tc.Node.Process(parser.NodeID(0), tc.Blocks[0]) - + err := tc.Node.Process(models.NoopQueryContext(), parser.NodeID(0), tc.Blocks[0]) require.NoError(t, err) assert.Len(t, tc.Sink.Values, 2, "B0 processed") compareCacheState(t, tc.Node, tc.Bounds, []bool{true, false}, "B0 cached for future") - err = tc.Node.Process(parser.NodeID(0), tc.Blocks[1]) + err = tc.Node.Process(models.NoopQueryContext(), parser.NodeID(0), tc.Blocks[1]) require.NoError(t, err) assert.Len(t, tc.Sink.Values, 4, "output from both blocks") compareCacheState(t, tc.Node, tc.Bounds, []bool{false, false}, "B0 removed from cache, B1 not cached") @@ -159,19 +158,19 @@ func TestBaseWithB0B1B2(t *testing.T) { tc := setup(3, 5*time.Minute, 2) // B0 arrives - err := tc.Node.Process(parser.NodeID(0), tc.Blocks[0]) + err := tc.Node.Process(models.NoopQueryContext(), parser.NodeID(0), tc.Blocks[0]) require.NoError(t, err) assert.Len(t, tc.Sink.Values, 2, "B0 processed") compareCacheState(t, tc.Node, tc.Bounds, []bool{true, false, false}, "B0 cached for future") // B1 arrives - err = tc.Node.Process(parser.NodeID(0), tc.Blocks[1]) + err = tc.Node.Process(models.NoopQueryContext(), parser.NodeID(0), tc.Blocks[1]) require.NoError(t, err) assert.Len(t, tc.Sink.Values, 4, "output from B0, B1") compareCacheState(t, tc.Node, tc.Bounds, []bool{false, true, false}, "B0 removed from cache, B1 cached") // B2 arrives - err = tc.Node.Process(parser.NodeID(0), tc.Blocks[2]) + err = tc.Node.Process(models.NoopQueryContext(), parser.NodeID(0), tc.Blocks[2]) require.NoError(t, err) assert.Len(t, tc.Sink.Values, 6, "output from all blocks") compareCacheState(t, tc.Node, tc.Bounds, []bool{false, false, false}, "nothing cached") @@ -181,19 +180,19 @@ func TestBaseWithB0B2B1(t *testing.T) { tc := setup(3, 5*time.Minute, 2) // B0 arrives - err := tc.Node.Process(parser.NodeID(0), tc.Blocks[0]) + err := tc.Node.Process(models.NoopQueryContext(), parser.NodeID(0), tc.Blocks[0]) require.NoError(t, err) assert.Len(t, tc.Sink.Values, 2, "B0 processed") compareCacheState(t, tc.Node, tc.Bounds, []bool{true, false, false}, "B0 cached for future") // B2 arrives - err = tc.Node.Process(parser.NodeID(0), tc.Blocks[2]) + err = tc.Node.Process(models.NoopQueryContext(), parser.NodeID(0), tc.Blocks[2]) require.NoError(t, err) assert.Len(t, tc.Sink.Values, 2, "Only B0 processed") compareCacheState(t, tc.Node, tc.Bounds, []bool{true, false, true}, "B0, B2 cached") // B1 arrives - err = tc.Node.Process(parser.NodeID(0), tc.Blocks[1]) + err = tc.Node.Process(models.NoopQueryContext(), parser.NodeID(0), tc.Blocks[1]) require.NoError(t, err) assert.Len(t, tc.Sink.Values, 6, "output from all blocks") compareCacheState(t, tc.Node, tc.Bounds, []bool{false, false, false}, "nothing cached") @@ -203,19 +202,19 @@ func TestBaseWithB1B0B2(t *testing.T) { tc := setup(3, 5*time.Minute, 2) // B1 arrives - err := tc.Node.Process(parser.NodeID(0), tc.Blocks[1]) + err := tc.Node.Process(models.NoopQueryContext(), parser.NodeID(0), tc.Blocks[1]) require.NoError(t, err) assert.Len(t, tc.Sink.Values, 0, "Nothing processed") compareCacheState(t, tc.Node, tc.Bounds, []bool{false, true, false}, "B1 cached for future") // B0 arrives - err = tc.Node.Process(parser.NodeID(0), tc.Blocks[0]) + err = tc.Node.Process(models.NoopQueryContext(), parser.NodeID(0), tc.Blocks[0]) require.NoError(t, err) assert.Len(t, tc.Sink.Values, 4, "B0, B1 processed") compareCacheState(t, tc.Node, tc.Bounds, []bool{false, true, false}, "B1 still cached, B0 not cached") // B2 arrives - err = tc.Node.Process(parser.NodeID(0), tc.Blocks[2]) + err = tc.Node.Process(models.NoopQueryContext(), parser.NodeID(0), tc.Blocks[2]) require.NoError(t, err) assert.Len(t, tc.Sink.Values, 6, "output from all blocks") compareCacheState(t, tc.Node, tc.Bounds, []bool{false, false, false}, "nothing cached") @@ -225,19 +224,19 @@ func TestBaseWithB1B2B0(t *testing.T) { tc := setup(3, 5*time.Minute, 2) // B1 arrives - err := tc.Node.Process(parser.NodeID(0), tc.Blocks[1]) + err := tc.Node.Process(models.NoopQueryContext(), parser.NodeID(0), tc.Blocks[1]) require.NoError(t, err) assert.Len(t, tc.Sink.Values, 0, "Nothing processed") compareCacheState(t, tc.Node, tc.Bounds, []bool{false, true, false}, "B1 cached for future") // B2 arrives - err = tc.Node.Process(parser.NodeID(0), tc.Blocks[2]) + err = tc.Node.Process(models.NoopQueryContext(), parser.NodeID(0), tc.Blocks[2]) require.NoError(t, err) assert.Len(t, tc.Sink.Values, 2, "B1 processed") compareCacheState(t, tc.Node, tc.Bounds, []bool{false, true, false}, "B1 still cached, B2 not cached") // B0 arrives - err = tc.Node.Process(parser.NodeID(0), tc.Blocks[0]) + err = tc.Node.Process(models.NoopQueryContext(), parser.NodeID(0), tc.Blocks[0]) require.NoError(t, err) assert.Len(t, tc.Sink.Values, 6, "output from all blocks") compareCacheState(t, tc.Node, tc.Bounds, []bool{false, false, false}, "nothing cached") @@ -247,19 +246,19 @@ func TestBaseWithB2B0B1(t *testing.T) { tc := setup(3, 5*time.Minute, 2) // B2 arrives - err := tc.Node.Process(parser.NodeID(0), tc.Blocks[2]) + err := tc.Node.Process(models.NoopQueryContext(), parser.NodeID(0), tc.Blocks[2]) require.NoError(t, err) assert.Len(t, tc.Sink.Values, 0, "Nothing processed") compareCacheState(t, tc.Node, tc.Bounds, []bool{false, false, true}, "B2 cached for future") // B0 arrives - err = tc.Node.Process(parser.NodeID(0), tc.Blocks[0]) + err = tc.Node.Process(models.NoopQueryContext(), parser.NodeID(0), tc.Blocks[0]) require.NoError(t, err) assert.Len(t, tc.Sink.Values, 2, "B0 processed") compareCacheState(t, tc.Node, tc.Bounds, []bool{true, false, true}, "B0, B2 cached") // B1 arrives - err = tc.Node.Process(parser.NodeID(0), tc.Blocks[1]) + err = tc.Node.Process(models.NoopQueryContext(), parser.NodeID(0), tc.Blocks[1]) require.NoError(t, err) assert.Len(t, tc.Sink.Values, 6, "output from all blocks") compareCacheState(t, tc.Node, tc.Bounds, []bool{false, false, false}, "nothing cached") @@ -269,19 +268,19 @@ func TestBaseWithB2B1B0(t *testing.T) { tc := setup(3, 5*time.Minute, 2) // B2 arrives - err := tc.Node.Process(parser.NodeID(0), tc.Blocks[2]) + err := tc.Node.Process(models.NoopQueryContext(), parser.NodeID(0), tc.Blocks[2]) require.NoError(t, err) assert.Len(t, tc.Sink.Values, 0, "Nothing processed") compareCacheState(t, tc.Node, tc.Bounds, []bool{false, false, true}, "B2 cached for future") // B1 arrives - err = tc.Node.Process(parser.NodeID(0), tc.Blocks[1]) + err = tc.Node.Process(models.NoopQueryContext(), parser.NodeID(0), tc.Blocks[1]) require.NoError(t, err) assert.Len(t, tc.Sink.Values, 2, "B0 processed") compareCacheState(t, tc.Node, tc.Bounds, []bool{false, true, false}, "B1 cached, B2 removed") // B0 arrives - err = tc.Node.Process(parser.NodeID(0), tc.Blocks[0]) + err = tc.Node.Process(models.NoopQueryContext(), parser.NodeID(0), tc.Blocks[0]) require.NoError(t, err) assert.Len(t, tc.Sink.Values, 6, "output from all blocks") compareCacheState(t, tc.Node, tc.Bounds, []bool{false, false, false}, "nothing cached") @@ -291,31 +290,31 @@ func TestBaseWithSize3B0B1B2B3B4(t *testing.T) { tc := setup(5, 15*time.Minute, 4) // B0 arrives - err := tc.Node.Process(parser.NodeID(0), tc.Blocks[0]) + err := tc.Node.Process(models.NoopQueryContext(), parser.NodeID(0), tc.Blocks[0]) require.NoError(t, err) assert.Len(t, tc.Sink.Values, 2, "B0 processed") compareCacheState(t, tc.Node, tc.Bounds, []bool{true, false, false, false, false}, "B0 cached for future") // B1 arrives - err = tc.Node.Process(parser.NodeID(0), tc.Blocks[1]) + err = tc.Node.Process(models.NoopQueryContext(), parser.NodeID(0), tc.Blocks[1]) require.NoError(t, err) assert.Len(t, tc.Sink.Values, 4, "B0, B1 processed") compareCacheState(t, tc.Node, tc.Bounds, []bool{true, true, false, false, false}, "B0, B1 cached") // B2 arrives - err = tc.Node.Process(parser.NodeID(0), tc.Blocks[2]) + err = tc.Node.Process(models.NoopQueryContext(), parser.NodeID(0), tc.Blocks[2]) require.NoError(t, err) assert.Len(t, tc.Sink.Values, 6, "B0, B1, B2 processed") compareCacheState(t, tc.Node, tc.Bounds, []bool{true, true, true, false, false}, "B0, B1, B2 cached") // B3 arrives - err = tc.Node.Process(parser.NodeID(0), tc.Blocks[3]) + err = tc.Node.Process(models.NoopQueryContext(), parser.NodeID(0), tc.Blocks[3]) require.NoError(t, err) assert.Len(t, tc.Sink.Values, 8, "B0, B1, B2, B3 processed") compareCacheState(t, tc.Node, tc.Bounds, []bool{false, true, true, true, false}, "B0 removed, B1, B2, B3 cached") // B4 arrives - err = tc.Node.Process(parser.NodeID(0), tc.Blocks[4]) + err = tc.Node.Process(models.NoopQueryContext(), parser.NodeID(0), tc.Blocks[4]) require.NoError(t, err) assert.Len(t, tc.Sink.Values, 10, "all 5 blocks processed") compareCacheState(t, tc.Node, tc.Bounds, []bool{false, false, false, false, false}, "nothing cached") @@ -397,7 +396,12 @@ func TestSingleProcessRequest(t *testing.T) { }, }) bNode := node.(*baseNode) - err := bNode.processSingleRequest(processRequest{blk: block2, bounds: bounds, deps: []block.UnconsolidatedBlock{block1}}) + err := bNode.processSingleRequest(processRequest{ + blk: block2, + bounds: bounds, + deps: []block.UnconsolidatedBlock{block1}, + queryCtx: models.NoopQueryContext(), + }) assert.NoError(t, err) assert.Len(t, sink.Values, 2, "block processed") // Current Block: 0 1 2 3 4 5 diff --git a/src/query/functions/temporal/functions_test.go b/src/query/functions/temporal/functions_test.go index d59dfd43ff..46192302cf 100644 --- a/src/query/functions/temporal/functions_test.go +++ b/src/query/functions/temporal/functions_test.go @@ -157,7 +157,7 @@ func testTemporalFunc(t *testing.T, testCases []testCase) { }, }) bNode := node.(*baseNode) - err = node.Process(parser.NodeID(0), block3) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block3) require.NoError(t, err) assert.Len(t, sink.Values, 0, "nothing processed yet") b, exists := bNode.cache.get(boundStart) @@ -174,7 +174,7 @@ func testTemporalFunc(t *testing.T, testCases []testCase) { }, values) values[0][0] = original - err = node.Process(parser.NodeID(0), block1) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block1) require.NoError(t, err) assert.Len(t, sink.Values, 2, "output from first block only") test.EqualsWithNansWithDelta(t, tt.afterBlockOne[0], sink.Values[0], 0.0001) @@ -190,7 +190,7 @@ func testTemporalFunc(t *testing.T, testCases []testCase) { StepSize: bounds.StepSize, }, values) - err = node.Process(parser.NodeID(0), block2) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block2) require.NoError(t, err) assert.Len(t, sink.Values, 6, "output from all 3 blocks") test.EqualsWithNansWithDelta(t, tt.afterBlockOne[0], sink.Values[0], 0.0001) diff --git a/src/query/functions/temporal/holt_winters_test.go b/src/query/functions/temporal/holt_winters_test.go index 480f267ebd..b85964305b 100644 --- a/src/query/functions/temporal/holt_winters_test.go +++ b/src/query/functions/temporal/holt_winters_test.go @@ -96,7 +96,7 @@ func testHoltWinters(t *testing.T, testCases []testCase, vals [][]float64) { }, }) bNode := node.(*baseNode) - err = node.Process(parser.NodeID(0), block3) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block3) require.NoError(t, err) assert.Len(t, sink.Values, 0, "nothing processed yet") b, exists := bNode.cache.get(boundStart) @@ -113,7 +113,7 @@ func testHoltWinters(t *testing.T, testCases []testCase, vals [][]float64) { }, values) values[0][0] = original - err = node.Process(parser.NodeID(0), block1) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block1) require.NoError(t, err) assert.Len(t, sink.Values, 2, "output from first block only") test.EqualsWithNansWithDelta(t, tt.afterBlockOne[0], sink.Values[0], 0.0001) @@ -129,7 +129,7 @@ func testHoltWinters(t *testing.T, testCases []testCase, vals [][]float64) { StepSize: bounds.StepSize, }, values) - err = node.Process(parser.NodeID(0), block2) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block2) require.NoError(t, err) assert.Len(t, sink.Values, 6, "output from all 3 blocks") test.EqualsWithNansWithDelta(t, tt.afterBlockOne[0], sink.Values[0], 0.0001) diff --git a/src/query/functions/temporal/linear_regression_test.go b/src/query/functions/temporal/linear_regression_test.go index 028f65cac5..fd2bc25426 100644 --- a/src/query/functions/temporal/linear_regression_test.go +++ b/src/query/functions/temporal/linear_regression_test.go @@ -170,7 +170,7 @@ func testLinearRegression(t *testing.T, testCases []testCase, vals [][]float64) }, }) bNode := node.(*baseNode) - err = node.Process(parser.NodeID(0), block3) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block3) require.NoError(t, err) assert.Len(t, sink.Values, 0, "nothing processed yet") b, exists := bNode.cache.get(boundStart) @@ -187,7 +187,7 @@ func testLinearRegression(t *testing.T, testCases []testCase, vals [][]float64) }, values) values[0][0] = original - err = node.Process(parser.NodeID(0), block1) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block1) require.NoError(t, err) assert.Len(t, sink.Values, 2, "output from first block only") test.EqualsWithNansWithDelta(t, tt.afterBlockOne[0], sink.Values[0], 0.0001) @@ -203,7 +203,7 @@ func testLinearRegression(t *testing.T, testCases []testCase, vals [][]float64) StepSize: bounds.StepSize, }, values) - err = node.Process(parser.NodeID(0), block2) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block2) require.NoError(t, err) assert.Len(t, sink.Values, 6, "output from all 3 blocks") test.EqualsWithNansWithDelta(t, tt.afterBlockOne[0], sink.Values[0], 0.0001) diff --git a/src/query/functions/temporal/rate_test.go b/src/query/functions/temporal/rate_test.go index fe7c3552eb..59e7d8c411 100644 --- a/src/query/functions/temporal/rate_test.go +++ b/src/query/functions/temporal/rate_test.go @@ -323,7 +323,7 @@ func testRate(t *testing.T, testCases []testRateCase) { }, }) bNode := node.(*baseNode) - err = node.Process(parser.NodeID(0), block3) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block3) require.NoError(t, err) assert.Len(t, sink.Values, 0, "nothing processed yet") b, exists := bNode.cache.get(boundStart) @@ -340,7 +340,7 @@ func testRate(t *testing.T, testCases []testRateCase) { }, values) values[0][0] = original - err = node.Process(parser.NodeID(0), block1) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block1) require.NoError(t, err) assert.Len(t, sink.Values, 2, "output from first block only") test.EqualsWithNansWithDelta(t, tt.afterBlockOne[0], sink.Values[0], 0.0001) @@ -356,7 +356,7 @@ func testRate(t *testing.T, testCases []testRateCase) { StepSize: bounds.StepSize, }, values) - err = node.Process(parser.NodeID(0), block2) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), block2) require.NoError(t, err) assert.Len(t, sink.Values, 6, "output from all 3 blocks") test.EqualsWithNansWithDelta(t, tt.afterBlockOne[0], sink.Values[0], 0.0001) diff --git a/src/query/functions/unconsolidated/timestamp.go b/src/query/functions/unconsolidated/timestamp.go index 359e5926ed..052bc47385 100644 --- a/src/query/functions/unconsolidated/timestamp.go +++ b/src/query/functions/unconsolidated/timestamp.go @@ -26,6 +26,7 @@ import ( "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/executor/transform" + "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" "github.com/m3db/m3/src/query/ts" ) @@ -83,7 +84,7 @@ type timestampNode struct { } // Process the block -func (n *timestampNode) Process(ID parser.NodeID, b block.Block) error { +func (n *timestampNode) Process(queryCtx *models.QueryContext, ID parser.NodeID, b block.Block) error { unconsolidatedBlock, err := b.Unconsolidated() if err != nil { return err @@ -94,7 +95,7 @@ func (n *timestampNode) Process(ID parser.NodeID, b block.Block) error { return err } - builder, err := n.controller.BlockBuilder(iter.Meta(), iter.SeriesMeta()) + builder, err := n.controller.BlockBuilder(queryCtx, iter.Meta(), iter.SeriesMeta()) if err != nil { return err } @@ -127,5 +128,5 @@ func (n *timestampNode) Process(ID parser.NodeID, b block.Block) error { nextBlock := builder.Build() defer nextBlock.Close() - return n.controller.Process(nextBlock) + return n.controller.Process(queryCtx, nextBlock) } diff --git a/src/query/functions/unconsolidated/timestamp_test.go b/src/query/functions/unconsolidated/timestamp_test.go index 08f1666c6a..0c3d536fc6 100644 --- a/src/query/functions/unconsolidated/timestamp_test.go +++ b/src/query/functions/unconsolidated/timestamp_test.go @@ -25,6 +25,7 @@ import ( "time" "github.com/m3db/m3/src/query/executor/transform" + "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" "github.com/m3db/m3/src/query/test" "github.com/m3db/m3/src/query/test/executor" @@ -38,7 +39,7 @@ func TestTimestamp(t *testing.T) { block := test.NewUnconsolidatedBlockFromDatapoints(bounds, values) c, sink := executor.NewControllerWithSink(parser.NodeID(1)) node := newTimestampOp(TimestampType).Node(c, transform.Options{}) - err := node.Process(parser.NodeID(0), block) + err := node.Process(models.NoopQueryContext(), parser.NodeID(0), block) require.NoError(t, err) assert.Len(t, sink.Values, 2) diff --git a/src/query/models/query_context.go b/src/query/models/query_context.go new file mode 100644 index 0000000000..5da6e50dd7 --- /dev/null +++ b/src/query/models/query_context.go @@ -0,0 +1,65 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package models + +import ( + "context" + + "github.com/uber-go/tally" +) + +// QueryContext provides all external state needed to execute and track a query. +// It acts as a hook back into the execution engine for things like +// cost accounting. +type QueryContext struct { + Ctx context.Context + Scope tally.Scope +} + +// NewQueryContext constructs a QueryContext using the given Enforcer to +// enforce per query limits. +func NewQueryContext(ctx context.Context, scope tally.Scope) *QueryContext { + return &QueryContext{ + Ctx: ctx, + Scope: scope, + } +} + +// NoopQueryContext returns a query context with no active components. +func NoopQueryContext() *QueryContext { + return NewQueryContext(context.Background(), tally.NoopScope) +} + +// WithContext creates a shallow copy of this QueryContext using the new context. +// Sample usage: +// +// ctx, cancel := context.WithTimeout(qc.Ctx, 5*time.Second) +// defer cancel() +// qc = qc.WithContext(ctx) +func (qc *QueryContext) WithContext(ctx context.Context) *QueryContext { + if qc == nil { + return nil + } + + clone := *qc + clone.Ctx = ctx + return &clone +} diff --git a/src/query/models/query_context_test.go b/src/query/models/query_context_test.go new file mode 100644 index 0000000000..5cea7f659b --- /dev/null +++ b/src/query/models/query_context_test.go @@ -0,0 +1,48 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package models + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestQueryContext_WithContext(t *testing.T) { + t.Run("passes along new context without modifying old", func(t *testing.T) { + qc := NoopQueryContext() + + testKey := struct{}{} + + newCtx := context.WithValue(qc.Ctx, testKey, "bar") + + newQc := qc.WithContext(newCtx) + + assert.Equal(t, "bar", newQc.Ctx.Value(testKey), "new context should be present") + assert.Equal(t, nil, qc.Ctx.Value(testKey), "old context should be the same") + }) + + t.Run("returns nil on nil", func(t *testing.T) { + var qc *QueryContext + assert.Nil(t, qc.WithContext(context.TODO())) + }) +} diff --git a/src/query/parser/interface.go b/src/query/parser/interface.go index 39b95a1906..5e7852e174 100644 --- a/src/query/parser/interface.go +++ b/src/query/parser/interface.go @@ -21,8 +21,9 @@ package parser import ( - "context" "fmt" + + "github.com/m3db/m3/src/query/models" ) // Parser consists of the language specific representation of AST and can convert into a common DAG @@ -77,5 +78,5 @@ func NewTransformFromOperation(Op Params, nextID int) Node { // Source represents data sources which are handled differently than other transforms as they are always independent and can always be parallelized type Source interface { - Execute(ctx context.Context) error + Execute(queryCtx *models.QueryContext) error } diff --git a/src/query/test/executor/transform.go b/src/query/test/executor/transform.go index 402606c246..e4a84404e6 100644 --- a/src/query/test/executor/transform.go +++ b/src/query/test/executor/transform.go @@ -23,6 +23,7 @@ package executor import ( "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/executor/transform" + "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" ) @@ -48,7 +49,7 @@ type SinkNode struct { } // Process processes and stores the last block output in the sink node -func (s *SinkNode) Process(ID parser.NodeID, block block.Block) error { +func (s *SinkNode) Process(_ *models.QueryContext, ID parser.NodeID, block block.Block) error { iter, err := block.SeriesIter() if err != nil { return err