diff --git a/docs/operational_guide/monitoring.md b/docs/operational_guide/monitoring.md new file mode 100644 index 0000000000..8b843e120b --- /dev/null +++ b/docs/operational_guide/monitoring.md @@ -0,0 +1,80 @@ +## Metrics + +TODO: document how to retrieve metrics for M3DB components. + +## Logs + +TODO: document how to retrieve logs for M3DB components. + +## Tracing + +M3DB is integrated with [opentracing](https://opentracing.io/) to provide +insight into query performance and errors. + +### Configuration +Currently, only [Jaeger](https://www.jaegertracing.io/) is supported as a backend. + +To enable it, set tracing.backend to "jaeger" (see also our +[sample local config](https://github.com/m3db/m3/blob/master/src/query/config/m3query-local-etcd.yml): + +``` +tracing: + backend: jaeger # enables jaeger with default configs + jaeger: + # optional configuration for jaeger -- see + # https://github.com/jaegertracing/jaeger-client-go/blob/master/config/config.go#L37 + # for options + ... +``` + +Jaeger can be run locally with docker as described in +https://www.jaegertracing.io/docs/1.9/getting-started/. + +The default configuration will report traces via udp to localhost:6831; +using the all-in-one jaeger container, they will be accessible at + +http://localhost:16686 + +N.B.: for production workloads, you will almost certainly want to use +sampler.type=remote with +[adaptive sampling](https://www.jaegertracing.io/docs/1.10/sampling/#adaptive-sampler) +for Jaeger, as write volumes are likely orders of magnitude higher than +read volumes in most timeseries systems. + +#### Alternative backends + +If you'd like additional backends, we'd love to support them! + +File an issue against M3 and we can work with you on how best to add +the backend. The first time's going to be a little rough--opentracing +unfortunately doesn't support Go plugins (yet--see +https://github.com/opentracing/opentracing-go/issues/133), and `glide`'s +update model means that adding dependencies directly will update +*everything*, which isn't ideal for an isolated dependency change. +These problems are all solvable though, +and we'll work with you to make it happen! + +### Use cases + +Note: all URLs assume a local jaeger setup as described in Jaeger's +[docs](https://www.jaegertracing.io/docs/1.9/getting-started/). + + +#### Finding slow queries + +To find prom queries longer than , filter for `minDuration >= ` on +`operation="GET /api/v1/query_range"`. + +Sample query: +http://localhost:16686/search?end=1548876672544000&limit=20&lookback=1h&maxDuration&minDuration=1ms&operation=GET%20%2Fapi%2Fv1%2Fquery_range&service=m3query&start=1548873072544000 + +#### Finding queries with errors + +Search for `error=true` on `operation="GET /api/v1/query_range"` +http://localhost:16686/search?operation=GET%20%2Fapi%2Fv1%2Fquery_range&service=m3query&tags=%7B%22error%22%3A%22true%22%7D + +#### Finding 500 (Internal Server Error) responses + +Search for `http.status_code=500`. + +http://localhost:16686/search?limit=20&lookback=24h&maxDuration&minDuration&operation=GET%20%2Fapi%2Fv1%2Fquery_range&service=m3query&start=1548802430108000&tags=%7B"http.status_code"%3A"500"%7D \ No newline at end of file diff --git a/glide.lock b/glide.lock index 735b6a1975..009d7edada 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: 68f6e58e11ab4f5acc342c013354dc0eca5cd2d17d5573edec766a067f44141a -updated: 2019-02-14T14:22:29.558454-05:00 +hash: 1142bd7c68492e34422277010dfee8fd45c6810cceaa25194352178a1315f716 +updated: 2019-02-21T12:08:41.672845643-05:00 imports: - name: github.com/apache/thrift version: c2fb1c4e8c931d22617bebb0bf388cb4d5e6fcff @@ -15,6 +15,8 @@ imports: version: 48099fad606eafc26e3a569fad19ff510fff4df6 - name: github.com/cockroachdb/cmux version: 112f0506e7743d64a6eb8fedbcff13d9979bbf92 +- name: github.com/codahale/hdrhistogram + version: 3a0bb77429bd3a61596f5e8a3172445844342120 - name: github.com/coreos/bbolt version: 32c383e75ce054674c53b5a07e55de85332aee14 - name: github.com/coreos/etcd @@ -133,7 +135,7 @@ imports: - name: github.com/go-logfmt/logfmt version: 432dd90af23366a89a611c020003fc8ba281ae5d - name: github.com/gogo/protobuf - version: 4cbf7e384e768b4e01799441fdf2a706a5635ae7 + version: ba06b47c162d49f2af050fb4c75bcbc86a159d5c subpackages: - gogoproto - jsonpb @@ -202,8 +204,6 @@ imports: version: 76626ae9c91c4f2a10f34cad8ce83ea42c93bb75 - name: github.com/jonboulle/clockwork version: 2eee05ed794112d45db504eb05aa693efd2b8b09 -- name: github.com/jroimartin/gocui - version: c055c87ae801372cd74a0839b972db4f7697ae5f - name: github.com/kr/logfmt version: b84e30acd515aadc4b783ad4ff83aff3299bdfe0 - name: github.com/leanovate/gopter @@ -217,7 +217,7 @@ imports: - name: github.com/m3db/bloom version: 47fe1193cdb900de7193d1f3d26ea9b2cbf6fb31 - name: github.com/m3db/m3x - version: e98ec326dd7b4d4bc97390c709dacf73d49a0bfc + version: b66c9c466c4726e3c9b47b1f837abbbe0f14be81 vcs: git subpackages: - checked @@ -273,10 +273,6 @@ imports: version: 744c0229c12ed0e4f8cb9d081a2692b3300bf705 - name: github.com/magiconair/properties version: 7757cc9fdb852f7579b24170bcacda2c7471bb6a -- name: github.com/mattn/go-runewidth - version: 9e777a8366cce605130a531d2cd6363d07ad7317 - subpackages: - - runewidth.go - name: github.com/matttproud/golang_protobuf_extensions version: c12348ce28de40eed0136aa2b644d0ee0650e56c subpackages: @@ -291,14 +287,18 @@ imports: version: 3536a929edddb9a5b34bd6861dc4a9647cb459fe - name: github.com/nightlyone/lockfile version: 0ad87eef1443f64d3d8c50da647e2b1552851124 -- name: github.com/nsf/termbox-go - version: 02980233997d87bbda048393d47b4d453f7a398d - name: github.com/oklog/ulid version: 1fe95fa015d44afc8cd427914daadb0317481b71 +- name: github.com/opentracing-contrib/go-stdlib + version: 77df8e8e70b403c6b13c0fffaa4867c9044ff4e9 + subpackages: + - nethttp - name: github.com/opentracing/opentracing-go - version: 855519783f479520497c6b3445611b05fc42f009 + version: 1949ddbfd147afd4d964a9f00b24eb291e0e7c38 subpackages: - ext + - log + - mocktracer - name: github.com/pborman/getopt version: ec82d864f599c39673eef89f91b93fa5576567a1 - name: github.com/pborman/uuid @@ -403,6 +403,31 @@ imports: - m3/thriftudp - multi - prometheus +- name: github.com/uber/jaeger-client-go + version: 1a782e2da844727691fef1757c72eb190c2909f0 + subpackages: + - config + - internal/baggage + - internal/baggage/remote + - internal/spanlog + - internal/throttler + - internal/throttler/remote + - log + - log/zap + - rpcmetrics + - thrift + - thrift-gen/agent + - thrift-gen/baggage + - thrift-gen/jaeger + - thrift-gen/sampling + - thrift-gen/zipkincore + - transport + - utils +- name: github.com/uber/jaeger-lib + version: ed3a127ec5fef7ae9ea95b01b542c47fbd999ce5 + subpackages: + - metrics + - metrics/tally - name: github.com/uber/tchannel-go version: 1fcf82ec86967eb43ba0baa9b964f8eb226d242e subpackages: diff --git a/glide.yaml b/glide.yaml index 7cb9b7f892..c459e3f46b 100644 --- a/glide.yaml +++ b/glide.yaml @@ -1,7 +1,7 @@ package: github.com/m3db/m3 import: - package: github.com/m3db/m3x - version: e98ec326dd7b4d4bc97390c709dacf73d49a0bfc + version: b66c9c466c4726e3c9b47b1f837abbbe0f14be81 vcs: git subpackages: - checked @@ -64,7 +64,7 @@ import: version: f85c78b1dd998214c5f2138155b320a4a43fbe36 - package: github.com/opentracing/opentracing-go - version: 855519783f479520497c6b3445611b05fc42f009 + version: 1.0.2 - package: github.com/spaolacci/murmur3 version: 9f5d223c60793748f04a9d5b4b4eacddfc1f755d @@ -201,6 +201,20 @@ import: subpackages: - capnslog + # START_JAEGER_DEPS + - package: github.com/uber/jaeger-lib + version: ^1.5.0 + + - package: github.com/uber/jaeger-client-go + version: ^2.7.0 + + - package: github.com/opentracing-contrib/go-stdlib + # Pin this on recommendation of the repo (no stable release yet). Still arguably better than rewriting + # the same code. + version: 77df8e8e70b403c6b13c0fffaa4867c9044ff4e9 + + # END_JAEGER_DEPS + # To avoid conflicting packages not resolving the latest GRPC - package: google.golang.org/grpc version: ~1.7.3 diff --git a/src/cmd/services/m3query/config/config.go b/src/cmd/services/m3query/config/config.go index 5514fbbcbf..fa2d5f5c8b 100644 --- a/src/cmd/services/m3query/config/config.go +++ b/src/cmd/services/m3query/config/config.go @@ -73,6 +73,9 @@ type Configuration struct { // Metrics configuration. Metrics instrument.MetricsConfiguration `yaml:"metrics"` + // Tracing configures opentracing. If not provided, tracing is disabled. + Tracing instrument.TracingConfiguration `yaml:"tracing"` + // Clusters is the DB cluster configurations for read, write and // query endpoints. Clusters m3.ClustersStaticConfiguration `yaml:"clusters"` diff --git a/src/query/api/v1/handler/prometheus/native/read.go b/src/query/api/v1/handler/prometheus/native/read.go index 900711e864..b10ad7c89a 100644 --- a/src/query/api/v1/handler/prometheus/native/read.go +++ b/src/query/api/v1/handler/prometheus/native/read.go @@ -32,9 +32,12 @@ import ( "github.com/m3db/m3/src/query/executor" "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/ts" + "github.com/m3db/m3/src/query/util/httperrors" "github.com/m3db/m3/src/query/util/logging" - xhttp "github.com/m3db/m3/src/x/net/http" + opentracingutil "github.com/m3db/m3/src/query/util/opentracing" + opentracingext "github.com/opentracing/opentracing-go/ext" + opentracinglog "github.com/opentracing/opentracing-go/log" "github.com/uber-go/tally" "go.uber.org/zap" ) @@ -124,7 +127,7 @@ func (h *PromReadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { result, params, respErr := h.ServeHTTPWithEngine(w, r, h.engine) if respErr != nil { - xhttp.Error(w, respErr.Err, respErr.Code) + httperrors.ErrorWithReqInfo(w, r, respErr.Code, respErr.Err) return } @@ -167,6 +170,9 @@ func (h *PromReadHandler) ServeHTTPWithEngine( result, err := read(ctx, engine, h.tagOpts, w, params) if err != nil { + sp := opentracingutil.SpanFromContextOrNoop(ctx) + sp.LogFields(opentracinglog.Error(err)) + opentracingext.Error.Set(sp, true) logger.Error("unable to fetch data", zap.Error(err)) h.promReadMetrics.fetchErrorsServer.Inc(1) return nil, emptyReqParams, &RespError{Err: err, Code: http.StatusInternalServerError} diff --git a/src/query/api/v1/handler/prometheus/native/read_common.go b/src/query/api/v1/handler/prometheus/native/read_common.go index b72c2a9fa2..b3ab29192e 100644 --- a/src/query/api/v1/handler/prometheus/native/read_common.go +++ b/src/query/api/v1/handler/prometheus/native/read_common.go @@ -33,6 +33,9 @@ import ( "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser/promql" "github.com/m3db/m3/src/query/ts" + opentracingutil "github.com/m3db/m3/src/query/util/opentracing" + + opentracinglog "github.com/opentracing/opentracing-go/log" ) func read( @@ -45,6 +48,15 @@ func read( ctx, cancel := context.WithTimeout(reqCtx, params.Timeout) defer cancel() + sp := opentracingutil.SpanFromContextOrNoop(ctx) + sp.LogFields( + opentracinglog.String("params.query", params.Query), + opentracingutil.Time("params.start", params.Start), + opentracingutil.Time("params.end", params.End), + opentracingutil.Time("params.now", params.Now), + opentracingutil.Duration("params.step", params.Step), + ) + opts := &executor.EngineOptions{} // Detect clients closing connections handler.CloseWatcher(ctx, cancel, w) diff --git a/src/query/api/v1/handler/prometheus/native/read_instantaneous.go b/src/query/api/v1/handler/prometheus/native/read_instantaneous.go index 31aa489408..18a368a93c 100644 --- a/src/query/api/v1/handler/prometheus/native/read_instantaneous.go +++ b/src/query/api/v1/handler/prometheus/native/read_instantaneous.go @@ -28,8 +28,8 @@ import ( "github.com/m3db/m3/src/query/api/v1/handler/prometheus" "github.com/m3db/m3/src/query/executor" "github.com/m3db/m3/src/query/models" + "github.com/m3db/m3/src/query/util/httperrors" "github.com/m3db/m3/src/query/util/logging" - xhttp "github.com/m3db/m3/src/x/net/http" "go.uber.org/zap" ) @@ -69,7 +69,7 @@ func (h *PromReadInstantHandler) ServeHTTP(w http.ResponseWriter, r *http.Reques logger := logging.WithContext(ctx) params, rErr := parseInstantaneousParams(r, h.timeoutOpts) if rErr != nil { - xhttp.Error(w, rErr.Inner(), rErr.Code()) + httperrors.ErrorWithReqInfo(w, r, rErr.Code(), rErr) return } @@ -80,7 +80,7 @@ func (h *PromReadInstantHandler) ServeHTTP(w http.ResponseWriter, r *http.Reques result, err := read(ctx, h.engine, h.tagOpts, w, params) if err != nil { logger.Error("unable to fetch data", zap.Error(err)) - xhttp.Error(w, err, http.StatusBadRequest) + httperrors.ErrorWithReqInfo(w, r, http.StatusBadRequest, rErr) return } diff --git a/src/query/api/v1/httpd/handler.go b/src/query/api/v1/httpd/handler.go index 74df26720b..fb0b92b3b7 100644 --- a/src/query/api/v1/httpd/handler.go +++ b/src/query/api/v1/httpd/handler.go @@ -23,6 +23,7 @@ package httpd import ( "encoding/json" "errors" + "fmt" "net/http" _ "net/http/pprof" // needed for pprof handler registration "time" @@ -52,6 +53,8 @@ import ( "github.com/m3db/m3/src/x/net/http/cors" "github.com/gorilla/mux" + "github.com/opentracing-contrib/go-stdlib/nethttp" + "github.com/opentracing/opentracing-go" "github.com/uber-go/tally" ) @@ -102,13 +105,7 @@ func NewHandler( ) (*Handler, error) { r := mux.NewRouter() - // apply middleware. Just CORS for now, but we could add more here as needed. - withMiddleware := &cors.Handler{ - Handler: r, - Info: &cors.Info{ - "*": true, - }, - } + handlerWithMiddleware := applyMiddleware(r, opentracing.GlobalTracer()) var timeoutOpts = &prometheus.TimeoutOpts{} if embeddedDbCfg == nil || embeddedDbCfg.Client.FetchTimeout == nil { @@ -123,7 +120,7 @@ func NewHandler( h := &Handler{ router: r, - handler: withMiddleware, + handler: handlerWithMiddleware, storage: downsamplerAndWriter.Storage(), downsamplerAndWriter: downsamplerAndWriter, engine: engine, @@ -139,6 +136,23 @@ func NewHandler( return h, nil } +func applyMiddleware(base *mux.Router, tracer opentracing.Tracer) http.Handler { + withMiddleware := http.Handler(&cors.Handler{ + Handler: base, + Info: &cors.Info{ + "*": true, + }, + }) + + // apply jaeger middleware, which will start a span + // for each incoming request + withMiddleware = nethttp.Middleware(tracer, withMiddleware, + nethttp.OperationNameFunc(func(r *http.Request) string { + return fmt.Sprintf("%s %s", r.Method, r.URL.Path) + })) + return withMiddleware +} + // RegisterRoutes registers all http routes. func (h *Handler) RegisterRoutes() error { // Wrap requests with response time logging as well as panic recovery. diff --git a/src/query/api/v1/httpd/handler_test.go b/src/query/api/v1/httpd/handler_test.go index 7897dac6b6..7aed787828 100644 --- a/src/query/api/v1/httpd/handler_test.go +++ b/src/query/api/v1/httpd/handler_test.go @@ -43,6 +43,8 @@ import ( xsync "github.com/m3db/m3x/sync" "github.com/golang/mock/gomock" + "github.com/gorilla/mux" + "github.com/opentracing/opentracing-go/mocktracer" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/uber-go/tally" @@ -60,8 +62,15 @@ func makeTagOptions() models.TagOptions { func setupHandler(store storage.Storage) (*Handler, error) { downsamplerAndWriter := ingest.NewDownsamplerAndWriter(store, nil, testWorkerPool) - return NewHandler(downsamplerAndWriter, makeTagOptions(), executor.NewEngine(store, tally.NewTestScope("test", nil), time.Minute), nil, nil, - config.Configuration{LookbackDuration: &defaultLookbackDuration}, nil, tally.NewTestScope("", nil)) + return NewHandler( + downsamplerAndWriter, + makeTagOptions(), + executor.NewEngine(store, tally.NewTestScope("test", nil), time.Minute), + nil, + nil, + config.Configuration{LookbackDuration: &defaultLookbackDuration}, + nil, + tally.NewTestScope("", nil)) } func TestHandlerFetchTimeoutError(t *testing.T) { @@ -240,18 +249,39 @@ func TestCORSMiddleware(t *testing.T) { h, err := setupHandler(s) require.NoError(t, err, "unable to setup handler") - testRoute := "/foobar" - h.router.HandleFunc(testRoute, func(writer http.ResponseWriter, r *http.Request) { - writer.WriteHeader(http.StatusOK) - writer.Write([]byte("hello!")) - }) + setupTestRoute(h.router) + res := doTestRequest(h.Router()) + + assert.Equal(t, "hello!", res.Body.String()) + assert.Equal(t, "*", res.Header().Get("Access-Control-Allow-Origin")) +} + +func doTestRequest(handler http.Handler) *httptest.ResponseRecorder { req, _ := http.NewRequest("GET", testRoute, nil) res := httptest.NewRecorder() - h.Router().ServeHTTP(res, req) + handler.ServeHTTP(res, req) + return res +} - assert.Equal(t, "hello!", res.Body.String()) - assert.Equal(t, "*", res.Header().Get("Access-Control-Allow-Origin")) +func TestTracingMiddleware(t *testing.T) { + mtr := mocktracer.New() + router := mux.NewRouter() + setupTestRoute(router) + + handler := applyMiddleware(router, mtr) + doTestRequest(handler) + + assert.NotEmpty(t, mtr.FinishedSpans()) +} + +const testRoute = "/foobar" + +func setupTestRoute(r *mux.Router) { + r.HandleFunc(testRoute, func(writer http.ResponseWriter, r *http.Request) { + writer.WriteHeader(http.StatusOK) + writer.Write([]byte("hello!")) + }) } func init() { diff --git a/src/query/config/m3query-dev-etcd.yml b/src/query/config/m3query-dev-etcd.yml index 1750a95041..9692937d70 100644 --- a/src/query/config/m3query-dev-etcd.yml +++ b/src/query/config/m3query-dev-etcd.yml @@ -63,3 +63,9 @@ writeWorkerPoolPolicy: tagOptions: idScheme: quoted + +# Uncomment this to enable local jaeger tracing. See https://www.jaegertracing.io/docs/1.9/getting-started/ +# for quick local setup (which this config will send data to). + +# tracing: +# backend: jaeger diff --git a/src/query/config/m3query-local-etcd.yml b/src/query/config/m3query-local-etcd.yml index a2964ceab2..c31983071d 100644 --- a/src/query/config/m3query-local-etcd.yml +++ b/src/query/config/m3query-local-etcd.yml @@ -52,3 +52,9 @@ clusters: jitter: true backgroundHealthCheckFailLimit: 4 backgroundHealthCheckFailThrottleFactor: 0.5 + +# Uncomment this to enable local jaeger tracing. See https://www.jaegertracing.io/docs/1.9/getting-started/ +# for quick local setup (which this config will send data to). + +# tracing: +# backend: jaeger diff --git a/src/query/executor/engine.go b/src/query/executor/engine.go index 8ee9377f43..b1a70078a8 100644 --- a/src/query/executor/engine.go +++ b/src/query/executor/engine.go @@ -27,6 +27,7 @@ import ( "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" "github.com/m3db/m3/src/query/storage" + "github.com/m3db/m3/src/query/util/opentracing" "github.com/uber-go/tally" ) @@ -134,7 +135,7 @@ func (e *Engine) ExecuteExpr( defer close(results) req := newRequest(e, params) - defer req.finish() + nodes, edges, err := req.compile(ctx, parser) if err != nil { results <- Query{Err: err} @@ -147,13 +148,16 @@ func (e *Engine) ExecuteExpr( return } - state, err := req.execute(ctx, pp) + state, err := req.generateExecutionState(ctx, pp) // free up resources if err != nil { results <- Query{Err: err} return } + sp, ctx := opentracingutil.StartSpanFromContext(ctx, "executing") + defer sp.Finish() + result := state.resultNode results <- Query{Result: result} diff --git a/src/query/executor/request.go b/src/query/executor/request.go index 9753f6b677..1c7b14121b 100644 --- a/src/query/executor/request.go +++ b/src/query/executor/request.go @@ -23,14 +23,13 @@ package executor import ( "context" "fmt" - "time" "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" "github.com/m3db/m3/src/query/plan" "github.com/m3db/m3/src/query/util/logging" + "github.com/m3db/m3/src/query/util/opentracing" - "github.com/uber-go/tally" "go.uber.org/zap" ) @@ -65,26 +64,20 @@ func (s State) durationString() string { // Request represents a single request. type Request struct { - engine *Engine - params models.RequestParams - parentSpan *span + engine *Engine + params models.RequestParams } func newRequest(engine *Engine, params models.RequestParams) *Request { - parentSpan := startSpan(engine.metrics.activeHist, engine.metrics.all) - return &Request{ - engine: engine, - params: params, - parentSpan: parentSpan, - } + return &Request{engine: engine, params: params} } func (r *Request) compile(ctx context.Context, parser parser.Parser) (parser.Nodes, parser.Edges, error) { - sp := startSpan(r.engine.metrics.compilingHist, r.engine.metrics.compiling) + sp, ctx := opentracingutil.StartSpanFromContext(ctx, "compile") + defer sp.Finish() // TODO: Change DAG interface to take in a context nodes, edges, err := parser.DAG() if err != nil { - sp.finish(err) return nil, nil, err } @@ -92,15 +85,15 @@ func (r *Request) compile(ctx context.Context, parser parser.Parser) (parser.Nod logging.WithContext(ctx).Info("compiling dag", zap.Any("nodes", nodes), zap.Any("edges", edges)) } - sp.finish(nil) return nodes, edges, nil } func (r *Request) plan(ctx context.Context, nodes parser.Nodes, edges parser.Edges) (plan.PhysicalPlan, error) { - sp := startSpan(r.engine.metrics.planningHist, r.engine.metrics.planning) + sp, ctx := opentracingutil.StartSpanFromContext(ctx, "plan") + defer sp.Finish() + lp, err := plan.NewLogicalPlan(nodes, edges) if err != nil { - sp.finish(err) return plan.PhysicalPlan{}, err } @@ -110,7 +103,6 @@ func (r *Request) plan(ctx context.Context, nodes parser.Nodes, edges parser.Edg pp, err := plan.NewPhysicalPlan(lp, r.engine.store, r.params, r.engine.lookbackDuration) if err != nil { - sp.finish(err) return plan.PhysicalPlan{}, err } @@ -118,16 +110,17 @@ func (r *Request) plan(ctx context.Context, nodes parser.Nodes, edges parser.Edg logging.WithContext(ctx).Info("physical plan", zap.String("plan", pp.String())) } - sp.finish(nil) return pp, nil } -func (r *Request) execute(ctx context.Context, pp plan.PhysicalPlan) (*ExecutionState, error) { - sp := startSpan(r.engine.metrics.executingHist, r.engine.metrics.executing) +func (r *Request) generateExecutionState(ctx context.Context, pp plan.PhysicalPlan) (*ExecutionState, error) { + sp, ctx := opentracingutil.StartSpanFromContext(ctx, + "generate_execution_state") + defer sp.Finish() + state, err := GenerateExecutionState(pp, r.engine.store) // free up resources if err != nil { - sp.finish(err) return nil, err } @@ -135,38 +128,5 @@ func (r *Request) execute(ctx context.Context, pp plan.PhysicalPlan) (*Execution logging.WithContext(ctx).Info("execution state", zap.String("state", state.String())) } - sp.finish(nil) return state, nil } - -func (r *Request) finish() { - r.parentSpan.finish(nil) -} - -// span is a simple wrapper around opentracing.Span in order to -// get access to the duration of the span for metrics reporting. -type span struct { - start time.Time - durationHist tally.Histogram - counter *counterWithDecrement -} - -func startSpan(durationHist tally.Histogram, counter *counterWithDecrement) *span { - now := time.Now() - counter.Inc() - return &span{ - durationHist: durationHist, - start: now, - counter: counter, - } -} - -func (s *span) finish(err error) { - s.counter.Dec() - // Don't record duration for error cases - if err == nil { - now := time.Now() - duration := now.Sub(s.start) - s.durationHist.RecordDuration(duration) - } -} diff --git a/src/query/executor/transform/controller.go b/src/query/executor/transform/controller.go index 7fdadbd8cf..12efded782 100644 --- a/src/query/executor/transform/controller.go +++ b/src/query/executor/transform/controller.go @@ -40,8 +40,7 @@ func (t *Controller) AddTransform(node OpNode) { // Process performs processing on the underlying transforms func (t *Controller) Process(queryCtx *models.QueryContext, block block.Block) error { for _, ts := range t.transforms { - err := ts.Process(queryCtx, t.ID, block) - if err != nil { + if err := ts.Process(queryCtx, t.ID, block); err != nil { return err } } diff --git a/src/query/executor/transform/exec.go b/src/query/executor/transform/exec.go new file mode 100644 index 0000000000..eb09cbf032 --- /dev/null +++ b/src/query/executor/transform/exec.go @@ -0,0 +1,54 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package transform + +import ( + "github.com/m3db/m3/src/query/block" + "github.com/m3db/m3/src/query/models" + "github.com/m3db/m3/src/query/parser" + "github.com/m3db/m3/src/query/util/opentracing" +) + +// simpleOpNode defines the contract for OpNode instances which +type simpleOpNode interface { + Params() parser.Params + ProcessBlock(queryCtx *models.QueryContext, ID parser.NodeID, b block.Block) (block.Block, error) +} + +// ProcessSimpleBlock is a utility for OpNode instances which on receiving a block, process and propagate it immediately +// (as opposed to nodes which e.g. depend on multiple blocks). +// It adds instrumentation to the processing, and handles propagating the block downstream. +// OpNode's should call this as their implementation of the Process method: +// +// func (n MyNode) Process(queryCtx *models.QueryContext, ID parser.NodeID, b block.Block) error { +// return transform.ProcessSimpleBlock(n, n.controller, queryCtx, ID, b) +// } +func ProcessSimpleBlock(node simpleOpNode, controller *Controller, queryCtx *models.QueryContext, ID parser.NodeID, b block.Block) error { + sp, ctx := opentracingutil.StartSpanFromContext(queryCtx.Ctx, node.Params().OpType()) + nextBlock, err := node.ProcessBlock(queryCtx.WithContext(ctx), ID, b) + sp.Finish() + if err != nil { + return err + } + + defer nextBlock.Close() + return controller.Process(queryCtx, nextBlock) +} diff --git a/src/query/executor/transform/exec_mock.go b/src/query/executor/transform/exec_mock.go new file mode 100644 index 0000000000..db9dcd99b9 --- /dev/null +++ b/src/query/executor/transform/exec_mock.go @@ -0,0 +1,87 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/m3db/m3/src/query/executor/transform/exec.go + +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +// Package transform is a generated GoMock package. +package transform + +import ( + "reflect" + + "github.com/m3db/m3/src/query/block" + "github.com/m3db/m3/src/query/models" + "github.com/m3db/m3/src/query/parser" + + "github.com/golang/mock/gomock" +) + +// MocksimpleOpNode is a mock of simpleOpNode interface +type MocksimpleOpNode struct { + ctrl *gomock.Controller + recorder *MocksimpleOpNodeMockRecorder +} + +// MocksimpleOpNodeMockRecorder is the mock recorder for MocksimpleOpNode +type MocksimpleOpNodeMockRecorder struct { + mock *MocksimpleOpNode +} + +// NewMocksimpleOpNode creates a new mock instance +func NewMocksimpleOpNode(ctrl *gomock.Controller) *MocksimpleOpNode { + mock := &MocksimpleOpNode{ctrl: ctrl} + mock.recorder = &MocksimpleOpNodeMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MocksimpleOpNode) EXPECT() *MocksimpleOpNodeMockRecorder { + return m.recorder +} + +// Params mocks base method +func (m *MocksimpleOpNode) Params() parser.Params { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Params") + ret0, _ := ret[0].(parser.Params) + return ret0 +} + +// Params indicates an expected call of Params +func (mr *MocksimpleOpNodeMockRecorder) Params() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Params", reflect.TypeOf((*MocksimpleOpNode)(nil).Params)) +} + +// ProcessBlock mocks base method +func (m *MocksimpleOpNode) ProcessBlock(queryCtx *models.QueryContext, ID parser.NodeID, b block.Block) (block.Block, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ProcessBlock", queryCtx, ID, b) + ret0, _ := ret[0].(block.Block) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ProcessBlock indicates an expected call of ProcessBlock +func (mr *MocksimpleOpNodeMockRecorder) ProcessBlock(queryCtx, ID, b interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProcessBlock", reflect.TypeOf((*MocksimpleOpNode)(nil).ProcessBlock), queryCtx, ID, b) +} diff --git a/src/query/executor/transform/exec_test.go b/src/query/executor/transform/exec_test.go new file mode 100644 index 0000000000..861a0fb2b2 --- /dev/null +++ b/src/query/executor/transform/exec_test.go @@ -0,0 +1,124 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package transform + +import ( + "context" + "errors" + "testing" + + "github.com/m3db/m3/src/query/block" + "github.com/m3db/m3/src/query/functions/utils" + "github.com/m3db/m3/src/query/models" + "github.com/m3db/m3/src/query/parser" + "github.com/m3db/m3/src/query/test" + + "github.com/golang/mock/gomock" + "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/mocktracer" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestProcessSimpleBlock(t *testing.T) { + type testContext struct { + MockCtrl *gomock.Controller + Controller *Controller + ChildNode *MockOpNode + Node *MocksimpleOpNode + ResultBlock *block.MockBlock + SourceBlock block.Block + QueryCtx *models.QueryContext + } + + setup := func(t *testing.T) (*testContext, func()) { + ctrl := gomock.NewController(t) + + controller := &Controller{ + ID: parser.NodeID("foo"), + } + child := NewMockOpNode(ctrl) + controller.AddTransform(child) + + return &testContext{ + MockCtrl: ctrl, + Controller: controller, + SourceBlock: test.NewBlockFromValues( + models.Bounds{}, [][]float64{{1.0}}), + ResultBlock: block.NewMockBlock(ctrl), + Node: NewMocksimpleOpNode(ctrl), + ChildNode: child, + QueryCtx: models.NoopQueryContext(), + }, ctrl.Finish + } + + doCall := func(tctx *testContext) error { + return ProcessSimpleBlock(tctx.Node, tctx.Controller, tctx.QueryCtx, tctx.Controller.ID, tctx.SourceBlock) + } + + configureSuccessfulNode := func(tctx *testContext) { + tctx.Node.EXPECT().Params().Return(utils.StaticParams("foo")) + tctx.Node.EXPECT().ProcessBlock(gomock.Any(), gomock.Any(), gomock.Any()).Return(tctx.ResultBlock, nil) + tctx.ChildNode.EXPECT().Process(gomock.Any(), gomock.Any(), gomock.Any()) + tctx.ResultBlock.EXPECT().Close() + } + + t.Run("closes next block", func(t *testing.T) { + tctx, closer := setup(t) + defer closer() + + configureSuccessfulNode(tctx) + + require.NoError(t, doCall(tctx)) + }) + + t.Run("errors on process error", func(t *testing.T) { + tctx, closer := setup(t) + defer closer() + + expectedErr := errors.New("test err") + tctx.Node.EXPECT().Params().Return(utils.StaticParams("foo")) + tctx.Node.EXPECT().ProcessBlock(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, expectedErr) + + require.EqualError(t, doCall(tctx), expectedErr.Error()) + }) + + t.Run("starts span with op type", func(t *testing.T) { + tctx, closer := setup(t) + defer closer() + + configureSuccessfulNode(tctx) + tctx.Node.EXPECT().Params().Return(utils.StaticParams("foo")) + + mtr := mocktracer.New() + + sp := mtr.StartSpan("root") + tctx.QueryCtx.Ctx = opentracing.ContextWithSpan(context.Background(), sp) + + require.NoError(t, doCall(tctx)) + sp.Finish() + + spans := mtr.FinishedSpans() + + require.Len(t, spans, 2) + assert.Equal(t, tctx.Node.Params().OpType(), spans[0].OperationName) + }) +} diff --git a/src/query/executor/transform/lazy_test.go b/src/query/executor/transform/lazy_test.go index 96db1b9c24..c6819344fc 100644 --- a/src/query/executor/transform/lazy_test.go +++ b/src/query/executor/transform/lazy_test.go @@ -24,6 +24,7 @@ import ( "testing" "github.com/m3db/m3/src/query/block" + "github.com/m3db/m3/src/query/functions/utils" "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" "github.com/m3db/m3/src/query/test" @@ -36,6 +37,10 @@ type dummyFunc struct { controller *Controller } +func (f *dummyFunc) Params() parser.Params { + return utils.StaticParams("dummy") +} + func (f *dummyFunc) Process(queryCtx *models.QueryContext, ID parser.NodeID, block block.Block) error { f.processed = true f.controller.Process(queryCtx, block) diff --git a/src/query/executor/transform/types_mock.go b/src/query/executor/transform/types_mock.go new file mode 100644 index 0000000000..bdcc858a84 --- /dev/null +++ b/src/query/executor/transform/types_mock.go @@ -0,0 +1,72 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/m3db/m3/src/query/executor/transform (interfaces: OpNode) + +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +// Package transform is a generated GoMock package. +package transform + +import ( + "reflect" + + "github.com/m3db/m3/src/query/block" + "github.com/m3db/m3/src/query/models" + "github.com/m3db/m3/src/query/parser" + + "github.com/golang/mock/gomock" +) + +// MockOpNode is a mock of OpNode interface +type MockOpNode struct { + ctrl *gomock.Controller + recorder *MockOpNodeMockRecorder +} + +// MockOpNodeMockRecorder is the mock recorder for MockOpNode +type MockOpNodeMockRecorder struct { + mock *MockOpNode +} + +// NewMockOpNode creates a new mock instance +func NewMockOpNode(ctrl *gomock.Controller) *MockOpNode { + mock := &MockOpNode{ctrl: ctrl} + mock.recorder = &MockOpNodeMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockOpNode) EXPECT() *MockOpNodeMockRecorder { + return m.recorder +} + +// Process mocks base method +func (m *MockOpNode) Process(arg0 *models.QueryContext, arg1 parser.NodeID, arg2 block.Block) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Process", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// Process indicates an expected call of Process +func (mr *MockOpNodeMockRecorder) Process(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Process", reflect.TypeOf((*MockOpNode)(nil).Process), arg0, arg1, arg2) +} diff --git a/src/query/functions/aggregation/base.go b/src/query/functions/aggregation/base.go index 4abcf0639f..ed966f1d39 100644 --- a/src/query/functions/aggregation/base.go +++ b/src/query/functions/aggregation/base.go @@ -109,11 +109,20 @@ type baseNode struct { controller *transform.Controller } +func (n *baseNode) Params() parser.Params { + return n.op +} + // Process the block func (n *baseNode) Process(queryCtx *models.QueryContext, ID parser.NodeID, b block.Block) error { + return transform.ProcessSimpleBlock(n, n.controller, queryCtx, ID, b) +} + +// ProcessBlock performs the aggregation on the input block, and returns the aggregated result. +func (n *baseNode) ProcessBlock(queryCtx *models.QueryContext, ID parser.NodeID, b block.Block) (block.Block, error) { stepIter, err := b.StepIter() if err != nil { - return err + return nil, err } params := n.op.params @@ -129,11 +138,11 @@ func (n *baseNode) Process(queryCtx *models.QueryContext, ID parser.NodeID, b bl builder, err := n.controller.BlockBuilder(queryCtx, meta, metas) if err != nil { - return err + return nil, err } - if err = builder.AddCols(stepIter.StepCount()); err != nil { - return err + if err := builder.AddCols(stepIter.StepCount()); err != nil { + return nil, err } aggregatedValues := make([]float64, len(buckets)) @@ -148,10 +157,8 @@ func (n *baseNode) Process(queryCtx *models.QueryContext, ID parser.NodeID, b bl } if err = stepIter.Err(); err != nil { - return err + return nil, err } - nextBlock := builder.Build() - defer nextBlock.Close() - return n.controller.Process(queryCtx, nextBlock) + return builder.Build(), nil } diff --git a/src/query/functions/aggregation/count_values.go b/src/query/functions/aggregation/count_values.go index 518b11157a..569a034cb9 100644 --- a/src/query/functions/aggregation/count_values.go +++ b/src/query/functions/aggregation/count_values.go @@ -88,6 +88,10 @@ type countValuesNode struct { controller *transform.Controller } +func (n *countValuesNode) Params() parser.Params { + return n.op +} + // bucketColumn represents a column of times a particular value in a series has // been seen. This may expand as more unique values are seen type bucketColumn []float64 @@ -141,9 +145,13 @@ func processBlockBucketAtColumn( // Process the block func (n *countValuesNode) Process(queryCtx *models.QueryContext, ID parser.NodeID, b block.Block) error { + return transform.ProcessSimpleBlock(n, n.controller, queryCtx, ID, b) +} + +func (n *countValuesNode) ProcessBlock(queryCtx *models.QueryContext, ID parser.NodeID, b block.Block) (block.Block, error) { stepIter, err := b.StepIter() if err != nil { - return err + return nil, err } params := n.op.params @@ -177,7 +185,7 @@ func (n *countValuesNode) Process(queryCtx *models.QueryContext, ID parser.NodeI } if err = stepIter.Err(); err != nil { - return err + return nil, err } numSeries := 0 @@ -212,11 +220,11 @@ func (n *countValuesNode) Process(queryCtx *models.QueryContext, ID parser.NodeI builder, err := n.controller.BlockBuilder(queryCtx, meta, flattenedMeta) if err != nil { - return err + return nil, err } if err := builder.AddCols(stepCount); err != nil { - return err + return nil, err } for columnIndex := 0; columnIndex < stepCount; columnIndex++ { @@ -229,9 +237,7 @@ func (n *countValuesNode) Process(queryCtx *models.QueryContext, ID parser.NodeI } } - nextBlock := builder.Build() - defer nextBlock.Close() - return n.controller.Process(queryCtx, nextBlock) + return builder.Build(), nil } // pads vals with enough NaNs to match size diff --git a/src/query/functions/aggregation/take.go b/src/query/functions/aggregation/take.go index 49700b58dd..5e5351e45a 100644 --- a/src/query/functions/aggregation/take.go +++ b/src/query/functions/aggregation/take.go @@ -111,11 +111,19 @@ type takeNode struct { controller *transform.Controller } +func (n *takeNode) Params() parser.Params { + return n.op +} + // Process the block func (n *takeNode) Process(queryCtx *models.QueryContext, ID parser.NodeID, b block.Block) error { + return transform.ProcessSimpleBlock(n, n.controller, queryCtx, ID, b) +} + +func (n *takeNode) ProcessBlock(queryCtx *models.QueryContext, ID parser.NodeID, b block.Block) (block.Block, error) { stepIter, err := b.StepIter() if err != nil { - return err + return nil, err } params := n.op.params @@ -131,11 +139,11 @@ func (n *takeNode) Process(queryCtx *models.QueryContext, ID parser.NodeID, b bl // retain original metadatas builder, err := n.controller.BlockBuilder(queryCtx, meta, stepIter.SeriesMeta()) if err != nil { - return err + return nil, err } if err = builder.AddCols(stepIter.StepCount()); err != nil { - return err + return nil, err } for index := 0; stepIter.Next(); index++ { @@ -146,12 +154,10 @@ func (n *takeNode) Process(queryCtx *models.QueryContext, ID parser.NodeID, b bl } if err = stepIter.Err(); err != nil { - return err + return nil, err } - nextBlock := builder.Build() - defer nextBlock.Close() - return n.controller.Process(queryCtx, nextBlock) + return builder.Build(), nil } // shortcut to return empty when taking <= 0 values diff --git a/src/query/functions/binary/base.go b/src/query/functions/binary/base.go index f8b36f537d..089abf31ee 100644 --- a/src/query/functions/binary/base.go +++ b/src/query/functions/binary/base.go @@ -28,6 +28,7 @@ import ( "github.com/m3db/m3/src/query/executor/transform" "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" + "github.com/m3db/m3/src/query/util/opentracing" ) type baseOp struct { @@ -115,6 +116,10 @@ type baseNode struct { mu sync.Mutex } +func (n *baseNode) Params() parser.Params { + return n.op +} + type processFunc func(*models.QueryContext, block.Block, block.Block, *transform.Controller) (block.Block, error) // Process processes a block @@ -133,7 +138,7 @@ func (n *baseNode) Process(queryCtx *models.QueryContext, ID parser.NodeID, b bl n.cleanup() - nextBlock, err := n.process(queryCtx, lhs, rhs, n.controller) + nextBlock, err := n.processWithTracing(queryCtx, lhs, rhs) if err != nil { return err } @@ -142,6 +147,14 @@ func (n *baseNode) Process(queryCtx *models.QueryContext, ID parser.NodeID, b bl return n.controller.Process(queryCtx, nextBlock) } +func (n *baseNode) processWithTracing(queryCtx *models.QueryContext, lhs block.Block, rhs block.Block) (block.Block, error) { + sp, ctx := opentracingutil.StartSpanFromContext(queryCtx.Ctx, n.op.OpType()) + defer sp.Finish() + queryCtx = queryCtx.WithContext(ctx) + + return n.process(queryCtx, lhs, rhs, n.controller) +} + // computeOrCache figures out if both lhs and rhs are available, if not then it caches the incoming block func (n *baseNode) computeOrCache(ID parser.NodeID, b block.Block) (block.Block, block.Block, error) { var lhs, rhs block.Block diff --git a/src/query/functions/fetch.go b/src/query/functions/fetch.go index b5f04ca796..5778438087 100644 --- a/src/query/functions/fetch.go +++ b/src/query/functions/fetch.go @@ -24,11 +24,13 @@ import ( "fmt" "time" + "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/executor/transform" "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" "github.com/m3db/m3/src/query/storage" "github.com/m3db/m3/src/query/util/logging" + "github.com/m3db/m3/src/query/util/opentracing" "go.uber.org/zap" ) @@ -86,23 +88,31 @@ func (o FetchOp) Node(controller *transform.Controller, storage storage.Storage, } } -// Execute runs the fetch node operation -func (n *FetchNode) Execute(queryCtx *models.QueryContext) error { +func (n *FetchNode) fetch(queryCtx *models.QueryContext) (block.Result, error) { + ctx := queryCtx.Ctx + sp, ctx := opentracingutil.StartSpanFromContext(ctx, "fetch") + defer sp.Finish() + timeSpec := n.timespec // No need to adjust start and ends since physical plan already considers the offset, range startTime := timeSpec.Start endTime := timeSpec.End + opts := storage.NewFetchOptions() opts.BlockType = n.blockType - ctx := queryCtx.Ctx - - blockResult, err := n.storage.FetchBlocks(ctx, &storage.FetchQuery{ + return n.storage.FetchBlocks(ctx, &storage.FetchQuery{ Start: startTime, End: endTime, TagMatchers: n.op.Matchers, Interval: timeSpec.Step, }, opts) +} + +// Execute runs the fetch node operation +func (n *FetchNode) Execute(queryCtx *models.QueryContext) error { + ctx := queryCtx.Ctx + blockResult, err := n.fetch(queryCtx) if err != nil { return err } diff --git a/src/query/functions/linear/base.go b/src/query/functions/linear/base.go index 437adecea5..778c572083 100644 --- a/src/query/functions/linear/base.go +++ b/src/query/functions/linear/base.go @@ -64,6 +64,10 @@ type baseNode struct { processor Processor } +func (c *baseNode) Params() parser.Params { + return c.op +} + // Ensure baseNode implements the types for lazy evaluation var _ transform.StepNode = (*baseNode)(nil) var _ transform.SeriesNode = (*baseNode)(nil) @@ -82,18 +86,23 @@ func (c *baseNode) ProcessSeries(series block.Series) (block.Series, error) { // Process the block func (c *baseNode) Process(queryCtx *models.QueryContext, ID parser.NodeID, b block.Block) error { + return transform.ProcessSimpleBlock(c, c.controller, queryCtx, ID, b) +} + +// ProcessBlock applies the linear function time Step-wise to each value in the block. +func (c *baseNode) ProcessBlock(queryCtx *models.QueryContext, ID parser.NodeID, b block.Block) (block.Block, error) { stepIter, err := b.StepIter() if err != nil { - return err + return nil, err } builder, err := c.controller.BlockBuilder(queryCtx, stepIter.Meta(), stepIter.SeriesMeta()) if err != nil { - return err + return nil, err } - if err = builder.AddCols(stepIter.StepCount()); err != nil { - return err + if err := builder.AddCols(stepIter.StepCount()); err != nil { + return nil, err } for index := 0; stepIter.Next(); index++ { @@ -105,12 +114,10 @@ func (c *baseNode) Process(queryCtx *models.QueryContext, ID parser.NodeID, b bl } if err = stepIter.Err(); err != nil { - return err + return nil, err } - nextBlock := builder.Build() - defer nextBlock.Close() - return c.controller.Process(queryCtx, nextBlock) + return builder.Build(), nil } // Meta returns the metadata for the block diff --git a/src/query/functions/linear/histogram_quantile.go b/src/query/functions/linear/histogram_quantile.go index 813adfea7b..242d60cbe9 100644 --- a/src/query/functions/linear/histogram_quantile.go +++ b/src/query/functions/linear/histogram_quantile.go @@ -232,11 +232,21 @@ func bucketQuantile(q float64, buckets []bucketValue) float64 { return bucketStart + (bucketEnd-bucketStart)*rank/count } +func (n *histogramQuantileNode) Params() parser.Params { + return n.op +} + // Process the block func (n *histogramQuantileNode) Process(queryCtx *models.QueryContext, ID parser.NodeID, b block.Block) error { + return transform.ProcessSimpleBlock(n, n.controller, queryCtx, ID, b) +} + +func (n *histogramQuantileNode) ProcessBlock(queryCtx *models.QueryContext, + ID parser.NodeID, + b block.Block) (block.Block, error) { stepIter, err := b.StepIter() if err != nil { - return err + return nil, err } meta := stepIter.Meta() @@ -288,12 +298,12 @@ func processValidQuantile( meta block.Metadata, stepIter block.StepIter, controller *transform.Controller, -) error { +) (block.Block, error) { sanitizeBuckets(bucketedSeries) builder, err := setupBuilder(queryCtx, bucketedSeries, meta, stepIter, controller) if err != nil { - return err + return nil, err } for index := 0; stepIter.Next(); index++ { @@ -328,12 +338,10 @@ func processValidQuantile( } if err = stepIter.Err(); err != nil { - return err + return nil, err } - nextBlock := builder.Build() - defer nextBlock.Close() - return controller.Process(queryCtx, nextBlock) + return builder.Build(), nil } func processInvalidQuantile( @@ -343,10 +351,10 @@ func processInvalidQuantile( meta block.Metadata, stepIter block.StepIter, controller *transform.Controller, -) error { +) (block.Block, error) { builder, err := setupBuilder(queryCtx, bucketedSeries, meta, stepIter, controller) if err != nil { - return err + return nil, err } // Set the values to an infinity of the appropriate sign; anything less than 0 @@ -364,10 +372,8 @@ func processInvalidQuantile( } if err = stepIter.Err(); err != nil { - return err + return nil, err } - nextBlock := builder.Build() - defer nextBlock.Close() - return controller.Process(queryCtx, nextBlock) + return builder.Build(), nil } diff --git a/src/query/functions/tag/base.go b/src/query/functions/tag/base.go index 3cde9c0140..93daa3a506 100644 --- a/src/query/functions/tag/base.go +++ b/src/query/functions/tag/base.go @@ -97,22 +97,25 @@ type baseNode struct { controller *transform.Controller } +func (n *baseNode) Params() parser.Params { + return n.op +} + // Process the block. func (n *baseNode) Process(queryCtx *models.QueryContext, ID parser.NodeID, b block.Block) error { + return transform.ProcessSimpleBlock(n, n.controller, queryCtx, ID, b) +} + +func (n *baseNode) ProcessBlock(queryCtx *models.QueryContext, ID parser.NodeID, b block.Block) (block.Block, error) { + it, err := b.StepIter() if err != nil { - return err + return nil, err } meta := it.Meta() seriesMeta := it.SeriesMeta() meta, seriesMeta = n.op.tagFn(meta, seriesMeta) - bl, err := b.WithMetadata(meta, seriesMeta) - if err != nil { - return err - } - - defer bl.Close() - return n.controller.Process(queryCtx, bl) + return b.WithMetadata(meta, seriesMeta) } diff --git a/src/query/functions/temporal/base.go b/src/query/functions/temporal/base.go index a78611b32f..132e2f6d39 100644 --- a/src/query/functions/temporal/base.go +++ b/src/query/functions/temporal/base.go @@ -33,6 +33,7 @@ import ( "github.com/m3db/m3/src/query/parser" "github.com/m3db/m3/src/query/ts" "github.com/m3db/m3/src/query/util/logging" + "github.com/m3db/m3/src/query/util/opentracing" "go.uber.org/zap" ) @@ -192,7 +193,20 @@ func (c *baseNode) Process(queryCtx *models.QueryContext, ID parser.NodeID, b bl } } - return c.processCompletedBlocks(processRequests, maxBlocks) + blocks, err := c.processCompletedBlocks(queryCtx, processRequests, maxBlocks) + defer closeBlocks(blocks) + + if err != nil { + return err + } + + return c.propagateNextBlocks(processRequests, blocks, maxBlocks) +} + +func closeBlocks(blocks []block.Block) { + for _, bl := range blocks { + bl.Close() + } } // processCurrent processes the current block. For the current block, figure out whether we have enough previous blocks which can help process it @@ -216,11 +230,13 @@ func (c *baseNode) processRight(bounds models.Bounds, rightRangeStart models.Bou return rightBlks, len(rightBlks) != numBlocks, nil } -// processCompletedBlocks processes all blocks for which all dependent blocks are present -func (c *baseNode) processCompletedBlocks(processRequests []processRequest, maxBlocks int) error { +func (c *baseNode) propagateNextBlocks(processRequests []processRequest, blocks []block.Block, maxBlocks int) error { processedKeys := make([]time.Time, len(processRequests)) - for i, req := range processRequests { - if err := c.processSingleRequest(req); err != nil { + + // propagate blocks downstream + for i, nextBlock := range blocks { + req := processRequests[i] + if err := c.controller.Process(req.queryCtx, nextBlock); err != nil { return err } @@ -235,17 +251,36 @@ func (c *baseNode) processCompletedBlocks(processRequests []processRequest, maxB return nil } -func (c *baseNode) processSingleRequest(request processRequest) error { +// processCompletedBlocks processes all blocks for which all dependent blocks are present +func (c *baseNode) processCompletedBlocks(queryCtx *models.QueryContext, processRequests []processRequest, maxBlocks int) ([]block.Block, error) { + sp, _ := opentracingutil.StartSpanFromContext(queryCtx.Ctx, c.op.OpType()) + defer sp.Finish() + + blocks := make([]block.Block, len(processRequests)) + for i, req := range processRequests { + bl, err := c.processSingleRequest(req) + if err != nil { + // return processed blocks so we can close them + return blocks, err + } + + blocks[i] = bl + } + + return blocks, nil +} + +func (c *baseNode) processSingleRequest(request processRequest) (block.Block, error) { seriesIter, err := request.blk.SeriesIter() if err != nil { - return err + return nil, err } depIters := make([]block.UnconsolidatedSeriesIter, len(request.deps)) for i, blk := range request.deps { iter, err := blk.SeriesIter() if err != nil { - return err + return nil, err } depIters[i] = iter @@ -268,11 +303,11 @@ func (c *baseNode) processSingleRequest(request processRequest) error { builder, err := c.controller.BlockBuilder(request.queryCtx, seriesIter.Meta(), resultSeriesMeta) if err != nil { - return err + return nil, err } if err := builder.AddCols(bounds.Steps()); err != nil { - return err + return nil, err } aggDuration := c.op.duration @@ -283,7 +318,7 @@ func (c *baseNode) processSingleRequest(request processRequest) error { values = values[:0] for i, iter := range depIters { if !iter.Next() { - return fmt.Errorf("incorrect number of series for block: %d", i) + return nil, fmt.Errorf("incorrect number of series for block: %d", i) } s := iter.Current() @@ -325,12 +360,10 @@ func (c *baseNode) processSingleRequest(request processRequest) error { } if err = seriesIter.Err(); err != nil { - return err + return nil, err } - nextBlock := builder.Build() - defer nextBlock.Close() - return c.controller.Process(request.queryCtx, nextBlock) + return builder.Build(), nil } func (c *baseNode) sweep(processedKeys []bool, maxBlocks int) { diff --git a/src/query/functions/temporal/base_test.go b/src/query/functions/temporal/base_test.go index 785187cb05..59cfd23b98 100644 --- a/src/query/functions/temporal/base_test.go +++ b/src/query/functions/temporal/base_test.go @@ -396,13 +396,16 @@ func TestSingleProcessRequest(t *testing.T) { }, }) bNode := node.(*baseNode) - err := bNode.processSingleRequest(processRequest{ + request := processRequest{ blk: block2, bounds: bounds, deps: []block.UnconsolidatedBlock{block1}, queryCtx: models.NoopQueryContext(), - }) - assert.NoError(t, err) + } + bl, err := bNode.processSingleRequest(request) + require.NoError(t, err) + + bNode.propagateNextBlocks([]processRequest{request}, []block.Block{bl}, 1) assert.Len(t, sink.Values, 2, "block processed") // Current Block: 0 1 2 3 4 5 // Previous Block: 10 11 12 13 14 15 diff --git a/src/query/functions/unconsolidated/timestamp.go b/src/query/functions/unconsolidated/timestamp.go index 052bc47385..a8eb0a49d1 100644 --- a/src/query/functions/unconsolidated/timestamp.go +++ b/src/query/functions/unconsolidated/timestamp.go @@ -83,25 +83,33 @@ type timestampNode struct { controller *transform.Controller } +func (n *timestampNode) Params() parser.Params { + return n.op +} + // Process the block func (n *timestampNode) Process(queryCtx *models.QueryContext, ID parser.NodeID, b block.Block) error { + return transform.ProcessSimpleBlock(n, n.controller, queryCtx, ID, b) +} + +func (n *timestampNode) ProcessBlock(queryCtx *models.QueryContext, ID parser.NodeID, b block.Block) (block.Block, error) { unconsolidatedBlock, err := b.Unconsolidated() if err != nil { - return err + return nil, err } iter, err := unconsolidatedBlock.StepIter() if err != nil { - return err + return nil, err } builder, err := n.controller.BlockBuilder(queryCtx, iter.Meta(), iter.SeriesMeta()) if err != nil { - return err + return nil, err } if err = builder.AddCols(iter.StepCount()); err != nil { - return err + return nil, err } for index := 0; iter.Next(); index++ { @@ -122,11 +130,8 @@ func (n *timestampNode) Process(queryCtx *models.QueryContext, ID parser.NodeID, } if err = iter.Err(); err != nil { - return err + return nil, err } - nextBlock := builder.Build() - defer nextBlock.Close() - - return n.controller.Process(queryCtx, nextBlock) + return builder.Build(), nil } diff --git a/src/query/functions/utils/params.go b/src/query/functions/utils/params.go new file mode 100644 index 0000000000..5002b2c991 --- /dev/null +++ b/src/query/functions/utils/params.go @@ -0,0 +1,34 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package utils + +// StaticParams is a simple string Params implementation, useful for when no data other than OpType is needed. +type StaticParams string + +// String simply returns s. +func (s StaticParams) String() string { + return string(s) +} + +// OpType simply returns s for StaticParams. +func (s StaticParams) OpType() string { + return string(s) +} diff --git a/src/query/generated/mocks/generate.go b/src/query/generated/mocks/generate.go index d5f88dfec6..15d73cf638 100644 --- a/src/query/generated/mocks/generate.go +++ b/src/query/generated/mocks/generate.go @@ -22,8 +22,10 @@ //go:generate sh -c "mockgen -package=downsample $PACKAGE/src/cmd/services/m3coordinator/downsample Downsampler,MetricsAppender,SamplesAppender | genclean -pkg $PACKAGE/src/cmd/services/m3coordinator/downsample -out $GOPATH/src/$PACKAGE/src/cmd/services/m3coordinator/downsample/downsample_mock.go" //go:generate sh -c "mockgen -package=block -destination=$GOPATH/src/$PACKAGE/src/query/block/block_mock.go $PACKAGE/src/query/block Block,StepIter,SeriesIter,Builder,Step" //go:generate sh -c "mockgen -package=ingest -destination=$GOPATH/src/$PACKAGE/src/cmd/services/m3coordinator/ingest/write_mock.go $PACKAGE/src/cmd/services/m3coordinator/ingest DownsamplerAndWriter" +//go:generate sh -c "mockgen -package=transform -destination=$GOPATH/src/$PACKAGE/src/query/executor/transform/types_mock.go $PACKAGE/src/query/executor/transform OpNode" // mockgen rules for generating mocks for unexported interfaces (file mode). //go:generate sh -c "mockgen -package=m3ql -destination=$GOPATH/src/github.com/m3db/m3/src/query/parser/m3ql/types_mock.go -source=$GOPATH/src/github.com/m3db/m3/src/query/parser/m3ql/types.go" +//go:generate sh -c "mockgen -package=transform -destination=$GOPATH/src/github.com/m3db/m3/src/query/executor/transform/exec_mock.go -source=$GOPATH/src/github.com/m3db/m3/src/query/executor/transform/exec.go" package mocks diff --git a/src/query/server/server.go b/src/query/server/server.go index f728978d5a..81eecf661b 100644 --- a/src/query/server/server.go +++ b/src/query/server/server.go @@ -64,12 +64,17 @@ import ( xsync "github.com/m3db/m3x/sync" xtime "github.com/m3db/m3x/time" + "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/uber-go/tally" "go.uber.org/zap" "google.golang.org/grpc" ) +const ( + serviceName = "m3query" +) + var ( defaultLocalConfiguration = &config.LocalConfiguration{ Namespaces: []m3.ClusterStaticNamespaceConfiguration{ @@ -134,9 +139,24 @@ func Run(runOpts RunOptions) { if err != nil { logger.Fatal("could not connect to metrics", zap.Any("error", err)) } + + tracer, traceCloser, err := cfg.Tracing.NewTracer(serviceName, scope, logger) + if err != nil { + logger.Fatal("could not initialize tracing", zap.Error(err)) + } + + defer traceCloser.Close() + + if _, ok := tracer.(opentracing.NoopTracer); ok { + logger.Info("tracing disabled; set `tracing.backend` to enable") + } + instrumentOptions := instrument.NewOptions(). SetMetricsScope(scope). - SetZapLogger(logger) + SetZapLogger(logger). + SetTracer(tracer) + + opentracing.SetGlobalTracer(tracer) // Close metrics scope defer func() { diff --git a/src/query/util/httperrors/errors.go b/src/query/util/httperrors/errors.go new file mode 100644 index 0000000000..dedd78a0e8 --- /dev/null +++ b/src/query/util/httperrors/errors.go @@ -0,0 +1,50 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package httperrors + +import ( + "net/http" + + "github.com/m3db/m3/src/query/util/logging" + "github.com/m3db/m3/src/x/net/http" +) + +type errorWithID struct { + xhttp.ErrorResponse + RqID string `json:"rqID"` +} + +// ErrorWithReqInfo writes an xhttp.ErrorResponse with an added request id ( +// RqId) field read from the request context. +// +// NB: RqID is currently a query specific concept, +// which is why this doesn't exist in xhttp proper. +// We can add it later if we propagate the request id concept to that package as well. +func ErrorWithReqInfo(w http.ResponseWriter, r *http.Request, code int, err error) { + ctx := r.Context() + w.WriteHeader(code) + xhttp.WriteJSONResponse(w, errorWithID{ + ErrorResponse: xhttp.ErrorResponse{ + Error: err.Error(), + }, + RqID: logging.ReadContextID(ctx), + }, logging.WithContext(ctx)) +} diff --git a/src/query/util/logging/log.go b/src/query/util/logging/log.go index 19e9885612..d48a8cba2b 100644 --- a/src/query/util/logging/log.go +++ b/src/query/util/logging/log.go @@ -30,6 +30,7 @@ import ( xhttp "github.com/m3db/m3/src/x/net/http" + "github.com/opentracing/opentracing-go" "github.com/pborman/uuid" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -129,6 +130,12 @@ func withResponseTimeLoggingFunc( rqCtx := NewContextWithGeneratedID(r.Context()) logger := WithContext(rqCtx) + sp := opentracing.SpanFromContext(rqCtx) + if sp != nil { + rqID := ReadContextID(rqCtx) + sp.SetTag("rqID", rqID) + } + // Propagate the context with the reqId next(w, r.WithContext(rqCtx)) endTime := time.Now() diff --git a/src/query/util/opentracing/context.go b/src/query/util/opentracing/context.go new file mode 100644 index 0000000000..3a8544ebde --- /dev/null +++ b/src/query/util/opentracing/context.go @@ -0,0 +1,81 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +// Package opentracingutil provides more utilities for dealing with opentracing and context.Context's. +package opentracingutil + +import ( + "context" + "fmt" + "time" + + "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/log" +) + +// alias GlobalTracer() so we can mock out the tracer without impacting tests outside the package +var getGlobalTracer = opentracing.GlobalTracer + +// SpanFromContextOrNoop is the same as opentracing.SpanFromContext, +// but instead of returning nil, +// it returns a NoopTracer span if ctx doesn't already have an associated span. +// Use this over opentracing.StartSpanFromContext if you need access to the +// current span, (e.g. if you don't want to start a child span). +// +// NB: if there is no span in the context, the span returned by this function +// is a noop, and won't be attached to the context; if you +// want a proper span, either start one and pass it in, or start one +// in your function. +func SpanFromContextOrNoop(ctx context.Context) opentracing.Span { + sp := opentracing.SpanFromContext(ctx) + if sp != nil { + return sp + } + + return opentracing.NoopTracer{}.StartSpan("") +} + +// StartSpanFromContext is the same as opentracing.StartSpanFromContext, but instead of always using the global tracer, +// it attempts to use the parent span's tracer if it's available. This behavior is (arguably) more flexible--it allows +// a locally set tracer to be used when needed (as in tests)--while being equivalent to the original in most contexts. +// See https://github.com/opentracing/opentracing-go/issues/149 for more discussion. +func StartSpanFromContext(ctx context.Context, operationName string, opts ...opentracing.StartSpanOption) (opentracing.Span, context.Context) { + var tracer opentracing.Tracer + if parentSpan := opentracing.SpanFromContext(ctx); parentSpan != nil { + opts = append(opts, opentracing.ChildOf(parentSpan.Context())) + tracer = parentSpan.Tracer() + } else { + tracer = getGlobalTracer() + } + + span := tracer.StartSpan(operationName, opts...) + + return span, opentracing.ContextWithSpan(ctx, span) +} + +// Time is a log.Field for time.Time values. It translates to RF3339 formatted time strings. +func Time(key string, t time.Time) log.Field { + return log.String(key, t.Format(time.RFC3339)) +} + +// Duration is a log.Field for Duration values. It translates to the standard Go duration format (Duration.String()). +func Duration(key string, t time.Duration) log.Field { + return log.String(key, fmt.Sprint(t)) +} diff --git a/src/query/util/opentracing/context_test.go b/src/query/util/opentracing/context_test.go new file mode 100644 index 0000000000..bcab2ded54 --- /dev/null +++ b/src/query/util/opentracing/context_test.go @@ -0,0 +1,104 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package opentracingutil + +import ( + "context" + "testing" + + "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/mocktracer" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestStartSpanFromContext(t *testing.T) { + t.Run("uses local tracer if available", func(t *testing.T) { + mtr := mocktracer.New() + defer mockoutGlobalTracer(opentracing.NoopTracer{})() + + rootSp := mtr.StartSpan("root") + ctx := opentracing.ContextWithSpan(context.Background(), rootSp) + + childSp, ctx := StartSpanFromContext(ctx, "child") + + rootSp.Finish() + childSp.Finish() + + assertHasSpans(t, mtr, []string{"root", "child"}) + + ctxSpan := opentracing.SpanFromContext(ctx) + require.IsType(t, (*mocktracer.MockSpan)(nil), ctxSpan) + assert.Equal(t, ctxSpan.(*mocktracer.MockSpan).OperationName, "child", + "should set span on context to child span") + + }) + + t.Run("uses global tracer if no parent span", func(t *testing.T) { + mtr := mocktracer.New() + defer mockoutGlobalTracer(mtr)() + + sp, ctx := StartSpanFromContext(context.Background(), "foo") + sp.Finish() + + assertHasSpans(t, mtr, []string{"foo"}) + assert.NotNil(t, opentracing.SpanFromContext(ctx)) + }) +} + +func TestStartSpanFromContextOrRoot(t *testing.T) { + t.Run("uses noop tracer if nothing passed in", func(t *testing.T) { + assert.Equal(t, opentracing.NoopTracer{}.StartSpan(""), + SpanFromContextOrNoop(context. + Background())) + }) + + t.Run("returns span if span attached to context", func(t *testing.T) { + mt := mocktracer.New() + root := mt.StartSpan("root") + ctx := opentracing.ContextWithSpan(context.Background(), root) + + assert.Equal(t, root, SpanFromContextOrNoop(ctx)) + }) + +} + +func mockoutGlobalTracer(mtr opentracing.Tracer) func() { + oldGGT := getGlobalTracer + getGlobalTracer = func() opentracing.Tracer { + return mtr + } + + return func() { + getGlobalTracer = oldGGT + } +} + +func assertHasSpans(t *testing.T, mtr *mocktracer.MockTracer, opNames []string) { + spans := mtr.FinishedSpans() + actualOpNames := make([]string, len(spans)) + + for i, sp := range spans { + actualOpNames[i] = sp.OperationName + } + + assert.Equal(t, opNames, actualOpNames) +} diff --git a/src/x/net/http/errors.go b/src/x/net/http/errors.go index 803f11813a..cba98469d9 100644 --- a/src/x/net/http/errors.go +++ b/src/x/net/http/errors.go @@ -33,14 +33,15 @@ var ( ErrInvalidParams = errors.New("invalid request params") ) -type errorResponse struct { +// ErrorResponse is a generic response for an HTTP error. +type ErrorResponse struct { Error string `json:"error"` } // Error will serve an HTTP error func Error(w http.ResponseWriter, err error, code int) { w.WriteHeader(code) - json.NewEncoder(w).Encode(errorResponse{Error: err.Error()}) + json.NewEncoder(w).Encode(ErrorResponse{Error: err.Error()}) } // ParseError is the error from parsing requests diff --git a/src/x/net/http/response_test.go b/src/x/net/http/response_test.go index 94e1fc4800..bde1efd70c 100644 --- a/src/x/net/http/response_test.go +++ b/src/x/net/http/response_test.go @@ -64,7 +64,7 @@ func assertWroteJSONError( code int, ) { assert.Equal(t, code, recorder.Code) - var resp errorResponse + var resp ErrorResponse err := json.NewDecoder(recorder.Body).Decode(&resp) require.NoError(t, err) assert.NotEqual(t, "", resp.Error)