Skip to content

Commit

Permalink
[coordinator] Forward all relevant M3 headers to forwarding targets (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
schallert authored Apr 8, 2020
1 parent d4f4931 commit 2742736
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 12 deletions.
22 changes: 13 additions & 9 deletions src/query/api/v1/handler/prometheus/handleroptions/headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,37 +28,41 @@ import (
)

const (
// M3HeaderPrefix is the prefix all M3-specific headers that affect query or
// write behavior (not necessarily m3admin headers) are guaranteed to have.
M3HeaderPrefix = "M3-"

// WarningsHeader is the M3 warnings header when to display a warning to a user.
WarningsHeader = "M3-Warnings"
WarningsHeader = M3HeaderPrefix + "Warnings"

// RetryHeader is the M3 retry header to display when it is safe to retry.
RetryHeader = "M3-Retry"
RetryHeader = M3HeaderPrefix + "Retry"

// ServedByHeader is the M3 query storage execution breakdown.
ServedByHeader = "M3-Storage-By"
ServedByHeader = M3HeaderPrefix + "Storage-By"

// DeprecatedHeader is the M3 deprecated header.
DeprecatedHeader = "M3-Deprecated"
DeprecatedHeader = M3HeaderPrefix + "Deprecated"

// MetricsTypeHeader sets the write or read metrics type to restrict
// metrics to.
// Valid values are "unaggregated" or "aggregated".
MetricsTypeHeader = "M3-Metrics-Type"
MetricsTypeHeader = M3HeaderPrefix + "Metrics-Type"

// MetricsStoragePolicyHeader specifies the resolution and retention of
// metrics being written or read.
// In the form of a storage policy string, e.g. "1m:14d".
// Only required if the metrics type header does not specify unaggregated
// metrics type.
MetricsStoragePolicyHeader = "M3-Storage-Policy"
MetricsStoragePolicyHeader = M3HeaderPrefix + "Storage-Policy"

// RestrictByTagsJSONHeader provides tag options to enforces on queries,
// in JSON format. See `handler.stringTagOptions` for definitions.`
RestrictByTagsJSONHeader = "M3-Restrict-By-Tags-JSON"
RestrictByTagsJSONHeader = M3HeaderPrefix + "Restrict-By-Tags-JSON"

// LimitMaxSeriesHeader is the M3 limit timeseries header that limits
// the number of time series returned by each storage node.
LimitMaxSeriesHeader = "M3-Limit-Max-Series"
LimitMaxSeriesHeader = M3HeaderPrefix + "Limit-Max-Series"

// UnaggregatedStoragePolicy specifies the unaggregated storage policy.
UnaggregatedStoragePolicy = "unaggregated"
Expand All @@ -80,7 +84,7 @@ const (
HeaderForce = "Force"

// LimitHeader is the header added when returned series are limited.
LimitHeader = "M3-Results-Limited"
LimitHeader = M3HeaderPrefix + "Results-Limited"

// LimitHeaderSeriesLimitApplied is the header applied when fetch results are
// maxed.
Expand Down
19 changes: 16 additions & 3 deletions src/query/api/v1/handler/prometheus/remote/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ func (h *PromWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(h.forwardContext, h.forwardTimeout)
defer cancel()

if err := h.forward(ctx, result, target); err != nil {
if err := h.forward(ctx, result, r.Header, target); err != nil {
h.metrics.forwardErrors.Inc(1)
logger := logging.WithContext(h.forwardContext, h.instrumentOpts)
logger.Error("forward error", zap.Error(err))
Expand Down Expand Up @@ -413,6 +413,7 @@ func (h *PromWriteHandler) write(
func (h *PromWriteHandler) forward(
ctx context.Context,
request prometheus.ParsePromCompressedRequestResult,
headers http.Header,
target handleroptions.PromWriteHandlerForwardTargetOptions,
) error {
method := target.Method
Expand All @@ -425,9 +426,21 @@ func (h *PromWriteHandler) forward(
return err
}

if headers := target.Headers; headers != nil {
// There are multiple headers that impact coordinator behavior on the write
// (map tags, storage policy, etc.) that we must forward to the target
// coordinator to guarantee same behavior as the coordinator that originally
// received the request.
if headers != nil {
for h := range headers {
if strings.HasPrefix(h, handleroptions.M3HeaderPrefix) {
req.Header.Add(h, headers.Get(h))
}
}
}

if targetHeaders := target.Headers; targetHeaders != nil {
// If headers set, attach to request.
for name, value := range headers {
for name, value := range targetHeaders {
req.Header.Add(name, value)
}
}
Expand Down

0 comments on commit 2742736

Please sign in to comment.