[coordinator] Add Prometheus write request forwarding for basic replication (#1922)
robskillington authored Sep 11, 2019
1 parent db4dc03 commit fea1d79
Showing 7 changed files with 265 additions and 33 deletions.
9 changes: 9 additions & 0 deletions src/cmd/services/m3query/config/config.go
@@ -31,6 +31,7 @@ import (
"github.com/m3db/m3/src/cmd/services/m3coordinator/server/m3msg"
"github.com/m3db/m3/src/metrics/aggregation"
"github.com/m3db/m3/src/query/api/v1/handler"
"github.com/m3db/m3/src/query/api/v1/handler/prometheus/remote"
"github.com/m3db/m3/src/query/graphite/graphite"
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/storage"
@@ -112,6 +113,9 @@ type Configuration struct {
// WriteWorkerPool is the worker pool policy for write requests.
WriteWorkerPool xconfig.WorkerPoolPolicy `yaml:"writeWorkerPoolPolicy"`

// WriteForwarding is the write forwarding options.
WriteForwarding WriteForwardingConfiguration `yaml:"writeForwarding"`

// Downsample configures how the metrics should be downsampled.
Downsample downsample.Configuration `yaml:"downsample"`

@@ -140,6 +144,11 @@ type Configuration struct {
DeprecatedCache CacheConfiguration `yaml:"cache"`
}

// WriteForwardingConfiguration is the write forwarding configuration.
type WriteForwardingConfiguration struct {
PromRemoteWrite remote.PromWriteHandlerForwardingOptions `yaml:"promRemoteWrite"`
}
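For illustration, a minimal sketch (not part of this commit) of populating these options programmatically; the target URL and values are hypothetical, and in the coordinator's YAML config the same settings would sit under writeForwarding.promRemoteWrite per the struct tags above.

```go
package main

import (
	"fmt"
	"time"

	"github.com/m3db/m3/src/query/api/v1/handler/prometheus/remote"
)

func main() {
	// Hypothetical replica coordinator; any Prometheus remote-write
	// compatible receiver can be a forwarding target.
	forwarding := remote.PromWriteHandlerForwardingOptions{
		MaxConcurrency: 8,
		Timeout:        10 * time.Second,
		Targets: []remote.PromWriteHandlerForwardTargetOptions{
			{URL: "http://replica-coordinator:7201/api/v1/prom/remote/write"},
		},
	}
	fmt.Printf("forwarding to %d target(s)\n", len(forwarding.Targets))
}
```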

// Filter is a query filter type.
type Filter string

28 changes: 22 additions & 6 deletions src/query/api/v1/handler/prometheus/common.go
@@ -57,30 +57,46 @@ type TimeoutOpts struct {
FetchTimeout time.Duration
}

// ParsePromCompressedRequestResult is the result of a
// ParsePromCompressedRequest call.
type ParsePromCompressedRequestResult struct {
CompressedBody []byte
UncompressedBody []byte
}

// ParsePromCompressedRequest parses a snappy compressed request from Prometheus.
func ParsePromCompressedRequest(r *http.Request) ([]byte, *xhttp.ParseError) {
func ParsePromCompressedRequest(
r *http.Request,
) (ParsePromCompressedRequestResult, *xhttp.ParseError) {
body := r.Body
if r.Body == nil {
err := fmt.Errorf("empty request body")
return nil, xhttp.NewParseError(err, http.StatusBadRequest)
return ParsePromCompressedRequestResult{},
xhttp.NewParseError(err, http.StatusBadRequest)
}
defer body.Close()
compressed, err := ioutil.ReadAll(body)

if err != nil {
return nil, xhttp.NewParseError(err, http.StatusInternalServerError)
return ParsePromCompressedRequestResult{},
xhttp.NewParseError(err, http.StatusInternalServerError)
}

if len(compressed) == 0 {
return nil, xhttp.NewParseError(fmt.Errorf("empty request body"), http.StatusBadRequest)
return ParsePromCompressedRequestResult{},
xhttp.NewParseError(fmt.Errorf("empty request body"), http.StatusBadRequest)
}

reqBuf, err := snappy.Decode(nil, compressed)
if err != nil {
return nil, xhttp.NewParseError(err, http.StatusBadRequest)
return ParsePromCompressedRequestResult{},
xhttp.NewParseError(err, http.StatusBadRequest)
}

return reqBuf, nil
return ParsePromCompressedRequestResult{
CompressedBody: compressed,
UncompressedBody: reqBuf,
}, nil
}
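As a hedged sketch (not part of this commit), the round trip the parser expects: snappy-compress a payload the way Prometheus remote write does, parse it, and get back both forms — the compressed bytes a caller can forward verbatim and the decompressed bytes it would unmarshal into a prompb.WriteRequest. The placeholder payload stands in for real serialized WriteRequest bytes.

```go
package main

import (
	"bytes"
	"fmt"
	"net/http/httptest"

	"github.com/golang/snappy"

	"github.com/m3db/m3/src/query/api/v1/handler/prometheus"
)

func main() {
	// Placeholder for a serialized prompb.WriteRequest; the parser only
	// snappy-decodes, so any non-empty payload exercises it.
	payload := []byte("serialized prompb.WriteRequest bytes")
	compressed := snappy.Encode(nil, payload)

	req := httptest.NewRequest("POST", "/api/v1/prom/remote/write",
		bytes.NewReader(compressed))

	result, parseErr := prometheus.ParsePromCompressedRequest(req)
	if parseErr != nil {
		panic(parseErr)
	}

	// CompressedBody can be re-sent to forwarding targets as-is;
	// UncompressedBody is what callers proto-unmarshal.
	fmt.Println(len(result.CompressedBody), len(result.UncompressedBody))
}
```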

// ParseRequestTimeout parses the input request timeout with a default.
4 changes: 2 additions & 2 deletions src/query/api/v1/handler/prometheus/remote/read.go
@@ -159,13 +159,13 @@ func (h *PromReadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
func (h *PromReadHandler) parseRequest(
r *http.Request,
) (*prompb.ReadRequest, *xhttp.ParseError) {
reqBuf, err := prometheus.ParsePromCompressedRequest(r)
result, err := prometheus.ParsePromCompressedRequest(r)
if err != nil {
return nil, err
}

var req prompb.ReadRequest
if err := proto.Unmarshal(reqBuf, &req); err != nil {
if err := proto.Unmarshal(result.UncompressedBody, &req); err != nil {
return nil, xhttp.NewParseError(err, http.StatusBadRequest)
}

161 changes: 145 additions & 16 deletions src/query/api/v1/handler/prometheus/remote/write.go
@@ -21,6 +21,7 @@
package remote

import (
"bytes"
"context"
"errors"
"fmt"
@@ -42,6 +43,7 @@ import (
xerrors "github.com/m3db/m3/src/x/errors"
"github.com/m3db/m3/src/x/instrument"
xhttp "github.com/m3db/m3/src/x/net/http"
xsync "github.com/m3db/m3/src/x/sync"
xtime "github.com/m3db/m3/src/x/time"

"github.com/golang/protobuf/proto"
@@ -58,6 +60,9 @@ const (

// emptyStoragePolicyVar for code readability.
emptyStoragePolicyVar = ""

// defaultForwardingTimeout is the default forwarding timeout.
defaultForwardingTimeout = 15 * time.Second
)

var (
@@ -69,17 +74,39 @@ var (

// PromWriteHandler represents a handler for prometheus write endpoint.
type PromWriteHandler struct {
downsamplerAndWriter ingest.DownsamplerAndWriter
tagOptions models.TagOptions
nowFn clock.NowFn
instrumentOpts instrument.Options
metrics promWriteMetrics
downsamplerAndWriter ingest.DownsamplerAndWriter
tagOptions models.TagOptions
forwarding PromWriteHandlerForwardingOptions
forwardTimeout time.Duration
forwardHTTPClient *http.Client
forwardingBoundWorkers xsync.WorkerPool
forwardContext context.Context
nowFn clock.NowFn
instrumentOpts instrument.Options
metrics promWriteMetrics
}

// PromWriteHandlerForwardingOptions is the forwarding options for prometheus write handler.
type PromWriteHandlerForwardingOptions struct {
// MaxConcurrency is the maximum number of concurrent forwarding requests;
// if zero, concurrency is unlimited.
MaxConcurrency int `yaml:"maxConcurrency"`
Timeout time.Duration `yaml:"timeout"`
Targets []PromWriteHandlerForwardTargetOptions `yaml:"targets"`
}

// PromWriteHandlerForwardTargetOptions is a prometheus write handler forwarder target.
type PromWriteHandlerForwardTargetOptions struct {
// URL of the target to send to.
URL string `yaml:"url"`
// Method defaults to POST if not set.
Method string `yaml:"method"`
}

// NewPromWriteHandler returns a new instance of handler.
func NewPromWriteHandler(
downsamplerAndWriter ingest.DownsamplerAndWriter,
tagOptions models.TagOptions,
forwarding PromWriteHandlerForwardingOptions,
nowFn clock.NowFn,
instrumentOpts instrument.Options,
) (http.Handler, error) {
@@ -98,12 +125,34 @@ func NewPromWriteHandler(
return nil, err
}

// Only use a forwarding worker pool if concurrency is bounded; otherwise,
// when unlimited, we just spin up a goroutine for each incoming write.
var forwardingBoundWorkers xsync.WorkerPool
if v := forwarding.MaxConcurrency; v > 0 {
forwardingBoundWorkers = xsync.NewWorkerPool(v)
forwardingBoundWorkers.Init()
}

forwardTimeout := defaultForwardingTimeout
if v := forwarding.Timeout; v > 0 {
forwardTimeout = v
}

forwardHTTPOpts := xhttp.DefaultHTTPClientOptions()
forwardHTTPOpts.DisableCompression = true // Already snappy compressed.
forwardHTTPOpts.RequestTimeout = forwardTimeout

return &PromWriteHandler{
downsamplerAndWriter: downsamplerAndWriter,
tagOptions: tagOptions,
nowFn: nowFn,
metrics: metrics,
instrumentOpts: instrumentOpts,
downsamplerAndWriter: downsamplerAndWriter,
tagOptions: tagOptions,
forwarding: forwarding,
forwardTimeout: forwardTimeout,
forwardHTTPClient: xhttp.NewHTTPClient(forwardHTTPOpts),
forwardingBoundWorkers: forwardingBoundWorkers,
forwardContext: context.Background(),
nowFn: nowFn,
metrics: metrics,
instrumentOpts: instrumentOpts,
}, nil
}
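The bounded-concurrency behaviour set up above can be seen in isolation with a short sketch; the pool size, task count, and sleep are arbitrary, and xsync is the m3 worker-pool package this file already imports. When every worker is busy, GoIfAvailable returns false, which is when the handler counts a forward as dropped instead of queueing it.

```go
package main

import (
	"fmt"
	"time"

	xsync "github.com/m3db/m3/src/x/sync"
)

func main() {
	// Equivalent to forwarding.MaxConcurrency = 2.
	workers := xsync.NewWorkerPool(2)
	workers.Init()

	var spawned, dropped int
	for i := 0; i < 8; i++ {
		ok := workers.GoIfAvailable(func() {
			time.Sleep(50 * time.Millisecond) // stand-in for a forward call
		})
		if ok {
			spawned++
		} else {
			dropped++ // the handler would increment forwardDropped here
		}
	}
	fmt.Println("spawned:", spawned, "dropped:", dropped)
}
```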

@@ -113,6 +162,10 @@ type promWriteMetrics struct {
writeErrorsClient tally.Counter
ingestLatency tally.Histogram
ingestLatencyBuckets tally.DurationBuckets
forwardSuccess tally.Counter
forwardErrors tally.Counter
forwardDropped tally.Counter
forwardLatency tally.Histogram
}

func newPromWriteMetrics(scope tally.Scope) (promWriteMetrics, error) {
@@ -155,23 +208,68 @@ func newPromWriteMetrics(scope tally.Scope) (promWriteMetrics, error) {
ingestLatencyBuckets = append(ingestLatencyBuckets, upTo60mBuckets...)
ingestLatencyBuckets = append(ingestLatencyBuckets, upTo6hBuckets...)
ingestLatencyBuckets = append(ingestLatencyBuckets, upTo24hBuckets...)

var forwardLatencyBuckets tally.DurationBuckets
forwardLatencyBuckets = append(forwardLatencyBuckets, upTo1sBuckets...)
forwardLatencyBuckets = append(forwardLatencyBuckets, upTo10sBuckets...)
forwardLatencyBuckets = append(forwardLatencyBuckets, upTo60sBuckets...)
forwardLatencyBuckets = append(forwardLatencyBuckets, upTo60mBuckets...)
return promWriteMetrics{
writeSuccess: scope.SubScope("write").Counter("success"),
writeErrorsServer: scope.SubScope("write").Tagged(map[string]string{"code": "5XX"}).Counter("errors"),
writeErrorsClient: scope.SubScope("write").Tagged(map[string]string{"code": "4XX"}).Counter("errors"),
ingestLatency: scope.SubScope("ingest").Histogram("latency", ingestLatencyBuckets),
ingestLatencyBuckets: ingestLatencyBuckets,
forwardSuccess: scope.SubScope("forward").Counter("success"),
forwardErrors: scope.SubScope("forward").Counter("errors"),
forwardDropped: scope.SubScope("forward").Counter("dropped"),
forwardLatency: scope.SubScope("forward").Histogram("latency", forwardLatencyBuckets),
}, nil
}
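For reference, a hedged sketch (not tied to this commit) of the tally pattern behind the new forward metrics: sub-scoped counters plus a duration-bucketed latency histogram. The bucket values and scope name here are arbitrary.

```go
package main

import (
	"fmt"
	"time"

	"github.com/uber-go/tally"
)

func main() {
	scope := tally.NewTestScope("coordinator", nil)

	// Mirrors the shape of the new forward metrics.
	buckets := tally.DurationBuckets{
		10 * time.Millisecond, 100 * time.Millisecond, time.Second, 10 * time.Second,
	}
	forwardSuccess := scope.SubScope("forward").Counter("success")
	forwardLatency := scope.SubScope("forward").Histogram("latency", buckets)

	forwardSuccess.Inc(1)
	forwardLatency.RecordDuration(120 * time.Millisecond)

	snap := scope.Snapshot()
	fmt.Println(len(snap.Counters()), "counter(s),", len(snap.Histograms()), "histogram(s)")
}
```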

func (h *PromWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
req, opts, rErr := h.parseRequest(r)
req, opts, result, rErr := h.parseRequest(r)
if rErr != nil {
h.metrics.writeErrorsClient.Inc(1)
xhttp.Error(w, rErr.Inner(), rErr.Code())
return
}

// Begin async forwarding.
// NB(r): If the request bodies ever get pooled, be careful not to
// return buffers to the pool until after forwarding completes.
if targets := h.forwarding.Targets; len(targets) > 0 {
for _, target := range targets {
target := target // Capture for lambda.
forward := func() {
// Consider propagating baggage without tying
// this context to the request context in future.
ctx, cancel := context.WithTimeout(h.forwardContext, h.forwardTimeout)
defer cancel()

if err := h.forward(ctx, result, target); err != nil {
h.metrics.forwardErrors.Inc(1)
logger := logging.WithContext(h.forwardContext, h.instrumentOpts)
logger.Error("forward error", zap.Error(err))
return
}
h.metrics.forwardSuccess.Inc(1)
}

spawned := false
if h.forwarding.MaxConcurrency > 0 {
spawned = h.forwardingBoundWorkers.GoIfAvailable(forward)
} else {
go forward()
spawned = true
}
if !spawned {
h.metrics.forwardDropped.Inc(1)
}
}
}

batchErr := h.write(r.Context(), req, opts)

// Record ingestion delay latency
@@ -246,14 +344,15 @@ func (h *PromWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {

func (h *PromWriteHandler) parseRequest(
r *http.Request,
) (*prompb.WriteRequest, ingest.WriteOptions, *xhttp.ParseError) {
) (*prompb.WriteRequest, ingest.WriteOptions, prometheus.ParsePromCompressedRequestResult, *xhttp.ParseError) {
var opts ingest.WriteOptions
if v := strings.TrimSpace(r.Header.Get(handler.MetricsTypeHeader)); v != "" {
// Allow the metrics type and storage policies to override
// the default rules and policies if specified.
metricsType, err := storage.ParseMetricsType(v)
if err != nil {
return nil, ingest.WriteOptions{},
prometheus.ParsePromCompressedRequestResult{},
xhttp.NewParseError(err, http.StatusBadRequest)
}

@@ -269,13 +368,15 @@ func (h *PromWriteHandler) parseRequest(
if strPolicy != emptyStoragePolicyVar {
err := errUnaggregatedStoragePolicySet
return nil, ingest.WriteOptions{},
prometheus.ParsePromCompressedRequestResult{},
xhttp.NewParseError(err, http.StatusBadRequest)
}
default:
parsed, err := policy.ParseStoragePolicy(strPolicy)
if err != nil {
err = fmt.Errorf("could not parse storage policy: %v", err)
return nil, ingest.WriteOptions{},
prometheus.ParsePromCompressedRequestResult{},
xhttp.NewParseError(err, http.StatusBadRequest)
}

@@ -287,18 +388,20 @@ func (h *PromWriteHandler) parseRequest(
}
}

reqBuf, err := prometheus.ParsePromCompressedRequest(r)
result, err := prometheus.ParsePromCompressedRequest(r)
if err != nil {
return nil, ingest.WriteOptions{}, err
return nil, ingest.WriteOptions{},
prometheus.ParsePromCompressedRequestResult{}, err
}

var req prompb.WriteRequest
if err := proto.Unmarshal(reqBuf, &req); err != nil {
if err := proto.Unmarshal(result.UncompressedBody, &req); err != nil {
return nil, ingest.WriteOptions{},
prometheus.ParsePromCompressedRequestResult{},
xhttp.NewParseError(err, http.StatusBadRequest)
}

return &req, opts, nil
return &req, opts, result, nil
}

func (h *PromWriteHandler) write(
@@ -310,6 +413,32 @@ func (h *PromWriteHandler) write(
return h.downsamplerAndWriter.WriteBatch(ctx, iter, opts)
}

func (h *PromWriteHandler) forward(
ctx context.Context,
request prometheus.ParsePromCompressedRequestResult,
target PromWriteHandlerForwardTargetOptions,
) error {
method := target.Method
if method == "" {
method = http.MethodPost
}

req, err := http.NewRequest(method, target.URL,
bytes.NewReader(request.CompressedBody))
if err != nil {
return err
}

resp, err := h.forwardHTTPClient.Do(req.WithContext(ctx))
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode/100 != 2 {
return fmt.Errorf("expected status code 2XX: actual=%v", resp.StatusCode)
}
return nil
}
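To make the forwarding contract concrete, a hedged sketch (not from this commit) of a target receiver: forward re-sends the original snappy-compressed remote-write body verbatim, so a target only needs to accept a standard Prometheus remote write POST. The handler below and its URL are stand-ins for a real replica coordinator.

```go
package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
	"net/http/httptest"

	"github.com/golang/snappy"
)

func main() {
	receiver := httptest.NewServer(http.HandlerFunc(
		func(w http.ResponseWriter, r *http.Request) {
			compressed, err := ioutil.ReadAll(r.Body)
			if err != nil {
				http.Error(w, err.Error(), http.StatusInternalServerError)
				return
			}
			raw, err := snappy.Decode(nil, compressed)
			if err != nil {
				http.Error(w, err.Error(), http.StatusBadRequest)
				return
			}
			// raw holds the serialized prompb.WriteRequest bytes.
			fmt.Println("received", len(raw), "uncompressed bytes")
			w.WriteHeader(http.StatusOK)
		}))
	defer receiver.Close()

	// receiver.URL is what would be configured as a forwarding target's url.
	fmt.Println("target url:", receiver.URL)
}
```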

func newPromTSIter(timeseries []*prompb.TimeSeries, tagOpts models.TagOptions) *promTSIter {
// Construct the tags and datapoints upfront so that if the iterator
// is reset, we don't have to generate them twice.