From 69e69d52cd1b8d2651bcffd643c70177e19c2bdf Mon Sep 17 00:00:00 2001 From: Linas Naginionis Date: Wed, 11 Nov 2020 15:16:58 +0200 Subject: [PATCH] Initial commit for proposed query handlers refactoring: * updated CustomHandler interface * unified how query routes are added. no need to pass router around and wrapping is done in one place. * routes are registered with names so they could be easily found later when custom handlers are registered. --- src/query/api/v1/handler/database/common.go | 17 +- src/query/api/v1/handler/namespace/common.go | 44 ++--- src/query/api/v1/handler/placement/common.go | 51 +++--- src/query/api/v1/handler/topic/common.go | 28 +-- src/query/api/v1/httpd/handler.go | 180 ++++++++++--------- src/query/api/v1/httpd/handler_test.go | 28 ++- src/query/api/v1/options/handler.go | 5 +- 7 files changed, 171 insertions(+), 182 deletions(-) diff --git a/src/query/api/v1/handler/database/common.go b/src/query/api/v1/handler/database/common.go index f7960909b9..ce894277ff 100644 --- a/src/query/api/v1/handler/database/common.go +++ b/src/query/api/v1/handler/database/common.go @@ -27,10 +27,7 @@ import ( dbconfig "github.com/m3db/m3/src/cmd/services/m3dbnode/config" "github.com/m3db/m3/src/cmd/services/m3query/config" "github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions" - "github.com/m3db/m3/src/query/util/logging" "github.com/m3db/m3/src/x/instrument" - - "github.com/gorilla/mux" ) // Handler represents a generic handler for namespace endpoints. @@ -43,31 +40,23 @@ type Handler struct { // RegisterRoutes registers the namespace routes func RegisterRoutes( - r *mux.Router, + addRoute func(path string, handler http.Handler, methods ...string), client clusterclient.Client, cfg config.Configuration, embeddedDbCfg *dbconfig.DBConfiguration, defaults []handleroptions.ServiceOptionsDefault, instrumentOpts instrument.Options, ) error { - wrapped := func(n http.Handler) http.Handler { - return logging.WithResponseTimeAndPanicErrorLogging(n, instrumentOpts) - } - createHandler, err := NewCreateHandler(client, cfg, embeddedDbCfg, defaults, instrumentOpts) if err != nil { return err } - r.HandleFunc(CreateURL, - wrapped(createHandler).ServeHTTP). - Methods(CreateHTTPMethod) - // Register the same handler under two different endpoints. This just makes explaining things in // our documentation easier so we can separate out concepts, but share the underlying code. - r.HandleFunc(CreateURL, wrapped(createHandler).ServeHTTP).Methods(CreateHTTPMethod) - r.HandleFunc(CreateNamespaceURL, wrapped(createHandler).ServeHTTP).Methods(CreateNamespaceHTTPMethod) + addRoute(CreateURL, createHandler, CreateHTTPMethod) + addRoute(CreateNamespaceURL, createHandler, CreateNamespaceHTTPMethod) return nil } diff --git a/src/query/api/v1/handler/namespace/common.go b/src/query/api/v1/handler/namespace/common.go index 4e95e121ef..747bc47786 100644 --- a/src/query/api/v1/handler/namespace/common.go +++ b/src/query/api/v1/handler/namespace/common.go @@ -33,11 +33,8 @@ import ( "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions" "github.com/m3db/m3/src/query/storage/m3" - "github.com/m3db/m3/src/query/util/logging" "github.com/m3db/m3/src/x/instrument" xhttp "github.com/m3db/m3/src/x/net/http" - - "github.com/gorilla/mux" ) const ( @@ -104,15 +101,12 @@ func Metadata(store kv.Store) ([]namespace.Metadata, int, error) { // RegisterRoutes registers the namespace routes. func RegisterRoutes( - r *mux.Router, + addRoute func(path string, handler http.Handler, methods ...string), client clusterclient.Client, clusters m3.Clusters, defaults []handleroptions.ServiceOptionsDefault, instrumentOpts instrument.Options, ) { - wrapped := func(n http.Handler) http.Handler { - return logging.WithResponseTimeAndPanicErrorLogging(n, instrumentOpts) - } applyMiddleware := func( f func(svc handleroptions.ServiceNameAndDefaults, w http.ResponseWriter, r *http.Request), @@ -128,40 +122,32 @@ func RegisterRoutes( } // Get M3DB namespaces. - getHandler := wrapped( - applyMiddleware(NewGetHandler(client, instrumentOpts).ServeHTTP, defaults)) - r.HandleFunc(M3DBGetURL, getHandler.ServeHTTP).Methods(GetHTTPMethod) + getHandler := applyMiddleware(NewGetHandler(client, instrumentOpts).ServeHTTP, defaults) + addRoute(M3DBGetURL, getHandler, GetHTTPMethod) // Add M3DB namespaces. - addHandler := wrapped( - applyMiddleware(NewAddHandler(client, instrumentOpts).ServeHTTP, defaults)) - r.HandleFunc(M3DBAddURL, addHandler.ServeHTTP).Methods(AddHTTPMethod) + addHandler := applyMiddleware(NewAddHandler(client, instrumentOpts).ServeHTTP, defaults) + addRoute(M3DBAddURL, addHandler, AddHTTPMethod) // Update M3DB namespaces. - updateHandler := wrapped( - applyMiddleware(NewUpdateHandler(client, instrumentOpts).ServeHTTP, defaults)) - r.HandleFunc(M3DBUpdateURL, updateHandler.ServeHTTP).Methods(UpdateHTTPMethod) + updateHandler := applyMiddleware(NewUpdateHandler(client, instrumentOpts).ServeHTTP, defaults) + addRoute(M3DBUpdateURL, updateHandler, UpdateHTTPMethod) // Delete M3DB namespaces. - deleteHandler := wrapped( - applyMiddleware(NewDeleteHandler(client, instrumentOpts).ServeHTTP, defaults)) - r.HandleFunc(M3DBDeleteURL, deleteHandler.ServeHTTP).Methods(DeleteHTTPMethod) + deleteHandler := applyMiddleware(NewDeleteHandler(client, instrumentOpts).ServeHTTP, defaults) + addRoute(M3DBDeleteURL, deleteHandler, DeleteHTTPMethod) // Deploy M3DB schemas. - schemaHandler := wrapped( - applyMiddleware(NewSchemaHandler(client, instrumentOpts).ServeHTTP, defaults)) - r.HandleFunc(M3DBSchemaURL, schemaHandler.ServeHTTP).Methods(SchemaDeployHTTPMethod) + schemaHandler := applyMiddleware(NewSchemaHandler(client, instrumentOpts).ServeHTTP, defaults) + addRoute(M3DBSchemaURL, schemaHandler, SchemaDeployHTTPMethod) // Reset M3DB schemas. - schemaResetHandler := wrapped( - applyMiddleware(NewSchemaResetHandler(client, instrumentOpts).ServeHTTP, defaults)) - r.HandleFunc(M3DBSchemaURL, schemaResetHandler.ServeHTTP).Methods(DeleteHTTPMethod) + schemaResetHandler := applyMiddleware(NewSchemaResetHandler(client, instrumentOpts).ServeHTTP, defaults) + addRoute(M3DBSchemaURL, schemaResetHandler, DeleteHTTPMethod) // Mark M3DB namespace as ready. - readyHandler := wrapped( - applyMiddleware(NewReadyHandler(client, clusters, instrumentOpts).ServeHTTP, defaults)) - r.HandleFunc(M3DBReadyURL, readyHandler.ServeHTTP).Methods(ReadyHTTPMethod) - + readyHandler := applyMiddleware(NewReadyHandler(client, clusters, instrumentOpts).ServeHTTP, defaults) + addRoute(M3DBReadyURL, readyHandler, ReadyHTTPMethod) } func validateNamespaceAggregationOptions(mds []namespace.Metadata) error { diff --git a/src/query/api/v1/handler/placement/common.go b/src/query/api/v1/handler/placement/common.go index 66d732442c..3f49cb2ca6 100644 --- a/src/query/api/v1/handler/placement/common.go +++ b/src/query/api/v1/handler/placement/common.go @@ -42,8 +42,6 @@ import ( xerrors "github.com/m3db/m3/src/x/errors" "github.com/m3db/m3/src/x/instrument" xhttp "github.com/m3db/m3/src/x/net/http" - - "github.com/gorilla/mux" ) const ( @@ -222,7 +220,7 @@ func ConvertInstancesProto(instancesProto []*placementpb.Instance) ([]placement. // RegisterRoutes registers the placement routes func RegisterRoutes( - r *mux.Router, + addRoute func(path string, handler http.Handler, methods ...string), defaults []handleroptions.ServiceOptionsDefault, opts HandlerOptions, ) { @@ -231,63 +229,64 @@ func RegisterRoutes( initHandler = NewInitHandler(opts) initFn = applyMiddleware(initHandler.ServeHTTP, defaults, opts.instrumentOptions) ) - r.HandleFunc(M3DBInitURL, initFn).Methods(InitHTTPMethod) - r.HandleFunc(M3AggInitURL, initFn).Methods(InitHTTPMethod) - r.HandleFunc(M3CoordinatorInitURL, initFn).Methods(InitHTTPMethod) + + addRoute(M3DBInitURL, initFn, InitHTTPMethod) + addRoute(M3AggInitURL, initFn, InitHTTPMethod) + addRoute(M3CoordinatorInitURL, initFn, InitHTTPMethod) // Get var ( getHandler = NewGetHandler(opts) getFn = applyMiddleware(getHandler.ServeHTTP, defaults, opts.instrumentOptions) ) - r.HandleFunc(M3DBGetURL, getFn).Methods(GetHTTPMethod) - r.HandleFunc(M3AggGetURL, getFn).Methods(GetHTTPMethod) - r.HandleFunc(M3CoordinatorGetURL, getFn).Methods(GetHTTPMethod) + addRoute(M3DBGetURL, getFn, GetHTTPMethod) + addRoute(M3AggGetURL, getFn, GetHTTPMethod) + addRoute(M3CoordinatorGetURL, getFn, GetHTTPMethod) // Delete all var ( deleteAllHandler = NewDeleteAllHandler(opts) deleteAllFn = applyMiddleware(deleteAllHandler.ServeHTTP, defaults, opts.instrumentOptions) ) - r.HandleFunc(M3DBDeleteAllURL, deleteAllFn).Methods(DeleteAllHTTPMethod) - r.HandleFunc(M3AggDeleteAllURL, deleteAllFn).Methods(DeleteAllHTTPMethod) - r.HandleFunc(M3CoordinatorDeleteAllURL, deleteAllFn).Methods(DeleteAllHTTPMethod) + addRoute(M3DBDeleteAllURL, deleteAllFn, DeleteAllHTTPMethod) + addRoute(M3AggDeleteAllURL, deleteAllFn, DeleteAllHTTPMethod) + addRoute(M3CoordinatorDeleteAllURL, deleteAllFn, DeleteAllHTTPMethod) // Add var ( addHandler = NewAddHandler(opts) addFn = applyMiddleware(addHandler.ServeHTTP, defaults, opts.instrumentOptions) ) - r.HandleFunc(M3DBAddURL, addFn).Methods(AddHTTPMethod) - r.HandleFunc(M3AggAddURL, addFn).Methods(AddHTTPMethod) - r.HandleFunc(M3CoordinatorAddURL, addFn).Methods(AddHTTPMethod) + addRoute(M3DBAddURL, addFn, AddHTTPMethod) + addRoute(M3AggAddURL, addFn, AddHTTPMethod) + addRoute(M3CoordinatorAddURL, addFn, AddHTTPMethod) // Delete var ( deleteHandler = NewDeleteHandler(opts) deleteFn = applyMiddleware(deleteHandler.ServeHTTP, defaults, opts.instrumentOptions) ) - r.HandleFunc(M3DBDeleteURL, deleteFn).Methods(DeleteHTTPMethod) - r.HandleFunc(M3AggDeleteURL, deleteFn).Methods(DeleteHTTPMethod) - r.HandleFunc(M3CoordinatorDeleteURL, deleteFn).Methods(DeleteHTTPMethod) + addRoute(M3DBDeleteURL, deleteFn, DeleteHTTPMethod) + addRoute(M3AggDeleteURL, deleteFn, DeleteHTTPMethod) + addRoute(M3CoordinatorDeleteURL, deleteFn, DeleteHTTPMethod) // Replace var ( replaceHandler = NewReplaceHandler(opts) replaceFn = applyMiddleware(replaceHandler.ServeHTTP, defaults, opts.instrumentOptions) ) - r.HandleFunc(M3DBReplaceURL, replaceFn).Methods(ReplaceHTTPMethod) - r.HandleFunc(M3AggReplaceURL, replaceFn).Methods(ReplaceHTTPMethod) - r.HandleFunc(M3CoordinatorReplaceURL, replaceFn).Methods(ReplaceHTTPMethod) + addRoute(M3DBReplaceURL, replaceFn, ReplaceHTTPMethod) + addRoute(M3AggReplaceURL, replaceFn, ReplaceHTTPMethod) + addRoute(M3CoordinatorReplaceURL, replaceFn, ReplaceHTTPMethod) // Set var ( setHandler = NewSetHandler(opts) setFn = applyMiddleware(setHandler.ServeHTTP, defaults, opts.instrumentOptions) ) - r.HandleFunc(M3DBSetURL, setFn).Methods(SetHTTPMethod) - r.HandleFunc(M3AggSetURL, setFn).Methods(SetHTTPMethod) - r.HandleFunc(M3CoordinatorSetURL, setFn).Methods(SetHTTPMethod) + addRoute(M3DBSetURL, setFn, SetHTTPMethod) + addRoute(M3AggSetURL, setFn, SetHTTPMethod) + addRoute(M3CoordinatorSetURL, setFn, SetHTTPMethod) } func newPlacementCutoverNanosFn( @@ -386,11 +385,11 @@ func applyMiddleware( f func(svc handleroptions.ServiceNameAndDefaults, w http.ResponseWriter, r *http.Request), defaults []handleroptions.ServiceOptionsDefault, instrumentOpts instrument.Options, -) func(w http.ResponseWriter, r *http.Request) { +) http.Handler { return logging.WithResponseTimeAndPanicErrorLoggingFunc( parseServiceMiddleware(f, defaults), instrumentOpts, - ).ServeHTTP + ) } func parseServiceMiddleware( diff --git a/src/query/api/v1/handler/topic/common.go b/src/query/api/v1/handler/topic/common.go index 24112ceb4b..2474876a24 100644 --- a/src/query/api/v1/handler/topic/common.go +++ b/src/query/api/v1/handler/topic/common.go @@ -29,13 +29,11 @@ import ( "github.com/m3db/m3/src/cmd/services/m3query/config" "github.com/m3db/m3/src/msg/topic" "github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions" - "github.com/m3db/m3/src/query/util/logging" xerrors "github.com/m3db/m3/src/x/errors" "github.com/m3db/m3/src/x/instrument" "github.com/gogo/protobuf/jsonpb" "github.com/gogo/protobuf/proto" - "github.com/gorilla/mux" ) const ( @@ -71,30 +69,16 @@ func Service(clusterClient clusterclient.Client, opts handleroptions.ServiceOpti // RegisterRoutes registers the topic routes func RegisterRoutes( - r *mux.Router, + addRoute func(path string, handler http.Handler, methods ...string), client clusterclient.Client, cfg config.Configuration, instrumentOpts instrument.Options, ) { - wrapped := func(n http.Handler) http.Handler { - return logging.WithResponseTimeAndPanicErrorLogging(n, instrumentOpts) - } - - r.HandleFunc(InitURL, - wrapped(newInitHandler(client, cfg, instrumentOpts)).ServeHTTP). - Methods(InitHTTPMethod) - r.HandleFunc(GetURL, - wrapped(newGetHandler(client, cfg, instrumentOpts)).ServeHTTP). - Methods(GetHTTPMethod) - r.HandleFunc(AddURL, - wrapped(newAddHandler(client, cfg, instrumentOpts)).ServeHTTP). - Methods(AddHTTPMethod) - r.HandleFunc(UpdateURL, - wrapped(newUpdateHandler(client, cfg, instrumentOpts)).ServeHTTP). - Methods(UpdateHTTPMethod) - r.HandleFunc(DeleteURL, - wrapped(newDeleteHandler(client, cfg, instrumentOpts)).ServeHTTP). - Methods(DeleteHTTPMethod) + addRoute(InitURL, newInitHandler(client, cfg, instrumentOpts), InitHTTPMethod) + addRoute(GetURL, newGetHandler(client, cfg, instrumentOpts), GetHTTPMethod) + addRoute(AddURL, newAddHandler(client, cfg, instrumentOpts), AddHTTPMethod) + addRoute(UpdateURL, newUpdateHandler(client, cfg, instrumentOpts), UpdateHTTPMethod) + addRoute(DeleteURL, newDeleteHandler(client, cfg, instrumentOpts), DeleteHTTPMethod) } func topicName(headers http.Header) string { diff --git a/src/query/api/v1/httpd/handler.go b/src/query/api/v1/httpd/handler.go index 37ebd072b7..9e37966cc2 100644 --- a/src/query/api/v1/httpd/handler.go +++ b/src/query/api/v1/httpd/handler.go @@ -127,9 +127,10 @@ func applyMiddleware(base *mux.Router, tracer opentracing.Tracer) http.Handler { // RegisterRoutes registers all http routes. func (h *Handler) RegisterRoutes() error { - instrumentOpts := h.options.InstrumentOpts() - // Wrap requests with response time logging as well as panic recovery. var ( + instrumentOpts = h.options.InstrumentOpts() + + // Wrap requests with response time logging as well as panic recovery. wrapped = func(n http.Handler) http.Handler { return logging.WithResponseTimeAndPanicErrorLogging(n, instrumentOpts) } @@ -137,13 +138,22 @@ func (h *Handler) RegisterRoutes() error { panicOnly = func(n http.Handler) http.Handler { return logging.WithPanicErrorResponder(n, instrumentOpts) } + + addWrappedRoute = func(path string, handler http.Handler, methods ...string) { + addRoute(h.router, path, wrapped(handler), methods...) + } + + addRoute = func(path string, handler http.Handler, methods ...string) { + addRoute(h.router, path, handler, methods...) + } ) - h.router.HandleFunc(openapi.URL, - wrapped(openapi.NewDocHandler(instrumentOpts)).ServeHTTP, - ).Methods(openapi.HTTPMethod) + addWrappedRoute(openapi.URL, openapi.NewDocHandler(instrumentOpts), openapi.HTTPMethod) + h.router.PathPrefix(openapi.StaticURLPrefix). - Handler(wrapped(openapi.StaticHandler())) + Handler(wrapped(openapi.StaticHandler())). + Name(openapi.StaticURLPrefix) + // Prometheus remote read/write endpoints. remoteSourceOpts := h.options.SetInstrumentOpts(instrumentOpts. @@ -164,24 +174,13 @@ func (h *Handler) RegisterRoutes() error { Tagged(v1APIGroup), )) - // Register custom endpoints. - for _, custom := range h.customHandlers { - handler, err := custom.Handler(nativeSourceOpts) - if err != nil { - return err - } - - h.router.HandleFunc(custom.Route(), handler.ServeHTTP). - Methods(custom.Methods()...) - } - opts := prom.Options{ PromQLEngine: h.options.PrometheusEngine(), } - promqlQueryHandler := wrapped(prom.NewReadHandler(opts, nativeSourceOpts)) - promqlInstantQueryHandler := wrapped(prom.NewReadInstantHandler(opts, nativeSourceOpts)) - nativePromReadHandler := wrapped(native.NewPromReadHandler(nativeSourceOpts)) - nativePromReadInstantHandler := wrapped(native.NewPromReadInstantHandler(nativeSourceOpts)) + promqlQueryHandler := prom.NewReadHandler(opts, nativeSourceOpts) + promqlInstantQueryHandler := prom.NewReadInstantHandler(opts, nativeSourceOpts) + nativePromReadHandler := native.NewPromReadHandler(nativeSourceOpts) + nativePromReadInstantHandler := native.NewPromReadInstantHandler(nativeSourceOpts) h.options.QueryRouter().Setup(options.QueryRouterOptions{ DefaultQueryEngine: h.options.DefaultQueryEngine(), @@ -195,71 +194,42 @@ func (h *Handler) RegisterRoutes() error { M3QueryHandler: nativePromReadInstantHandler.ServeHTTP, }) - h.router. - HandleFunc(native.PromReadURL, h.options.QueryRouter().ServeHTTP). - Methods(native.PromReadHTTPMethods...) - h.router. - HandleFunc(native.PromReadInstantURL, h.options.InstantQueryRouter().ServeHTTP). - Methods(native.PromReadInstantHTTPMethods...) - - h.router.HandleFunc("/prometheus"+native.PromReadURL, promqlQueryHandler.ServeHTTP).Methods(native.PromReadHTTPMethods...) - h.router.HandleFunc("/prometheus"+native.PromReadInstantURL, promqlInstantQueryHandler.ServeHTTP).Methods(native.PromReadInstantHTTPMethods...) - - h.router.HandleFunc(remote.PromReadURL, - wrapped(promRemoteReadHandler).ServeHTTP, - ).Methods(remote.PromReadHTTPMethods...) - h.router.HandleFunc(remote.PromWriteURL, - panicOnly(promRemoteWriteHandler).ServeHTTP, - ).Methods(remote.PromWriteHTTPMethod) - h.router.HandleFunc("/m3query"+native.PromReadURL, nativePromReadHandler.ServeHTTP).Methods(native.PromReadHTTPMethods...) - h.router.HandleFunc("/m3query"+native.PromReadInstantURL, nativePromReadInstantHandler.ServeHTTP).Methods(native.PromReadInstantHTTPMethods...) + addWrappedRoute(native.PromReadURL, h.options.QueryRouter(), native.PromReadHTTPMethods...) + addWrappedRoute(native.PromReadInstantURL, h.options.InstantQueryRouter(), native.PromReadInstantHTTPMethods...) + + addWrappedRoute("/prometheus"+native.PromReadURL, promqlQueryHandler, native.PromReadHTTPMethods...) + addWrappedRoute("/prometheus"+native.PromReadInstantURL, promqlInstantQueryHandler, native.PromReadInstantHTTPMethods...) + + addWrappedRoute(remote.PromReadURL, promRemoteReadHandler, remote.PromReadHTTPMethods...) + addRoute(remote.PromWriteURL, panicOnly(promRemoteWriteHandler), remote.PromWriteHTTPMethod) + + addWrappedRoute("/m3query"+native.PromReadURL, nativePromReadHandler, native.PromReadHTTPMethods...) + addWrappedRoute("/m3query"+native.PromReadInstantURL, nativePromReadInstantHandler, native.PromReadInstantHTTPMethods...) // InfluxDB write endpoint. - h.router.HandleFunc(influxdb.InfluxWriteURL, - wrapped(influxdb.NewInfluxWriterHandler(h.options)).ServeHTTP).Methods(influxdb.InfluxWriteHTTPMethod) + addWrappedRoute(influxdb.InfluxWriteURL, influxdb.NewInfluxWriterHandler(h.options), influxdb.InfluxWriteHTTPMethod) // Native M3 search and write endpoints. - h.router.HandleFunc(handler.SearchURL, - wrapped(handler.NewSearchHandler(h.options)).ServeHTTP, - ).Methods(handler.SearchHTTPMethod) - h.router.HandleFunc(m3json.WriteJSONURL, - wrapped(m3json.NewWriteJSONHandler(h.options)).ServeHTTP, - ).Methods(m3json.JSONWriteHTTPMethod) + addWrappedRoute(handler.SearchURL, handler.NewSearchHandler(h.options), handler.SearchHTTPMethod) + addWrappedRoute(m3json.WriteJSONURL, m3json.NewWriteJSONHandler(h.options), m3json.JSONWriteHTTPMethod) // Tag completion endpoints. - h.router.HandleFunc(native.CompleteTagsURL, - wrapped(native.NewCompleteTagsHandler(h.options)).ServeHTTP, - ).Methods(native.CompleteTagsHTTPMethod) - h.router.HandleFunc(remote.TagValuesURL, - wrapped(remote.NewTagValuesHandler(h.options)).ServeHTTP, - ).Methods(remote.TagValuesHTTPMethod) + addWrappedRoute(native.CompleteTagsURL, native.NewCompleteTagsHandler(h.options), native.CompleteTagsHTTPMethod) + addWrappedRoute(remote.TagValuesURL, remote.NewTagValuesHandler(h.options), remote.TagValuesHTTPMethod) // List tag endpoints. - h.router.HandleFunc(native.ListTagsURL, - wrapped(native.NewListTagsHandler(h.options)).ServeHTTP, - ).Methods(native.ListTagsHTTPMethods...) + addWrappedRoute(native.ListTagsURL, native.NewListTagsHandler(h.options), native.ListTagsHTTPMethods...) // Query parse endpoints. - h.router.HandleFunc(native.PromParseURL, - wrapped(native.NewPromParseHandler(h.options)).ServeHTTP, - ).Methods(native.PromParseHTTPMethod) - h.router.HandleFunc(native.PromThresholdURL, - wrapped(native.NewPromThresholdHandler(h.options)).ServeHTTP, - ).Methods(native.PromThresholdHTTPMethod) + addWrappedRoute(native.PromParseURL, native.NewPromParseHandler(h.options), native.PromParseHTTPMethod) + addWrappedRoute(native.PromThresholdURL, native.NewPromThresholdHandler(h.options), native.PromThresholdHTTPMethod) // Series match endpoints. - h.router.HandleFunc(remote.PromSeriesMatchURL, - wrapped(remote.NewPromSeriesMatchHandler(h.options)).ServeHTTP, - ).Methods(remote.PromSeriesMatchHTTPMethods...) + addWrappedRoute(remote.PromSeriesMatchURL, remote.NewPromSeriesMatchHandler(h.options), remote.PromSeriesMatchHTTPMethods...) // Graphite endpoints. - h.router.HandleFunc(graphite.ReadURL, - wrapped(graphite.NewRenderHandler(h.options)).ServeHTTP, - ).Methods(graphite.ReadHTTPMethods...) - - h.router.HandleFunc(graphite.FindURL, - wrapped(graphite.NewFindHandler(h.options)).ServeHTTP, - ).Methods(graphite.FindHTTPMethods...) + addWrappedRoute(graphite.ReadURL, graphite.NewRenderHandler(h.options), graphite.ReadHTTPMethods...) + addWrappedRoute(graphite.FindURL, graphite.NewFindHandler(h.options), graphite.FindHTTPMethods...) placementOpts, err := h.placementOpts() if err != nil { @@ -293,21 +263,18 @@ func (h *Handler) RegisterRoutes() error { } // Register debug dump handler. - h.router.HandleFunc(xdebug.DebugURL, - wrapped(debugWriter.HTTPHandler()).ServeHTTP) + addWrappedRoute(xdebug.DebugURL, debugWriter.HTTPHandler()) if clusterClient != nil { - err = database.RegisterRoutes(h.router, clusterClient, + err = database.RegisterRoutes(addWrappedRoute, clusterClient, h.options.Config(), h.options.EmbeddedDbCfg(), serviceOptionDefaults, instrumentOpts) if err != nil { return err } - - placement.RegisterRoutes(h.router, - serviceOptionDefaults, placementOpts) - namespace.RegisterRoutes(h.router, clusterClient, h.options.Clusters(), serviceOptionDefaults, instrumentOpts) - topic.RegisterRoutes(h.router, clusterClient, config, instrumentOpts) + placement.RegisterRoutes(addRoute, serviceOptionDefaults, placementOpts) + namespace.RegisterRoutes(addWrappedRoute, clusterClient, h.options.Clusters(), serviceOptionDefaults, instrumentOpts) + topic.RegisterRoutes(addWrappedRoute, clusterClient, config, instrumentOpts) // Experimental endpoints. if config.Experimental.Enabled { @@ -318,9 +285,7 @@ func (h *Handler) RegisterRoutes() error { Tagged(remoteSource). Tagged(experimentalAPIGroup), ) - h.router.HandleFunc(annotated.WriteURL, - wrapped(experimentalAnnotatedWriteHandler).ServeHTTP, - ).Methods(annotated.WriteHTTPMethod) + addWrappedRoute(annotated.WriteURL, experimentalAnnotatedWriteHandler, annotated.WriteHTTPMethod) } } @@ -328,9 +293,48 @@ func (h *Handler) RegisterRoutes() error { h.registerProfileEndpoints() h.registerRoutesEndpoint() + // Register custom endpoints. + for _, custom := range h.customHandlers { + route := h.router.Get(custom.Route()) + var prevHandler http.Handler + if route != nil { + prevHandler = route.GetHandler() + } + customHandler, err := custom.Handler(nativeSourceOpts, prevHandler) + if err != nil { + return err + } + + if route == nil { + addWrappedRoute(custom.Route(), customHandler, custom.Methods()...) + } else { + route.Handler(wrapped(customHandler)) + } + } + return nil } +func addRoute(router *mux.Router, path string, handler http.Handler, methods ...string) *mux.Route { + return addRouteHandlerFn(router, path, handler.ServeHTTP, methods...) +} + +func addRouteHandlerFn(router *mux.Router, path string, handlerFn func(http.ResponseWriter, *http.Request), methods ...string) *mux.Route { + if existingRoute := router.Get(path); existingRoute != nil { + return existingRoute + } + + route := router. + HandleFunc(path, handlerFn). + Name(path) + + if len(methods) > 0 { + return route.Methods(methods...) + } + + return route +} + func (h *Handler) placementOpts() (placement.HandlerOptions, error) { return placement.NewHandlerOptions( h.options.ClusterClient(), @@ -365,23 +369,23 @@ func (h *Handler) m3AggServiceOptions() *handleroptions.M3AggServiceOptions { // Endpoints useful for profiling the service. func (h *Handler) registerHealthEndpoints() { - h.router.HandleFunc(healthURL, func(w http.ResponseWriter, r *http.Request) { + addRouteHandlerFn(h.router, healthURL, func(w http.ResponseWriter, r *http.Request) { json.NewEncoder(w).Encode(struct { Uptime string `json:"uptime"` }{ Uptime: time.Since(h.options.CreatedAt()).String(), }) - }).Methods(http.MethodGet) + }, http.MethodGet) } // Endpoints useful for profiling the service. func (h *Handler) registerProfileEndpoints() { - h.router.PathPrefix("/debug/pprof/").Handler(http.DefaultServeMux) + h.router.PathPrefix("/debug/pprof/").Handler(http.DefaultServeMux).Name("/debug/pprof/") } // Endpoints useful for viewing routes directory. func (h *Handler) registerRoutesEndpoint() { - h.router.HandleFunc(routesURL, func(w http.ResponseWriter, r *http.Request) { + addRouteHandlerFn(h.router, routesURL, func(w http.ResponseWriter, r *http.Request) { var routes []string err := h.router.Walk( func(route *mux.Route, router *mux.Router, ancestors []*mux.Route) error { @@ -401,5 +405,5 @@ func (h *Handler) registerRoutesEndpoint() { }{ Routes: routes, }) - }).Methods(http.MethodGet) + }, http.MethodGet) } diff --git a/src/query/api/v1/httpd/handler_test.go b/src/query/api/v1/httpd/handler_test.go index f0e64b3c65..374b915709 100644 --- a/src/query/api/v1/httpd/handler_test.go +++ b/src/query/api/v1/httpd/handler_test.go @@ -385,9 +385,34 @@ func (h *customHandler) Route() string { return "/custom" } func (h *customHandler) Methods() []string { return []string{http.MethodGet} } func (h *customHandler) Handler( opts options.HandlerOptions, + prev http.Handler, ) (http.Handler, error) { assert.Equal(h.t, "z", string(opts.TagOptions().MetricName())) fn := func(w http.ResponseWriter, r *http.Request) { + if prev != nil { + assert.Fail(h.t, "Should not shadow already existing handler") + } + w.Write([]byte("success!")) + } + + return http.HandlerFunc(fn), nil +} + +type customHandlerOverride struct { + t *testing.T +} + +func (h *customHandlerOverride) Route() string { return "/custom" } +func (h *customHandlerOverride) Methods() []string { return []string{http.MethodGet} } +func (h *customHandlerOverride) Handler( + opts options.HandlerOptions, + prev http.Handler, +) (http.Handler, error) { + assert.Equal(h.t, "z", string(opts.TagOptions().MetricName())) + fn := func(w http.ResponseWriter, r *http.Request) { + if prev == nil { + assert.Fail(h.t, "Should shadow already existing handler") + } w.Write([]byte("success!")) } @@ -412,7 +437,8 @@ func TestCustomRoutes(t *testing.T) { require.NoError(t, err) custom := &customHandler{t: t} - handler := NewHandler(opts, custom) + custom2 := &customHandlerOverride{t: t} + handler := NewHandler(opts, custom, custom2) require.NoError(t, err, "unable to setup handler") err = handler.RegisterRoutes() require.NoError(t, err, "unable to register routes") diff --git a/src/query/api/v1/options/handler.go b/src/query/api/v1/options/handler.go index 15eab3f99f..0a5735cf19 100644 --- a/src/query/api/v1/options/handler.go +++ b/src/query/api/v1/options/handler.go @@ -70,8 +70,9 @@ type CustomHandler interface { Route() string // Methods is the list of http methods this handler services. Methods() []string - // Handler is the custom handler itself. - Handler(handlerOptions HandlerOptions) (http.Handler, error) + // Handler is the custom handler itself. prev is optional argument for getting already registered handler for the same route. + // If there is nothing to override, prev will be nil. + Handler(handlerOptions HandlerOptions, prev http.Handler) (http.Handler, error) } // QueryRouter is responsible for routing queries between promql and m3query.