diff --git a/src/cmd/services/m3query/config/config.go b/src/cmd/services/m3query/config/config.go index 295e7bb899..fedf1e8779 100644 --- a/src/cmd/services/m3query/config/config.go +++ b/src/cmd/services/m3query/config/config.go @@ -24,7 +24,6 @@ import ( "time" etcdclient "github.com/m3db/m3/src/cluster/client/etcd" - "github.com/m3db/m3/src/cmd/services/m3coordinator/downsample" "github.com/m3db/m3/src/cmd/services/m3coordinator/ingest" "github.com/m3db/m3/src/cmd/services/m3coordinator/server/m3msg" diff --git a/src/query/DATAMODEL.md b/src/query/DATAMODEL.md deleted file mode 100644 index c741032f9a..0000000000 --- a/src/query/DATAMODEL.md +++ /dev/null @@ -1,5 +0,0 @@ - - -DAG is represented by Controller nodes - -Controller -> Nodes -> Controller \ No newline at end of file diff --git a/src/query/api/v1/handler/prometheus/native/read_common.go b/src/query/api/v1/handler/prometheus/native/read_common.go index 0e71037442..fc8b315f20 100644 --- a/src/query/api/v1/handler/prometheus/native/read_common.go +++ b/src/query/api/v1/handler/prometheus/native/read_common.go @@ -34,6 +34,7 @@ import ( "github.com/m3db/m3/src/query/parser/promql" "github.com/m3db/m3/src/query/ts" opentracingutil "github.com/m3db/m3/src/query/util/opentracing" + opentracinglog "github.com/opentracing/opentracing-go/log" ) diff --git a/src/query/api/v1/httpd/handler.go b/src/query/api/v1/httpd/handler.go index 230c414ece..7eeb368a3a 100644 --- a/src/query/api/v1/httpd/handler.go +++ b/src/query/api/v1/httpd/handler.go @@ -48,10 +48,10 @@ import ( "github.com/m3db/m3/src/query/util/logging" "github.com/m3db/m3/src/x/net/http" "github.com/m3db/m3/src/x/net/http/cors" - "github.com/opentracing-contrib/go-stdlib/nethttp" - "github.com/opentracing/opentracing-go" "github.com/gorilla/mux" + "github.com/opentracing-contrib/go-stdlib/nethttp" + "github.com/opentracing/opentracing-go" "github.com/uber-go/tally" ) diff --git a/src/query/config/m3query-dev-etcd.yml b/src/query/config/m3query-dev-etcd.yml index 1ae3e6b11f..518ba71910 100644 --- a/src/query/config/m3query-dev-etcd.yml +++ b/src/query/config/m3query-dev-etcd.yml @@ -67,5 +67,8 @@ limits: global: maxFetchedDatapoints: 500000000 -tracing: - backend: jaeger +# Uncomment this to enable local jaeger tracing. See https://www.jaegertracing.io/docs/1.9/getting-started/ +# for quick local setup (which this config will send data to). + +#tracing: +# backend: jaeger diff --git a/src/query/executor/engine.go b/src/query/executor/engine.go index 5f3bcfb198..b3343198ec 100644 --- a/src/query/executor/engine.go +++ b/src/query/executor/engine.go @@ -28,8 +28,8 @@ import ( "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" "github.com/m3db/m3/src/query/storage" - "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go" "github.com/uber-go/tally" ) diff --git a/src/query/executor/request.go b/src/query/executor/request.go index b4c5e9f386..1231e3d89c 100644 --- a/src/query/executor/request.go +++ b/src/query/executor/request.go @@ -28,6 +28,7 @@ import ( "github.com/m3db/m3/src/query/parser" "github.com/m3db/m3/src/query/plan" "github.com/m3db/m3/src/query/util/logging" + "github.com/opentracing/opentracing-go" "go.uber.org/zap" ) @@ -128,6 +129,3 @@ func (r *Request) generateExecutionState(ctx context.Context, pp plan.PhysicalPl return state, nil } - -func (r *Request) finish() { -} diff --git a/src/query/executor/result.go b/src/query/executor/result.go index 013ffd56c3..ba41828395 100644 --- a/src/query/executor/result.go +++ b/src/query/executor/result.go @@ -54,6 +54,7 @@ type ResultNode struct { aborted bool } +// Params returns the params associated with this node. func (r *ResultNode) Params() parser.Params { return utils.StaticParams("result") } diff --git a/src/query/executor/transform/controller.go b/src/query/executor/transform/controller.go index a05f3e1f44..1389bd9d68 100644 --- a/src/query/executor/transform/controller.go +++ b/src/query/executor/transform/controller.go @@ -40,17 +40,7 @@ func (t *Controller) AddTransform(node OpNode) { // Process performs processing on the underlying transforms func (t *Controller) Process(queryCtx *models.QueryContext, block block.Block) error { for _, ts := range t.transforms { - - // TODO: ts.Process calls its children's .Process before returning (via controller). - // This screws up the tracing--it looks like ts isn't done processing, even though it's actually - // completely handed off control to the child node. - // This could be fixed by moving the spans into each node, and *finishing* the span before handing off - // to the next. More elegantly, we could refactor Process to be block in, block out, and outsource passing - // blocks between nodes to an external actor (better use of Controller maybe). - // sp, _ := opentracing.StartSpanFromContext(queryCtx.Ctx, ts.Params().OpType()) - err := ts.Process(queryCtx, t.ID, block) - // sp.Finish() - if err != nil { + if err := ts.Process(queryCtx, t.ID, block); err != nil { return err } } diff --git a/src/query/executor/transform/exec.go b/src/query/executor/transform/exec.go index 42cdc010ff..524baef38b 100644 --- a/src/query/executor/transform/exec.go +++ b/src/query/executor/transform/exec.go @@ -24,15 +24,24 @@ import ( "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" + "github.com/opentracing/opentracing-go" ) -type simpleBlock interface { +type simpleOpNode interface { OpNode ProcessBlock(queryCtx *models.QueryContext, ID parser.NodeID, b block.Block) (block.Block, error) } -func ProcessSimpleBlock(node simpleBlock, controller *Controller, queryCtx *models.QueryContext, ID parser.NodeID, b block.Block) error { +// ProcessSimpleBlock is a utility for OpNode instances which simply propagate their data after doing their own +// processing, allowing them to implement a simpler interface. +// It adds instrumentation to the processing, and handles propagating the block downstream. +// OpNode's should call this as their implementation of the Process method: +// +// func (n MyNode) Process(queryCtx *models.QueryContext, ID parser.NodeID, b block.Block) error { +// return transform.ProcessSimpleBlock(n, n.controller, queryCtx, ID, b) +// } +func ProcessSimpleBlock(node simpleOpNode, controller *Controller, queryCtx *models.QueryContext, ID parser.NodeID, b block.Block) error { sp, _ := opentracing.StartSpanFromContext(queryCtx.Ctx, node.Params().OpType()) nextBlock, err := node.ProcessBlock(queryCtx, ID, b) sp.Finish() diff --git a/src/query/executor/transform/exec_test.go b/src/query/executor/transform/exec_test.go new file mode 100644 index 0000000000..cf4948df18 --- /dev/null +++ b/src/query/executor/transform/exec_test.go @@ -0,0 +1,33 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package transform + +import "testing" + +func TestProcessSimpleBlock(t *testing.T) { + t.Run("closes next block", func(t *testing.T) { + + }) + + t.Run("errors on process error", func(t *testing.T) { + + }) +} diff --git a/src/query/functions/binary/base.go b/src/query/functions/binary/base.go index 98ae599233..54f7ab5105 100644 --- a/src/query/functions/binary/base.go +++ b/src/query/functions/binary/base.go @@ -28,6 +28,7 @@ import ( "github.com/m3db/m3/src/query/executor/transform" "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" + "github.com/opentracing/opentracing-go" ) @@ -150,7 +151,7 @@ func (n *baseNode) Process(queryCtx *models.QueryContext, ID parser.NodeID, b bl func (n *baseNode) processWithTracing(queryCtx *models.QueryContext, lhs block.Block, rhs block.Block) (block.Block, error) { sp, ctx := opentracing.StartSpanFromContext(queryCtx.Ctx, n.op.OpType()) defer sp.Finish() - queryCtx = queryCtx.WithDerivedContext(ctx) + queryCtx = queryCtx.WithContext(ctx) return n.process(queryCtx, lhs, rhs, n.controller) } diff --git a/src/query/functions/fetch.go b/src/query/functions/fetch.go index 02162f4936..3ea4ab7dc8 100644 --- a/src/query/functions/fetch.go +++ b/src/query/functions/fetch.go @@ -111,6 +111,7 @@ func (n *FetchNode) fetch(ctx context.Context, queryCtx *models.QueryContext) (b // Execute runs the fetch node operation func (n *FetchNode) Execute(queryCtx *models.QueryContext) error { + ctx := queryCtx.Ctx blockResult, err := n.fetch(queryCtx.Ctx, queryCtx) if err != nil { return err diff --git a/src/query/functions/temporal/base.go b/src/query/functions/temporal/base.go index 6a15786aeb..72d084c4b0 100644 --- a/src/query/functions/temporal/base.go +++ b/src/query/functions/temporal/base.go @@ -33,8 +33,8 @@ import ( "github.com/m3db/m3/src/query/parser" "github.com/m3db/m3/src/query/ts" "github.com/m3db/m3/src/query/util/logging" - "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go" "go.uber.org/zap" ) diff --git a/src/query/functions/temporal/base_test.go b/src/query/functions/temporal/base_test.go index 8a46154a9e..fe2e51d4f6 100644 --- a/src/query/functions/temporal/base_test.go +++ b/src/query/functions/temporal/base_test.go @@ -396,13 +396,16 @@ func TestSingleProcessRequest(t *testing.T) { }, }) bNode := node.(*baseNode) - err := bNode.processSingleRequest(processRequest{ + request := processRequest{ blk: block2, bounds: bounds, deps: []block.UnconsolidatedBlock{block1}, queryCtx: models.NoopQueryContext(), - }) - assert.NoError(t, err) + } + bl, err := bNode.processSingleRequest(request) + require.NoError(t, err) + + bNode.propagateNextBlocks([]processRequest{request}, []block.Block{bl}, 1) assert.Len(t, sink.Values, 2, "block processed") // Current Block: 0 1 2 3 4 5 // Previous Block: 10 11 12 13 14 15 diff --git a/src/query/functions/utils/params.go b/src/query/functions/utils/params.go index b644eceee3..5002b2c991 100644 --- a/src/query/functions/utils/params.go +++ b/src/query/functions/utils/params.go @@ -20,12 +20,15 @@ package utils +// StaticParams is a simple string Params implementation, useful for when no data other than OpType is needed. type StaticParams string +// String simply returns s. func (s StaticParams) String() string { return string(s) } +// OpType simply returns s for StaticParams. func (s StaticParams) OpType() string { return string(s) } diff --git a/src/query/test/executor/transform.go b/src/query/test/executor/transform.go index d230292e97..21dbd17c5a 100644 --- a/src/query/test/executor/transform.go +++ b/src/query/test/executor/transform.go @@ -49,6 +49,7 @@ type SinkNode struct { Metas []block.SeriesMeta } +// Params returns the Params associated with this node. func (s *SinkNode) Params() parser.Params { return utils.StaticParams("sink") } diff --git a/src/query/util/opentracing/context.go b/src/query/util/opentracing/context.go index a929e282c0..f578991502 100644 --- a/src/query/util/opentracing/context.go +++ b/src/query/util/opentracing/context.go @@ -45,10 +45,12 @@ func SpanFromContextOrRoot(ctx context.Context) (opentracing.Span, context.Conte return sp, ctx } +// Time is a log.Field for time.Time values. It translates to RF3339 formatted time strings. func Time(key string, t time.Time) log.Field { return log.String(key, t.Format(time.RFC3339)) } +// Duration is a log.Field for Duration values. It translates to the standard Go duration format (Duration.String()). func Duration(key string, t time.Duration) log.Field { return log.String(key, fmt.Sprint(t)) }