diff --git a/src/cmd/services/m3query/config/config.go b/src/cmd/services/m3query/config/config.go index 8c66ac4166..7eade83192 100644 --- a/src/cmd/services/m3query/config/config.go +++ b/src/cmd/services/m3query/config/config.go @@ -31,6 +31,7 @@ import ( "github.com/m3db/m3/src/cmd/services/m3coordinator/server/m3msg" "github.com/m3db/m3/src/metrics/aggregation" "github.com/m3db/m3/src/query/api/v1/handler" + "github.com/m3db/m3/src/query/api/v1/handler/prometheus/remote" "github.com/m3db/m3/src/query/graphite/graphite" "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/storage" @@ -112,6 +113,9 @@ type Configuration struct { // WriteWorkerPool is the worker pool policy for write requests. WriteWorkerPool xconfig.WorkerPoolPolicy `yaml:"writeWorkerPoolPolicy"` + // WriteForwarding is the write forwarding options. + WriteForwarding WriteForwardingConfiguration `yaml:"writeForwarding"` + // Downsample configurates how the metrics should be downsampled. Downsample downsample.Configuration `yaml:"downsample"` @@ -140,6 +144,11 @@ type Configuration struct { DeprecatedCache CacheConfiguration `yaml:"cache"` } +// WriteForwardingConfiguration is the write forwarding configuration. +type WriteForwardingConfiguration struct { + PromRemoteWrite remote.PromWriteHandlerForwardingOptions `yaml:"promRemoteWrite"` +} + // Filter is a query filter type. type Filter string diff --git a/src/query/api/v1/handler/prometheus/common.go b/src/query/api/v1/handler/prometheus/common.go index 756cda04e8..04df3901e1 100644 --- a/src/query/api/v1/handler/prometheus/common.go +++ b/src/query/api/v1/handler/prometheus/common.go @@ -57,30 +57,46 @@ type TimeoutOpts struct { FetchTimeout time.Duration } +// ParsePromCompressedRequestResult is the result of a +// ParsePromCompressedRequest call. +type ParsePromCompressedRequestResult struct { + CompressedBody []byte + UncompressedBody []byte +} + // ParsePromCompressedRequest parses a snappy compressed request from Prometheus. -func ParsePromCompressedRequest(r *http.Request) ([]byte, *xhttp.ParseError) { +func ParsePromCompressedRequest( + r *http.Request, +) (ParsePromCompressedRequestResult, *xhttp.ParseError) { body := r.Body if r.Body == nil { err := fmt.Errorf("empty request body") - return nil, xhttp.NewParseError(err, http.StatusBadRequest) + return ParsePromCompressedRequestResult{}, + xhttp.NewParseError(err, http.StatusBadRequest) } defer body.Close() compressed, err := ioutil.ReadAll(body) if err != nil { - return nil, xhttp.NewParseError(err, http.StatusInternalServerError) + return ParsePromCompressedRequestResult{}, + xhttp.NewParseError(err, http.StatusInternalServerError) } if len(compressed) == 0 { - return nil, xhttp.NewParseError(fmt.Errorf("empty request body"), http.StatusBadRequest) + return ParsePromCompressedRequestResult{}, + xhttp.NewParseError(fmt.Errorf("empty request body"), http.StatusBadRequest) } reqBuf, err := snappy.Decode(nil, compressed) if err != nil { - return nil, xhttp.NewParseError(err, http.StatusBadRequest) + return ParsePromCompressedRequestResult{}, + xhttp.NewParseError(err, http.StatusBadRequest) } - return reqBuf, nil + return ParsePromCompressedRequestResult{ + CompressedBody: compressed, + UncompressedBody: reqBuf, + }, nil } // ParseRequestTimeout parses the input request timeout with a default. diff --git a/src/query/api/v1/handler/prometheus/remote/read.go b/src/query/api/v1/handler/prometheus/remote/read.go index 2f491fe706..8c1aa2266e 100644 --- a/src/query/api/v1/handler/prometheus/remote/read.go +++ b/src/query/api/v1/handler/prometheus/remote/read.go @@ -159,13 +159,13 @@ func (h *PromReadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (h *PromReadHandler) parseRequest( r *http.Request, ) (*prompb.ReadRequest, *xhttp.ParseError) { - reqBuf, err := prometheus.ParsePromCompressedRequest(r) + result, err := prometheus.ParsePromCompressedRequest(r) if err != nil { return nil, err } var req prompb.ReadRequest - if err := proto.Unmarshal(reqBuf, &req); err != nil { + if err := proto.Unmarshal(result.UncompressedBody, &req); err != nil { return nil, xhttp.NewParseError(err, http.StatusBadRequest) } diff --git a/src/query/api/v1/handler/prometheus/remote/write.go b/src/query/api/v1/handler/prometheus/remote/write.go index 6111b50bec..650cc35cca 100644 --- a/src/query/api/v1/handler/prometheus/remote/write.go +++ b/src/query/api/v1/handler/prometheus/remote/write.go @@ -21,6 +21,7 @@ package remote import ( + "bytes" "context" "errors" "fmt" @@ -42,6 +43,7 @@ import ( xerrors "github.com/m3db/m3/src/x/errors" "github.com/m3db/m3/src/x/instrument" xhttp "github.com/m3db/m3/src/x/net/http" + xsync "github.com/m3db/m3/src/x/sync" xtime "github.com/m3db/m3/src/x/time" "github.com/golang/protobuf/proto" @@ -58,6 +60,9 @@ const ( // emptyStoragePolicyVar for code readability. emptyStoragePolicyVar = "" + + // defaultForwardingTimeout is the default forwarding timeout. + defaultForwardingTimeout = 15 * time.Second ) var ( @@ -69,17 +74,39 @@ var ( // PromWriteHandler represents a handler for prometheus write endpoint. type PromWriteHandler struct { - downsamplerAndWriter ingest.DownsamplerAndWriter - tagOptions models.TagOptions - nowFn clock.NowFn - instrumentOpts instrument.Options - metrics promWriteMetrics + downsamplerAndWriter ingest.DownsamplerAndWriter + tagOptions models.TagOptions + forwarding PromWriteHandlerForwardingOptions + forwardTimeout time.Duration + forwardHTTPClient *http.Client + forwardingBoundWorkers xsync.WorkerPool + forwardContext context.Context + nowFn clock.NowFn + instrumentOpts instrument.Options + metrics promWriteMetrics +} + +// PromWriteHandlerForwardingOptions is the forwarding options for prometheus write handler. +type PromWriteHandlerForwardingOptions struct { + // MaxConcurrency is the max parallel forwarding and if zero will be unlimited. + MaxConcurrency int `yaml:"maxConcurrency"` + Timeout time.Duration `yaml:"timeout"` + Targets []PromWriteHandlerForwardTargetOptions `yaml:"targets"` +} + +// PromWriteHandlerForwardTargetOptions is a prometheus write handler forwarder target. +type PromWriteHandlerForwardTargetOptions struct { + // URL of the target to send to. + URL string `yaml:"url"` + // Method defaults to POST if not set. + Method string `yaml:"method"` } // NewPromWriteHandler returns a new instance of handler. func NewPromWriteHandler( downsamplerAndWriter ingest.DownsamplerAndWriter, tagOptions models.TagOptions, + forwarding PromWriteHandlerForwardingOptions, nowFn clock.NowFn, instrumentOpts instrument.Options, ) (http.Handler, error) { @@ -98,12 +125,34 @@ func NewPromWriteHandler( return nil, err } + // Only use a forwarding worker pool if concurrency is bound, otherwise + // if unlimited we just spin up a goroutine for each incoming write. + var forwardingBoundWorkers xsync.WorkerPool + if v := forwarding.MaxConcurrency; v > 0 { + forwardingBoundWorkers = xsync.NewWorkerPool(v) + forwardingBoundWorkers.Init() + } + + forwardTimeout := defaultForwardingTimeout + if v := forwarding.Timeout; v > 0 { + forwardTimeout = v + } + + forwardHTTPOpts := xhttp.DefaultHTTPClientOptions() + forwardHTTPOpts.DisableCompression = true // Already snappy compressed. + forwardHTTPOpts.RequestTimeout = forwardTimeout + return &PromWriteHandler{ - downsamplerAndWriter: downsamplerAndWriter, - tagOptions: tagOptions, - nowFn: nowFn, - metrics: metrics, - instrumentOpts: instrumentOpts, + downsamplerAndWriter: downsamplerAndWriter, + tagOptions: tagOptions, + forwarding: forwarding, + forwardTimeout: forwardTimeout, + forwardHTTPClient: xhttp.NewHTTPClient(forwardHTTPOpts), + forwardingBoundWorkers: forwardingBoundWorkers, + forwardContext: context.Background(), + nowFn: nowFn, + metrics: metrics, + instrumentOpts: instrumentOpts, }, nil } @@ -113,6 +162,10 @@ type promWriteMetrics struct { writeErrorsClient tally.Counter ingestLatency tally.Histogram ingestLatencyBuckets tally.DurationBuckets + forwardSuccess tally.Counter + forwardErrors tally.Counter + forwardDropped tally.Counter + forwardLatency tally.Histogram } func newPromWriteMetrics(scope tally.Scope) (promWriteMetrics, error) { @@ -155,23 +208,68 @@ func newPromWriteMetrics(scope tally.Scope) (promWriteMetrics, error) { ingestLatencyBuckets = append(ingestLatencyBuckets, upTo60mBuckets...) ingestLatencyBuckets = append(ingestLatencyBuckets, upTo6hBuckets...) ingestLatencyBuckets = append(ingestLatencyBuckets, upTo24hBuckets...) + + var forwardLatencyBuckets tally.DurationBuckets + forwardLatencyBuckets = append(forwardLatencyBuckets, upTo1sBuckets...) + forwardLatencyBuckets = append(forwardLatencyBuckets, upTo10sBuckets...) + forwardLatencyBuckets = append(forwardLatencyBuckets, upTo60sBuckets...) + forwardLatencyBuckets = append(forwardLatencyBuckets, upTo60mBuckets...) return promWriteMetrics{ writeSuccess: scope.SubScope("write").Counter("success"), writeErrorsServer: scope.SubScope("write").Tagged(map[string]string{"code": "5XX"}).Counter("errors"), writeErrorsClient: scope.SubScope("write").Tagged(map[string]string{"code": "4XX"}).Counter("errors"), ingestLatency: scope.SubScope("ingest").Histogram("latency", ingestLatencyBuckets), ingestLatencyBuckets: ingestLatencyBuckets, + forwardSuccess: scope.SubScope("forward").Counter("success"), + forwardErrors: scope.SubScope("forward").Counter("errors"), + forwardDropped: scope.SubScope("forward").Counter("dropped"), + forwardLatency: scope.SubScope("forward").Histogram("latency", forwardLatencyBuckets), }, nil } func (h *PromWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - req, opts, rErr := h.parseRequest(r) + req, opts, result, rErr := h.parseRequest(r) if rErr != nil { h.metrics.writeErrorsClient.Inc(1) xhttp.Error(w, rErr.Inner(), rErr.Code()) return } + // Begin async forwarding. + // NB(r): Be careful about not returning buffers to pool + // if the request bodies ever get pooled until after + // forwarding completes. + if targets := h.forwarding.Targets; len(targets) > 0 { + for _, target := range targets { + target := target // Capture for lambda. + forward := func() { + // Consider propgating baggage without tying + // context to request context in future. + ctx, cancel := context.WithTimeout(h.forwardContext, h.forwardTimeout) + defer cancel() + + if err := h.forward(ctx, result, target); err != nil { + h.metrics.forwardErrors.Inc(1) + logger := logging.WithContext(h.forwardContext, h.instrumentOpts) + logger.Error("forward error", zap.Error(err)) + return + } + h.metrics.forwardSuccess.Inc(1) + } + + spawned := false + if h.forwarding.MaxConcurrency > 0 { + spawned = h.forwardingBoundWorkers.GoIfAvailable(forward) + } else { + go forward() + spawned = true + } + if !spawned { + h.metrics.forwardDropped.Inc(1) + } + } + } + batchErr := h.write(r.Context(), req, opts) // Record ingestion delay latency @@ -246,7 +344,7 @@ func (h *PromWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (h *PromWriteHandler) parseRequest( r *http.Request, -) (*prompb.WriteRequest, ingest.WriteOptions, *xhttp.ParseError) { +) (*prompb.WriteRequest, ingest.WriteOptions, prometheus.ParsePromCompressedRequestResult, *xhttp.ParseError) { var opts ingest.WriteOptions if v := strings.TrimSpace(r.Header.Get(handler.MetricsTypeHeader)); v != "" { // Allow the metrics type and storage policies to override @@ -254,6 +352,7 @@ func (h *PromWriteHandler) parseRequest( metricsType, err := storage.ParseMetricsType(v) if err != nil { return nil, ingest.WriteOptions{}, + prometheus.ParsePromCompressedRequestResult{}, xhttp.NewParseError(err, http.StatusBadRequest) } @@ -269,6 +368,7 @@ func (h *PromWriteHandler) parseRequest( if strPolicy != emptyStoragePolicyVar { err := errUnaggregatedStoragePolicySet return nil, ingest.WriteOptions{}, + prometheus.ParsePromCompressedRequestResult{}, xhttp.NewParseError(err, http.StatusBadRequest) } default: @@ -276,6 +376,7 @@ func (h *PromWriteHandler) parseRequest( if err != nil { err = fmt.Errorf("could not parse storage policy: %v", err) return nil, ingest.WriteOptions{}, + prometheus.ParsePromCompressedRequestResult{}, xhttp.NewParseError(err, http.StatusBadRequest) } @@ -287,18 +388,20 @@ func (h *PromWriteHandler) parseRequest( } } - reqBuf, err := prometheus.ParsePromCompressedRequest(r) + result, err := prometheus.ParsePromCompressedRequest(r) if err != nil { - return nil, ingest.WriteOptions{}, err + return nil, ingest.WriteOptions{}, + prometheus.ParsePromCompressedRequestResult{}, err } var req prompb.WriteRequest - if err := proto.Unmarshal(reqBuf, &req); err != nil { + if err := proto.Unmarshal(result.UncompressedBody, &req); err != nil { return nil, ingest.WriteOptions{}, + prometheus.ParsePromCompressedRequestResult{}, xhttp.NewParseError(err, http.StatusBadRequest) } - return &req, opts, nil + return &req, opts, result, nil } func (h *PromWriteHandler) write( @@ -310,6 +413,32 @@ func (h *PromWriteHandler) write( return h.downsamplerAndWriter.WriteBatch(ctx, iter, opts) } +func (h *PromWriteHandler) forward( + ctx context.Context, + request prometheus.ParsePromCompressedRequestResult, + target PromWriteHandlerForwardTargetOptions, +) error { + method := target.Method + if method == "" { + method = http.MethodPost + } + + req, err := http.NewRequest(target.Method, target.URL, + bytes.NewReader(request.CompressedBody)) + if err != nil { + return err + } + + resp, err := h.forwardHTTPClient.Do(req.WithContext(ctx)) + if err != nil { + return err + } + if resp.StatusCode/100 != 2 { + return fmt.Errorf("expected status code 2XX: actual=%v", resp.StatusCode) + } + return nil +} + func newPromTSIter(timeseries []*prompb.TimeSeries, tagOpts models.TagOptions) *promTSIter { // Construct the tags and datapoints upfront so that if the iterator // is reset, we don't have to generate them twice. diff --git a/src/query/api/v1/handler/prometheus/remote/write_test.go b/src/query/api/v1/handler/prometheus/remote/write_test.go index 4e4f0b9a49..888954b598 100644 --- a/src/query/api/v1/handler/prometheus/remote/write_test.go +++ b/src/query/api/v1/handler/prometheus/remote/write_test.go @@ -52,14 +52,15 @@ func TestPromWriteParsing(t *testing.T) { mockDownsamplerAndWriter := ingest.NewMockDownsamplerAndWriter(ctrl) handler, err := NewPromWriteHandler(mockDownsamplerAndWriter, - models.NewTagOptions(), time.Now, instrument.NewOptions()) + models.NewTagOptions(), PromWriteHandlerForwardingOptions{}, + time.Now, instrument.NewOptions()) require.NoError(t, err) promReq := test.GeneratePromWriteRequest() promReqBody := test.GeneratePromWriteRequestBody(t, promReq) req := httptest.NewRequest(PromWriteHTTPMethod, PromWriteURL, promReqBody) - r, opts, err := handler.(*PromWriteHandler).parseRequest(req) + r, opts, _, err := handler.(*PromWriteHandler).parseRequest(req) require.Nil(t, err, "unable to parse request") require.Equal(t, len(r.Timeseries), 2) require.Equal(t, ingest.WriteOptions{}, opts) @@ -75,7 +76,8 @@ func TestPromWrite(t *testing.T) { WriteBatch(gomock.Any(), gomock.Any(), gomock.Any()) handler, err := NewPromWriteHandler(mockDownsamplerAndWriter, - models.NewTagOptions(), time.Now, instrument.NewOptions()) + models.NewTagOptions(), PromWriteHandlerForwardingOptions{}, + time.Now, instrument.NewOptions()) require.NoError(t, err) promReq := test.GeneratePromWriteRequest() @@ -101,7 +103,8 @@ func TestPromWriteError(t *testing.T) { Return(batchErr) handler, err := NewPromWriteHandler(mockDownsamplerAndWriter, - models.NewTagOptions(), time.Now, instrument.NewOptions()) + models.NewTagOptions(), PromWriteHandlerForwardingOptions{}, + time.Now, instrument.NewOptions()) require.NoError(t, err) promReq := test.GeneratePromWriteRequest() @@ -132,7 +135,8 @@ func TestWriteErrorMetricCount(t *testing.T) { SetMetricsScope(scope) handler, err := NewPromWriteHandler(mockDownsamplerAndWriter, - models.NewTagOptions(), time.Now, iopts) + models.NewTagOptions(), PromWriteHandlerForwardingOptions{}, + time.Now, iopts) require.NoError(t, err) req := httptest.NewRequest(PromWriteHTTPMethod, PromWriteURL, nil) @@ -158,7 +162,8 @@ func TestWriteDatapointDelayMetric(t *testing.T) { map[string]string{"test": "delay-metric-test"}) handler, err := NewPromWriteHandler(mockDownsamplerAndWriter, - models.NewTagOptions(), time.Now, instrument.NewOptions().SetMetricsScope(scope)) + models.NewTagOptions(), PromWriteHandlerForwardingOptions{}, + time.Now, instrument.NewOptions().SetMetricsScope(scope)) require.NoError(t, err) writeHandler, ok := handler.(*PromWriteHandler) @@ -219,7 +224,8 @@ func TestPromWriteUnaggregatedMetricsWithHeader(t *testing.T) { WriteBatch(gomock.Any(), gomock.Any(), expectedIngestWriteOptions) writeHandler, err := NewPromWriteHandler(mockDownsamplerAndWriter, - models.NewTagOptions(), time.Now, instrument.NewOptions()) + models.NewTagOptions(), PromWriteHandlerForwardingOptions{}, + time.Now, instrument.NewOptions()) require.NoError(t, err) promReq := test.GeneratePromWriteRequest() @@ -253,7 +259,8 @@ func TestPromWriteAggregatedMetricsWithHeader(t *testing.T) { WriteBatch(gomock.Any(), gomock.Any(), expectedIngestWriteOptions) writeHandler, err := NewPromWriteHandler(mockDownsamplerAndWriter, - models.NewTagOptions(), time.Now, instrument.NewOptions()) + models.NewTagOptions(), PromWriteHandlerForwardingOptions{}, + time.Now, instrument.NewOptions()) require.NoError(t, err) promReq := test.GeneratePromWriteRequest() diff --git a/src/query/api/v1/httpd/handler.go b/src/query/api/v1/httpd/handler.go index f9218c78ff..fe2275c643 100644 --- a/src/query/api/v1/httpd/handler.go +++ b/src/query/api/v1/httpd/handler.go @@ -195,7 +195,7 @@ func (h *Handler) RegisterRoutes() error { promRemoteReadHandler := remote.NewPromReadHandler(h.engine, h.fetchOptionsBuilder, h.timeoutOpts, keepNans, remoteSourceInstrumentOpts) promRemoteWriteHandler, err := remote.NewPromWriteHandler(h.downsamplerAndWriter, - h.tagOptions, nowFn, remoteSourceInstrumentOpts) + h.tagOptions, h.config.WriteForwarding.PromRemoteWrite, nowFn, remoteSourceInstrumentOpts) if err != nil { return err } diff --git a/src/x/net/http/client.go b/src/x/net/http/client.go new file mode 100644 index 0000000000..246aad0e92 --- /dev/null +++ b/src/x/net/http/client.go @@ -0,0 +1,71 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package xhttp + +import ( + "net" + "net/http" + "time" +) + +// HTTPClientOptions specify HTTP Client options. +type HTTPClientOptions struct { + RequestTimeout time.Duration `yaml:"requestTimeout"` + ConnectTimeout time.Duration `yaml:"connectTimeout"` + KeepAlive time.Duration `yaml:"keepAlive"` + IdleConnTimeout time.Duration `yaml:"idleConnTimeout"` + MaxIdleConns int `yaml:"maxIdleConns"` + DisableCompression bool `yaml:"disableCompression"` +} + +// NewHTTPClient constructs a new HTTP Client. +func NewHTTPClient(o HTTPClientOptions) *http.Client { + return &http.Client{ + Timeout: o.RequestTimeout, + Transport: &http.Transport{ + Proxy: http.ProxyFromEnvironment, + Dial: (&net.Dialer{ + Timeout: o.ConnectTimeout, + KeepAlive: o.KeepAlive, + DualStack: true, + }).Dial, + TLSHandshakeTimeout: o.ConnectTimeout, + ExpectContinueTimeout: o.ConnectTimeout, + MaxIdleConns: o.MaxIdleConns, + MaxIdleConnsPerHost: o.MaxIdleConns, + DisableCompression: o.DisableCompression, + }, + } +} + +// DefaultHTTPClientOptions returns default options. +func DefaultHTTPClientOptions() HTTPClientOptions { + return HTTPClientOptions{ + RequestTimeout: 60 * time.Second, + ConnectTimeout: 5 * time.Second, + KeepAlive: 60 * time.Second, + IdleConnTimeout: 60 * time.Second, + MaxIdleConns: 100, + // DisableCompression is true by default since we have seen + // a large amount of overhead with compression. + DisableCompression: true, + } +}