diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index b8318f9bf9..790ddc230c 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -145,15 +145,34 @@ func runSidecar( g.Add(func() error { // Only check Prometheus's flags when upload is enabled. if uploads { - // Check prometheus's flags to ensure sane sidecar flags. + // Check prometheus's flags to ensure same sidecar flags. if err := validatePrometheus(ctx, m.client, logger, conf.shipper.ignoreBlockSize, m); err != nil { return errors.Wrap(err, "validate Prometheus flags") } } + // We retry infinitely until we reach and fetch BuildVersion from our Prometheus. + err := runutil.Retry(2*time.Second, ctx.Done(), func() error { + if err := m.BuildVersion(ctx); err != nil { + level.Warn(logger).Log( + "msg", "failed to fetch prometheus version. Is Prometheus running? Retrying", + "err", err, + ) + return err + } + + level.Info(logger).Log( + "msg", "successfully loaded prometheus version", + ) + return nil + }) + if err != nil { + return errors.Wrap(err, "failed to get prometheus version") + } + // Blocking query of external labels before joining as a Source Peer into gossip. // We retry infinitely until we reach and fetch labels from our Prometheus. - err := runutil.Retry(2*time.Second, ctx.Done(), func() error { + err = runutil.Retry(2*time.Second, ctx.Done(), func() error { if err := m.UpdateLabels(ctx); err != nil { level.Warn(logger).Log( "msg", "failed to fetch initial external labels. Is Prometheus running? Retrying", @@ -209,14 +228,13 @@ func runSidecar( cancel() }) } - { t := exthttp.NewTransport() t.MaxIdleConnsPerHost = conf.connection.maxIdleConnsPerHost t.MaxIdleConns = conf.connection.maxIdleConns c := promclient.NewClient(&http.Client{Transport: tracing.HTTPTripperware(logger, t)}, logger, thanoshttp.ThanosUserAgent) - promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps) + promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps, m.Version) if err != nil { return errors.Wrap(err, "create Prometheus store") } @@ -346,10 +364,11 @@ func validatePrometheus(ctx context.Context, client *promclient.Client, logger l type promMetadata struct { promURL *url.URL - mtx sync.Mutex - mint int64 - maxt int64 - labels labels.Labels + mtx sync.Mutex + mint int64 + maxt int64 + labels labels.Labels + promVersion string limitMinTime thanosmodel.TimeOrDurationValue @@ -395,6 +414,26 @@ func (s *promMetadata) Timestamps() (mint int64, maxt int64) { return s.mint, s.maxt } +func (s *promMetadata) BuildVersion(ctx context.Context) error { + ver, err := s.client.BuildVersion(ctx, s.promURL) + if err != nil { + return err + } + + s.mtx.Lock() + defer s.mtx.Unlock() + + s.promVersion = ver + return nil +} + +func (s *promMetadata) Version() string { + s.mtx.Lock() + defer s.mtx.Unlock() + + return s.promVersion +} + type sidecarConfig struct { http httpConfig grpc grpcConfig diff --git a/go.mod b/go.mod index 9f349b14f2..116899c53c 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/NYTimes/gziphandler v1.1.1 github.com/alecthomas/units v0.0.0-20210208195552-ff826a37aa15 github.com/aliyun/aliyun-oss-go-sdk v2.0.4+incompatible + github.com/blang/semver/v4 v4.0.0 github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b github.com/cespare/xxhash v1.1.0 github.com/chromedp/cdproto v0.0.0-20200424080200-0de008e41fa0 diff --git a/go.sum b/go.sum index d070c4160e..6561def31a 100644 --- a/go.sum +++ b/go.sum @@ -181,7 +181,10 @@ github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6r github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k= github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84= +github.com/blang/semver v3.5.0+incompatible h1:CGxCgetQ64DKk7rdZ++Vfnb1+ogGNnB17OJKJXD2Cfs= github.com/blang/semver v3.5.0+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk= +github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM= +github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2yvyW5YoQ= github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= github.com/bmizerany/pat v0.0.0-20170815010413-6226ea591a40/go.mod h1:8rLXio+WjiTceGBHIoTvn60HIbs7Hm7bcHjyrSqYB9c= github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps= diff --git a/pkg/promclient/promclient.go b/pkg/promclient/promclient.go index 3ac3bb5147..4384b5d529 100644 --- a/pkg/promclient/promclient.go +++ b/pkg/promclient/promclient.go @@ -608,6 +608,39 @@ func (c *Client) AlertmanagerAlerts(ctx context.Context, base *url.URL) ([]*mode return v.Data, nil } +// BuildVersion returns Prometheus version from /api/v1/status/buildinfo Prometheus endpoint. +// For Prometheus versions < 2.14.0 it returns "0" as Prometheus version. +func (c *Client) BuildVersion(ctx context.Context, base *url.URL) (string, error) { + u := *base + u.Path = path.Join(u.Path, "/api/v1/status/buildinfo") + + level.Debug(c.logger).Log("msg", "build version", "url", u.String()) + + span, ctx := tracing.StartSpan(ctx, "/prom_buildversion HTTP[client]") + defer span.Finish() + + // We get status code 404 for prometheus versions lower than 2.14.0 + body, code, err := c.req2xx(ctx, &u, http.MethodGet) + if err != nil { + if code == http.StatusNotFound { + return "0", nil + } + return "", err + } + + var b struct { + Data struct { + Version string `json:"version"` + } `json:"data"` + } + + if err = json.Unmarshal(body, &b); err != nil { + return "", errors.Wrap(err, "unmarshal build info API response") + } + + return b.Data.Version, nil +} + func formatTime(t time.Time) string { return strconv.FormatFloat(float64(t.Unix())+float64(t.Nanosecond())/1e9, 'f', -1, 64) } diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index 7239c4b3ad..894cc34893 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -17,6 +17,7 @@ import ( "strings" "sync" + "github.com/blang/semver/v4" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/gogo/protobuf/proto" @@ -50,12 +51,16 @@ type PrometheusStore struct { component component.StoreAPI externalLabelsFn func() labels.Labels timestamps func() (mint int64, maxt int64) + promVersion func() string remoteReadAcceptableResponses []prompb.ReadRequest_ResponseType framesRead prometheus.Histogram } +// LabelValues call with matchers is supported for Prometheus versions >= 2.24.0 . +var baseVer, _ = semver.Make("2.24.0") + const initialBufSize = 32 * 1024 // 32KB seems like a good minimum starting size for sync pool size. // NewPrometheusStore returns a new PrometheusStore that uses the given HTTP client @@ -69,6 +74,7 @@ func NewPrometheusStore( component component.StoreAPI, externalLabelsFn func() labels.Labels, timestamps func() (mint int64, maxt int64), + promVersion func() string, ) (*PrometheusStore, error) { if logger == nil { logger = log.NewNopLogger() @@ -80,6 +86,7 @@ func NewPrometheusStore( component: component, externalLabelsFn: externalLabelsFn, timestamps: timestamps, + promVersion: promVersion, remoteReadAcceptableResponses: []prompb.ReadRequest_ResponseType{prompb.ReadRequest_STREAMED_XOR_CHUNKS, prompb.ReadRequest_SAMPLES}, buffers: sync.Pool{New: func() interface{} { b := make([]byte, 0, initialBufSize) @@ -496,9 +503,45 @@ func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValue return &storepb.LabelValuesResponse{Values: []string{l}}, nil } - vals, err := p.client.LabelValuesInGRPC(ctx, p.base, r.Label, nil, r.Start, r.End) - if err != nil { - return nil, err + var ( + sers []map[string]string + err error + ) + + lvc := false // LabelValuesCall + vals := []string{} + v := p.promVersion() + + version, err := semver.Parse(v) + if err == nil && version.GTE(baseVer) { + lvc = true + } + + if len(r.Matchers) == 0 || lvc { + vals, err = p.client.LabelValuesInGRPC(ctx, p.base, r.Label, r.Matchers, r.Start, r.End) + if err != nil { + return nil, err + } + } else { + matchers, err := storepb.MatchersToPromMatchers(r.Matchers...) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + sers, err = p.client.SeriesInGRPC(ctx, p.base, matchers, r.Start, r.End) + if err != nil { + return nil, err + } + + // Using set to handle duplicate values. + labelValuesSet := make(map[string]struct{}) + for _, s := range sers { + if val, exists := s[r.Label]; exists { + labelValuesSet[val] = struct{}{} + } + } + for key := range labelValuesSet { + vals = append(vals, key) + } } sort.Strings(vals) return &storepb.LabelValuesResponse{Values: vals}, nil diff --git a/pkg/store/prometheus_test.go b/pkg/store/prometheus_test.go index e637b9afaa..c99d0375f5 100644 --- a/pkg/store/prometheus_test.go +++ b/pkg/store/prometheus_test.go @@ -64,7 +64,7 @@ func testPrometheusStoreSeriesE2e(t *testing.T, prefix string) { limitMinT := int64(0) proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, - func() (int64, int64) { return limitMinT, -1 }) // Maxt does not matter. + func() (int64, int64) { return limitMinT, -1 }, nil) // Maxt does not matter. testutil.Ok(t, err) // Query all three samples except for the first one. Since we round up queried data @@ -199,7 +199,7 @@ func TestPrometheusStore_SeriesLabels_e2e(t *testing.T) { promStore, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, - func() (int64, int64) { return math.MinInt64/1000 + 62135596801, math.MaxInt64/1000 - 62135596801 }) + func() (int64, int64) { return math.MinInt64/1000 + 62135596801, math.MaxInt64/1000 - 62135596801 }, nil) testutil.Ok(t, err) for _, tcase := range []struct { @@ -375,7 +375,7 @@ func TestPrometheusStore_LabelNames_e2e(t *testing.T) { u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr())) testutil.Ok(t, err) - proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, getExternalLabels, nil) + proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, getExternalLabels, nil, nil) testutil.Ok(t, err) resp, err := proxy.LabelNames(ctx, &storepb.LabelNamesRequest{ @@ -420,7 +420,11 @@ func TestPrometheusStore_LabelValues_e2e(t *testing.T) { u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr())) testutil.Ok(t, err) - proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, getExternalLabels, nil) + version, err := promclient.NewDefaultClient().BuildVersion(ctx, u) + testutil.Ok(t, err) + + proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, getExternalLabels, nil, + func() string { return version }) testutil.Ok(t, err) resp, err := proxy.LabelValues(ctx, &storepb.LabelValuesRequest{ @@ -441,6 +445,32 @@ func TestPrometheusStore_LabelValues_e2e(t *testing.T) { testutil.Ok(t, err) testutil.Equals(t, []string(nil), resp.Warnings) testutil.Equals(t, []string{}, resp.Values) + + // check label value matcher + resp, err = proxy.LabelValues(ctx, &storepb.LabelValuesRequest{ + Label: "a", + Start: timestamp.FromTime(minTime), + End: timestamp.FromTime(maxTime), + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_EQ, Name: "a", Value: "b"}, + }, + }) + testutil.Ok(t, err) + testutil.Equals(t, []string(nil), resp.Warnings) + testutil.Equals(t, []string{"b"}, resp.Values) + + // check another label value matcher + resp, err = proxy.LabelValues(ctx, &storepb.LabelValuesRequest{ + Label: "a", + Start: timestamp.FromTime(minTime), + End: timestamp.FromTime(maxTime), + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_EQ, Name: "a", Value: "d"}, + }, + }) + testutil.Ok(t, err) + testutil.Equals(t, []string(nil), resp.Warnings) + testutil.Equals(t, []string{}, resp.Values) } // Test to check external label values retrieve. @@ -466,7 +496,10 @@ func TestPrometheusStore_ExternalLabelValues_e2e(t *testing.T) { u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr())) testutil.Ok(t, err) - proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, getExternalLabels, nil) + version, err := promclient.NewDefaultClient().BuildVersion(ctx, u) + testutil.Ok(t, err) + + proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, getExternalLabels, nil, func() string { return version }) testutil.Ok(t, err) resp, err := proxy.LabelValues(ctx, &storepb.LabelValuesRequest{ @@ -512,7 +545,7 @@ func TestPrometheusStore_Series_MatchExternalLabel_e2e(t *testing.T) { proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, - func() (int64, int64) { return 0, math.MaxInt64 }) + func() (int64, int64) { return 0, math.MaxInt64 }, nil) testutil.Ok(t, err) srv := newStoreSeriesServer(ctx) @@ -557,7 +590,7 @@ func TestPrometheusStore_Info(t *testing.T) { proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), nil, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, - func() (int64, int64) { return 123, 456 }) + func() (int64, int64) { return 123, 456 }, nil) testutil.Ok(t, err) resp, err := proxy.Info(ctx, &storepb.InfoRequest{}) @@ -635,7 +668,7 @@ func TestPrometheusStore_Series_SplitSamplesIntoChunksWithMaxSizeOf120(t *testin proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, - func() (int64, int64) { return 0, math.MaxInt64 }) + func() (int64, int64) { return 0, math.MaxInt64 }, nil) testutil.Ok(t, err) // We build chunks only for SAMPLES method. Make sure we ask for SAMPLES only.