Skip to content

Commit

Permalink
Query context without cost accounting (#1396)
Browse files Browse the repository at this point in the history
Adds a `queryContext` argument to `OpNode.Process` to hold any per query state. I use this in both my [cost accounting](#1207) and [tracing](#1321) PR's. At the time, I based my tracing branch off of the cost accounting branch. Tracing is closer to landing though, so I've now factored out the common changes, and rebased them both against this branch.
  • Loading branch information
andrewmains12 authored Feb 21, 2019
1 parent 667f864 commit 1012004
Show file tree
Hide file tree
Showing 55 changed files with 397 additions and 200 deletions.
3 changes: 2 additions & 1 deletion src/query/executor/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ func (e *Engine) ExecuteExpr(

result := state.resultNode
results <- Query{Result: result}
if err := state.Execute(ctx); err != nil {

if err := state.Execute(models.NewQueryContext(ctx, tally.NoopScope)); err != nil {
result.abort(err)
} else {
result.done()
Expand Down
2 changes: 1 addition & 1 deletion src/query/executor/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
"github.com/uber-go/tally"
)

func TestExecute(t *testing.T) {
func TestEngine_Execute(t *testing.T) {
logging.InitWithCores(nil)
ctrl := gomock.NewController(t)
store, session := m3.NewStorageAndSession(t, ctrl)
Expand Down
3 changes: 2 additions & 1 deletion src/query/executor/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"sync"

"github.com/m3db/m3/src/query/block"
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/parser"

"github.com/pkg/errors"
Expand Down Expand Up @@ -64,7 +65,7 @@ func newResultNode() *ResultNode {
}

// Process the block
func (r *ResultNode) Process(ID parser.NodeID, block block.Block) error {
func (r *ResultNode) Process(queryCtx *models.QueryContext, ID parser.NodeID, block block.Block) error {
if r.aborted {
return errAborted
}
Expand Down
16 changes: 11 additions & 5 deletions src/query/executor/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"fmt"

"github.com/m3db/m3/src/query/executor/transform"
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/parser"
"github.com/m3db/m3/src/query/plan"
"github.com/m3db/m3/src/query/storage"
Expand Down Expand Up @@ -184,13 +185,16 @@ func (s *ExecutionState) createNode(
}

// Execute the sources in parallel and return the first error
func (s *ExecutionState) Execute(ctx context.Context) error {
func (s *ExecutionState) Execute(queryCtx *models.QueryContext) error {
requests := make([]execution.Request, len(s.sources))
for idx, source := range s.sources {
requests[idx] = sourceRequest{source}
requests[idx] = sourceRequest{
source: source,
queryCtx: queryCtx,
}
}

return execution.ExecuteParallel(ctx, requests)
return execution.ExecuteParallel(queryCtx.Ctx, requests)
}

// String representation of the state
Expand All @@ -199,9 +203,11 @@ func (s *ExecutionState) String() string {
}

type sourceRequest struct {
source parser.Source
source parser.Source
queryCtx *models.QueryContext
}

func (s sourceRequest) Process(ctx context.Context) error {
return s.source.Execute(ctx)
// make sure to propagate the new context.Context object down.
return s.source.Execute(s.queryCtx.WithContext(ctx))
}
3 changes: 1 addition & 2 deletions src/query/executor/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
package executor

import (
"context"
"testing"
"time"

Expand Down Expand Up @@ -61,7 +60,7 @@ func TestValidState(t *testing.T) {
state, err := GenerateExecutionState(p, store)
require.NoError(t, err)
require.Len(t, state.sources, 1)
err = state.Execute(context.Background())
err = state.Execute(models.NoopQueryContext())
assert.NoError(t, err)
}

Expand Down
13 changes: 7 additions & 6 deletions src/query/executor/transform/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package transform

import (
"github.com/m3db/m3/src/query/block"
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/parser"
)

Expand All @@ -36,10 +37,10 @@ func (t *Controller) AddTransform(node OpNode) {
t.transforms = append(t.transforms, node)
}

// Process performs processing on the underlying transforms.
func (t *Controller) Process(block block.Block) error {
// Process performs processing on the underlying transforms
func (t *Controller) Process(queryCtx *models.QueryContext, block block.Block) error {
for _, ts := range t.transforms {
err := ts.Process(t.ID, block)
err := ts.Process(queryCtx, t.ID, block)
if err != nil {
return err
}
Expand All @@ -48,11 +49,11 @@ func (t *Controller) Process(block block.Block) error {
return nil
}

// BlockBuilder returns a BlockBuilder instance with associated metadata.
// BlockBuilder returns a BlockBuilder instance with associated metadata
func (t *Controller) BlockBuilder(
queryCtx *models.QueryContext,
blockMeta block.Metadata,
seriesMeta []block.SeriesMeta,
) (block.Builder, error) {
seriesMeta []block.SeriesMeta) (block.Builder, error) {
return block.NewColumnBlockBuilder(blockMeta, seriesMeta), nil
}

Expand Down
22 changes: 12 additions & 10 deletions src/query/executor/transform/lazy.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,15 @@ import (
"sync"

"github.com/m3db/m3/src/query/block"
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/parser"
)

type sinkNode struct {
block block.Block
}

func (s *sinkNode) Process(ID parser.NodeID, block block.Block) error {
func (s *sinkNode) Process(queryCtx *models.QueryContext, ID parser.NodeID, block block.Block) error {
s.block = block
return nil
}
Expand All @@ -44,9 +45,7 @@ type lazyNode struct {

// NewLazyNode creates a new wrapper around a function fNode to make it support lazy initialization
func NewLazyNode(node OpNode, controller *Controller) (OpNode, *Controller) {
c := &Controller{
ID: controller.ID,
}
c := &Controller{ID: controller.ID}

sink := &sinkNode{}
controller.AddTransform(sink)
Expand All @@ -58,14 +57,15 @@ func NewLazyNode(node OpNode, controller *Controller) (OpNode, *Controller) {
}, c
}

func (f *lazyNode) Process(ID parser.NodeID, block block.Block) error {
func (f *lazyNode) Process(queryCtx *models.QueryContext, ID parser.NodeID, block block.Block) error {
b := &lazyBlock{
rawBlock: block,
lazyNode: f,
queryCtx: queryCtx,
ID: ID,
}

return f.controller.Process(b)
return f.controller.Process(queryCtx, b)
}

type stepIter struct {
Expand Down Expand Up @@ -177,9 +177,11 @@ func (s *seriesIter) Next() bool {
}

type lazyBlock struct {
mu sync.Mutex
rawBlock block.Block
lazyNode *lazyNode
mu sync.Mutex
rawBlock block.Block
lazyNode *lazyNode

queryCtx *models.QueryContext
ID parser.NodeID
processedBlock block.Block
processError error
Expand Down Expand Up @@ -283,7 +285,7 @@ func (f *lazyBlock) Close() error {
}

func (f *lazyBlock) process() error {
err := f.lazyNode.fNode.Process(f.ID, f.rawBlock)
err := f.lazyNode.fNode.Process(f.queryCtx, f.ID, f.rawBlock)
if err != nil {
f.processError = err
return err
Expand Down
7 changes: 4 additions & 3 deletions src/query/executor/transform/lazy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"testing"

"github.com/m3db/m3/src/query/block"
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/parser"
"github.com/m3db/m3/src/query/test"

Expand All @@ -35,9 +36,9 @@ type dummyFunc struct {
controller *Controller
}

func (f *dummyFunc) Process(ID parser.NodeID, block block.Block) error {
func (f *dummyFunc) Process(queryCtx *models.QueryContext, ID parser.NodeID, block block.Block) error {
f.processed = true
f.controller.Process(block)
f.controller.Process(queryCtx, block)
return nil
}

Expand All @@ -50,7 +51,7 @@ func TestLazyState(t *testing.T) {
downStreamController.AddTransform(sNode)
values, bounds := test.GenerateValuesAndBounds(nil, nil)
b := test.NewBlockFromValues(bounds, values)
err := node.Process(parser.NodeID(1), b)
err := node.Process(models.NoopQueryContext(), parser.NodeID(1), b)
assert.NoError(t, err)
assert.NotNil(t, sNode.block, "downstream process called with a block")
assert.IsType(t, sNode.block, &lazyBlock{})
Expand Down
2 changes: 1 addition & 1 deletion src/query/executor/transform/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type Options struct {

// OpNode represents the execution node
type OpNode interface {
Process(ID parser.NodeID, block block.Block) error
Process(queryCtx *models.QueryContext, ID parser.NodeID, block block.Block) error
}

// TimeSpec defines the time bounds for the query execution. End is exclusive
Expand Down
7 changes: 4 additions & 3 deletions src/query/functions/aggregation/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/m3db/m3/src/query/block"
"github.com/m3db/m3/src/query/executor/transform"
"github.com/m3db/m3/src/query/functions/utils"
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/parser"
)

Expand Down Expand Up @@ -109,7 +110,7 @@ type baseNode struct {
}

// Process the block
func (n *baseNode) Process(ID parser.NodeID, b block.Block) error {
func (n *baseNode) Process(queryCtx *models.QueryContext, ID parser.NodeID, b block.Block) error {
stepIter, err := b.StepIter()
if err != nil {
return err
Expand All @@ -126,7 +127,7 @@ func (n *baseNode) Process(ID parser.NodeID, b block.Block) error {
)
meta.Tags, metas = utils.DedupeMetadata(metas)

builder, err := n.controller.BlockBuilder(meta, metas)
builder, err := n.controller.BlockBuilder(queryCtx, meta, metas)
if err != nil {
return err
}
Expand All @@ -152,5 +153,5 @@ func (n *baseNode) Process(ID parser.NodeID, b block.Block) error {

nextBlock := builder.Build()
defer nextBlock.Close()
return n.controller.Process(nextBlock)
return n.controller.Process(queryCtx, nextBlock)
}
2 changes: 1 addition & 1 deletion src/query/functions/aggregation/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func processAggregationOp(t *testing.T, op parser.Params) *executor.SinkNode {
bl := test.NewBlockFromValuesWithSeriesMeta(bounds, seriesMetas, v)
c, sink := executor.NewControllerWithSink(parser.NodeID(1))
node := op.(baseOp).Node(c, transform.Options{})
err := node.Process(parser.NodeID(0), bl)
err := node.Process(models.NoopQueryContext(), parser.NodeID(0), bl)
require.NoError(t, err)
return sink
}
Expand Down
6 changes: 3 additions & 3 deletions src/query/functions/aggregation/count_values.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func processBlockBucketAtColumn(
}

// Process the block
func (n *countValuesNode) Process(ID parser.NodeID, b block.Block) error {
func (n *countValuesNode) Process(queryCtx *models.QueryContext, ID parser.NodeID, b block.Block) error {
stepIter, err := b.StepIter()
if err != nil {
return err
Expand Down Expand Up @@ -210,7 +210,7 @@ func (n *countValuesNode) Process(ID parser.NodeID, b block.Block) error {
metaTags, flattenedMeta := utils.DedupeMetadata(blockMetas)
meta.Tags = metaTags

builder, err := n.controller.BlockBuilder(meta, flattenedMeta)
builder, err := n.controller.BlockBuilder(queryCtx, meta, flattenedMeta)
if err != nil {
return err
}
Expand All @@ -231,7 +231,7 @@ func (n *countValuesNode) Process(ID parser.NodeID, b block.Block) error {

nextBlock := builder.Build()
defer nextBlock.Close()
return n.controller.Process(nextBlock)
return n.controller.Process(queryCtx, nextBlock)
}

// pads vals with enough NaNs to match size
Expand Down
2 changes: 1 addition & 1 deletion src/query/functions/aggregation/count_values_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func processCountValuesOp(
bl := test.NewBlockFromValuesWithSeriesMeta(bounds, metas, vals)
c, sink := executor.NewControllerWithSink(parser.NodeID(1))
node := op.(countValuesOp).Node(c, transform.Options{})
err := node.Process(parser.NodeID(0), bl)
err := node.Process(models.NoopQueryContext(), parser.NodeID(0), bl)
require.NoError(t, err)
return sink
}
Expand Down
7 changes: 4 additions & 3 deletions src/query/functions/aggregation/take.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/m3db/m3/src/query/block"
"github.com/m3db/m3/src/query/executor/transform"
"github.com/m3db/m3/src/query/functions/utils"
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/parser"
"github.com/m3db/m3/src/query/ts"
)
Expand Down Expand Up @@ -111,7 +112,7 @@ type takeNode struct {
}

// Process the block
func (n *takeNode) Process(ID parser.NodeID, b block.Block) error {
func (n *takeNode) Process(queryCtx *models.QueryContext, ID parser.NodeID, b block.Block) error {
stepIter, err := b.StepIter()
if err != nil {
return err
Expand All @@ -128,7 +129,7 @@ func (n *takeNode) Process(ID parser.NodeID, b block.Block) error {
)

// retain original metadatas
builder, err := n.controller.BlockBuilder(meta, stepIter.SeriesMeta())
builder, err := n.controller.BlockBuilder(queryCtx, meta, stepIter.SeriesMeta())
if err != nil {
return err
}
Expand All @@ -150,7 +151,7 @@ func (n *takeNode) Process(ID parser.NodeID, b block.Block) error {

nextBlock := builder.Build()
defer nextBlock.Close()
return n.controller.Process(nextBlock)
return n.controller.Process(queryCtx, nextBlock)
}

// shortcut to return empty when taking <= 0 values
Expand Down
3 changes: 2 additions & 1 deletion src/query/functions/aggregation/take_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/m3db/m3/src/query/executor/transform"
"github.com/m3db/m3/src/query/functions/utils"
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/parser"
"github.com/m3db/m3/src/query/test"
"github.com/m3db/m3/src/query/test/executor"
Expand Down Expand Up @@ -62,7 +63,7 @@ func processTakeOp(t *testing.T, op parser.Params) *executor.SinkNode {
bl := test.NewBlockFromValuesWithSeriesMeta(bounds, seriesMetas, v)
c, sink := executor.NewControllerWithSink(parser.NodeID(1))
node := op.(takeOp).Node(c, transform.Options{})
err := node.Process(parser.NodeID(0), bl)
err := node.Process(models.NoopQueryContext(), parser.NodeID(0), bl)
require.NoError(t, err)
return sink
}
Expand Down
4 changes: 3 additions & 1 deletion src/query/functions/binary/and.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,22 @@ import (

"github.com/m3db/m3/src/query/block"
"github.com/m3db/m3/src/query/executor/transform"
"github.com/m3db/m3/src/query/models"
)

// AndType uses values from left hand side for which there is a value in right hand side with exactly matching label sets.
// Other elements are replaced by NaNs. The metric name and values are carried over from the left-hand side.
const AndType = "and"

func makeAndBlock(
queryCtx *models.QueryContext,
lIter, rIter block.StepIter,
controller *transform.Controller,
matching *VectorMatching,
) (block.Block, error) {
lMeta, rSeriesMeta := lIter.Meta(), rIter.SeriesMeta()

builder, err := controller.BlockBuilder(lMeta, rSeriesMeta)
builder, err := controller.BlockBuilder(queryCtx, lMeta, rSeriesMeta)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 1012004

Please sign in to comment.