Skip to content

Commit

Permalink
support forward-header
Browse files Browse the repository at this point in the history
Signed-off-by: lcasi <[email protected]>
  • Loading branch information
lcasi committed Mar 7, 2022
1 parent e60ca7e commit b762212
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 11 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

### Fixed

### Added

- [#5220](https://github.com/thanos-io/thanos/pull/5220) Query Frontend: Add `--query-frontend.forward-header` flag, forward headers to downstream querier.

### Changed

- [#5205](https://github.com/thanos-io/thanos/pull/5205) Rule: Add ruler labels as external labels in stateless ruler mode.
Expand Down
2 changes: 2 additions & 0 deletions cmd/thanos/query_frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ func registerQueryFrontend(app *extkingpin.App) {
"If multiple headers match the request, the first matching arg specified will take precedence. "+
"If no headers match 'anonymous' will be used.").PlaceHolder("<http-header-name>").StringsVar(&cfg.orgIdHeaders)

cmd.Flag("query-frontend.forward-header", "List of headers forwarded by the query Frontend to downstream querier, default is empty").PlaceHolder("<http-header-name>").StringsVar(&cfg.ForwardHeaders)

cmd.Flag("log.request.decision", "Deprecation Warning - This flag would be soon deprecated, and replaced with `request.logging-config`. Request Logging for logging the start and end of requests. By default this flag is disabled. LogFinishCall : Logs the finish call of the requests. LogStartAndFinishCall : Logs the start and finish call of the requests. NoLogCall : Disable request logging.").Default("").EnumVar(&cfg.RequestLoggingDecision, "NoLogCall", "LogFinishCall", "LogStartAndFinishCall", "")
reqLogConfig := extkingpin.RegisterRequestLoggingFlags(cmd)

Expand Down
1 change: 1 addition & 0 deletions pkg/queryfrontend/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ type Config struct {
CacheCompression string
RequestLoggingDecision string
DownstreamURL string
ForwardHeaders []string
}

// QueryRangeConfig holds the config for query range tripperware.
Expand Down
41 changes: 36 additions & 5 deletions pkg/queryfrontend/labels_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (c labelsCodec) MergeResponse(responses ...queryrange.Response) (queryrange
}
}

func (c labelsCodec) DecodeRequest(_ context.Context, r *http.Request, _ []string) (queryrange.Request, error) {
func (c labelsCodec) DecodeRequest(_ context.Context, r *http.Request, forwardHeaders []string) (queryrange.Request, error) {
if err := r.ParseForm(); err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
Expand All @@ -118,9 +118,9 @@ func (c labelsCodec) DecodeRequest(_ context.Context, r *http.Request, _ []strin
)
switch op := getOperation(r); op {
case labelNamesOp, labelValuesOp:
req, err = c.parseLabelsRequest(r, op)
req, err = c.parseLabelsRequest(r, op, forwardHeaders)
case seriesOp:
req, err = c.parseSeriesRequest(r)
req, err = c.parseSeriesRequest(r, forwardHeaders)
}
if err != nil {
return nil, err
Expand Down Expand Up @@ -167,6 +167,12 @@ func (c labelsCodec) EncodeRequest(ctx context.Context, r queryrange.Request) (*
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
}

for _, hv := range thanosReq.Headers {
for _, v := range hv.Values {
req.Header.Add(hv.Name, v)
}
}

case *ThanosSeriesRequest:
var params = url.Values{
"start": []string{encodeTime(thanosReq.Start)},
Expand All @@ -187,6 +193,11 @@ func (c labelsCodec) EncodeRequest(ctx context.Context, r queryrange.Request) (*
return nil, httpgrpc.Errorf(http.StatusBadRequest, "error creating request: %s", err.Error())
}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
for _, hv := range thanosReq.Headers {
for _, v := range hv.Values {
req.Header.Add(hv.Name, v)
}
}

default:
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "invalid request format")
Expand Down Expand Up @@ -271,7 +282,7 @@ func (c labelsCodec) EncodeResponse(ctx context.Context, res queryrange.Response
return &resp, nil
}

func (c labelsCodec) parseLabelsRequest(r *http.Request, op string) (queryrange.Request, error) {
func (c labelsCodec) parseLabelsRequest(r *http.Request, op string, forwardHeaders []string) (queryrange.Request, error) {
var (
result ThanosLabelsRequest
err error
Expand Down Expand Up @@ -312,10 +323,20 @@ func (c labelsCodec) parseLabelsRequest(r *http.Request, op string) (queryrange.
}
}

// Include the specified headers from http request in prometheusRequest.
for _, header := range forwardHeaders {
for h, hv := range r.Header {
if strings.EqualFold(h, header) {
result.Headers = append(result.Headers, &RequestHeader{Name: h, Values: hv})
break
}
}
}

return &result, nil
}

func (c labelsCodec) parseSeriesRequest(r *http.Request) (queryrange.Request, error) {
func (c labelsCodec) parseSeriesRequest(r *http.Request, forwardHeaders []string) (queryrange.Request, error) {
var (
result ThanosSeriesRequest
err error
Expand Down Expand Up @@ -358,6 +379,16 @@ func (c labelsCodec) parseSeriesRequest(r *http.Request) (queryrange.Request, er
}
}

// Include the specified headers from http request in prometheusRequest.
for _, header := range forwardHeaders {
for h, hv := range r.Header {
if strings.EqualFold(h, header) {
result.Headers = append(result.Headers, &RequestHeader{Name: h, Values: hv})
break
}
}
}

return &result, nil
}

Expand Down
16 changes: 14 additions & 2 deletions pkg/queryfrontend/queryrange_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func NewThanosQueryRangeCodec(partialResponse bool) *queryRangeCodec {
}
}

func (c queryRangeCodec) DecodeRequest(_ context.Context, r *http.Request, _ []string) (queryrange.Request, error) {
func (c queryRangeCodec) DecodeRequest(_ context.Context, r *http.Request, forwardHeaders []string) (queryrange.Request, error) {
var (
result ThanosQueryRangeRequest
err error
Expand Down Expand Up @@ -126,6 +126,14 @@ func (c queryRangeCodec) DecodeRequest(_ context.Context, r *http.Request, _ []s
}
}

for _, header := range forwardHeaders {
for h, hv := range r.Header {
if strings.EqualFold(h, header) {
result.Headers = append(result.Headers, &RequestHeader{Name: h, Values: hv})
break
}
}
}
return &result, nil
}

Expand Down Expand Up @@ -161,7 +169,11 @@ func (c queryRangeCodec) EncodeRequest(ctx context.Context, r queryrange.Request
return nil, httpgrpc.Errorf(http.StatusBadRequest, "error creating request: %s", err.Error())
}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")

for _, hv := range thanosReq.Headers {
for _, v := range hv.Values {
req.Header.Add(hv.Name, v)
}
}
return req.WithContext(ctx), nil
}

Expand Down
8 changes: 8 additions & 0 deletions pkg/queryfrontend/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ type ThanosRequest interface {
GetStoreMatchers() [][]*labels.Matcher
}

type RequestHeader struct {
Name string
Values []string
}

type ThanosQueryRangeRequest struct {
Path string
Start int64
Expand All @@ -33,6 +38,7 @@ type ThanosQueryRangeRequest struct {
ReplicaLabels []string
StoreMatchers [][]*labels.Matcher
CachingOptions queryrange.CachingOptions
Headers []*RequestHeader
}

// GetStart returns the start timestamp of the request in milliseconds.
Expand Down Expand Up @@ -107,6 +113,7 @@ type ThanosLabelsRequest struct {
StoreMatchers [][]*labels.Matcher
PartialResponse bool
CachingOptions queryrange.CachingOptions
Headers []*RequestHeader
}

// GetStart returns the start timestamp of the request in milliseconds.
Expand Down Expand Up @@ -178,6 +185,7 @@ type ThanosSeriesRequest struct {
Matchers [][]*labels.Matcher
StoreMatchers [][]*labels.Matcher
CachingOptions queryrange.CachingOptions
Headers []*RequestHeader
}

// GetStart returns the start timestamp of the request in milliseconds.
Expand Down
10 changes: 6 additions & 4 deletions pkg/queryfrontend/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,13 @@ func NewTripperware(config Config, reg prometheus.Registerer, logger log.Logger)
labelsCodec := NewThanosLabelsCodec(config.LabelsConfig.PartialResponseStrategy, config.DefaultTimeRange)

queryRangeTripperware, err := newQueryRangeTripperware(config.QueryRangeConfig, queryRangeLimits, queryRangeCodec,
prometheus.WrapRegistererWith(prometheus.Labels{"tripperware": "query_range"}, reg), logger)
prometheus.WrapRegistererWith(prometheus.Labels{"tripperware": "query_range"}, reg), logger, config.ForwardHeaders)
if err != nil {
return nil, err
}

labelsTripperware, err := newLabelsTripperware(config.LabelsConfig, labelsLimits, labelsCodec,
prometheus.WrapRegistererWith(prometheus.Labels{"tripperware": "labels"}, reg), logger)
prometheus.WrapRegistererWith(prometheus.Labels{"tripperware": "labels"}, reg), logger, config.ForwardHeaders)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -138,6 +138,7 @@ func newQueryRangeTripperware(
codec *queryRangeCodec,
reg prometheus.Registerer,
logger log.Logger,
forwardHeaders []string,
) (queryrange.Tripperware, error) {
queryRangeMiddleware := []queryrange.Middleware{queryrange.NewLimitsMiddleware(limits)}
m := queryrange.NewInstrumentMiddlewareMetrics(reg)
Expand Down Expand Up @@ -203,7 +204,7 @@ func newQueryRangeTripperware(
}

return func(next http.RoundTripper) http.RoundTripper {
rt := queryrange.NewRoundTripper(next, codec, nil, queryRangeMiddleware...)
rt := queryrange.NewRoundTripper(next, codec, forwardHeaders, queryRangeMiddleware...)
return queryrange.RoundTripFunc(func(r *http.Request) (*http.Response, error) {
return rt.RoundTrip(r)
})
Expand All @@ -218,6 +219,7 @@ func newLabelsTripperware(
codec *labelsCodec,
reg prometheus.Registerer,
logger log.Logger,
forwardHeaders []string,
) (queryrange.Tripperware, error) {
labelsMiddleware := []queryrange.Middleware{}
m := queryrange.NewInstrumentMiddlewareMetrics(reg)
Expand Down Expand Up @@ -265,7 +267,7 @@ func newLabelsTripperware(
)
}
return func(next http.RoundTripper) http.RoundTripper {
rt := queryrange.NewRoundTripper(next, codec, nil, labelsMiddleware...)
rt := queryrange.NewRoundTripper(next, codec, forwardHeaders, labelsMiddleware...)
return queryrange.RoundTripFunc(func(r *http.Request) (*http.Response, error) {
return rt.RoundTrip(r)
})
Expand Down

0 comments on commit b762212

Please sign in to comment.