Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sidecar: Added matchers support #3940

Merged
merged 16 commits into from
May 7, 2021
Merged
55 changes: 47 additions & 8 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,15 +145,34 @@ func runSidecar(
g.Add(func() error {
// Only check Prometheus's flags when upload is enabled.
if uploads {
// Check prometheus's flags to ensure sane sidecar flags.
// Check prometheus's flags to ensure same sidecar flags.
if err := validatePrometheus(ctx, m.client, logger, conf.shipper.ignoreBlockSize, m); err != nil {
return errors.Wrap(err, "validate Prometheus flags")
}
}

// We retry infinitely until we reach and fetch BuildVersion from our Prometheus.
err := runutil.Retry(2*time.Second, ctx.Done(), func() error {
if err := m.BuildVersion(ctx); err != nil {
level.Warn(logger).Log(
"msg", "failed to fetch prometheus version. Is Prometheus running? Retrying",
"err", err,
)
return err
}

level.Info(logger).Log(
"msg", "successfully loaded prometheus version",
)
return nil
})
if err != nil {
return errors.Wrap(err, "failed to get prometheus version")
}

// Blocking query of external labels before joining as a Source Peer into gossip.
// We retry infinitely until we reach and fetch labels from our Prometheus.
err := runutil.Retry(2*time.Second, ctx.Done(), func() error {
err = runutil.Retry(2*time.Second, ctx.Done(), func() error {
if err := m.UpdateLabels(ctx); err != nil {
level.Warn(logger).Log(
"msg", "failed to fetch initial external labels. Is Prometheus running? Retrying",
Expand Down Expand Up @@ -209,14 +228,13 @@ func runSidecar(
cancel()
})
}

{
t := exthttp.NewTransport()
t.MaxIdleConnsPerHost = conf.connection.maxIdleConnsPerHost
t.MaxIdleConns = conf.connection.maxIdleConns
c := promclient.NewClient(&http.Client{Transport: tracing.HTTPTripperware(logger, t)}, logger, thanoshttp.ThanosUserAgent)

promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps)
promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps, m.Version)
if err != nil {
return errors.Wrap(err, "create Prometheus store")
}
Expand Down Expand Up @@ -346,10 +364,11 @@ func validatePrometheus(ctx context.Context, client *promclient.Client, logger l
type promMetadata struct {
promURL *url.URL

mtx sync.Mutex
mint int64
maxt int64
labels labels.Labels
mtx sync.Mutex
mint int64
maxt int64
labels labels.Labels
promVersion string

limitMinTime thanosmodel.TimeOrDurationValue

Expand Down Expand Up @@ -395,6 +414,26 @@ func (s *promMetadata) Timestamps() (mint int64, maxt int64) {
return s.mint, s.maxt
}

func (s *promMetadata) BuildVersion(ctx context.Context) error {
ver, err := s.client.BuildVersion(ctx, s.promURL)
if err != nil {
return err
}

s.mtx.Lock()
defer s.mtx.Unlock()

s.promVersion = ver
return nil
}

func (s *promMetadata) Version() string {
s.mtx.Lock()
defer s.mtx.Unlock()

return s.promVersion
}

type sidecarConfig struct {
http httpConfig
grpc grpcConfig
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/NYTimes/gziphandler v1.1.1
github.com/alecthomas/units v0.0.0-20210208195552-ff826a37aa15
github.com/aliyun/aliyun-oss-go-sdk v2.0.4+incompatible
github.com/blang/semver/v4 v4.0.0
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b
github.com/cespare/xxhash v1.1.0
github.com/chromedp/cdproto v0.0.0-20200424080200-0de008e41fa0
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,10 @@ github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6r
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k=
github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84=
github.com/blang/semver v3.5.0+incompatible h1:CGxCgetQ64DKk7rdZ++Vfnb1+ogGNnB17OJKJXD2Cfs=
github.com/blang/semver v3.5.0+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk=
github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM=
github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2yvyW5YoQ=
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
github.com/bmizerany/pat v0.0.0-20170815010413-6226ea591a40/go.mod h1:8rLXio+WjiTceGBHIoTvn60HIbs7Hm7bcHjyrSqYB9c=
github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps=
Expand Down
33 changes: 33 additions & 0 deletions pkg/promclient/promclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,39 @@ func (c *Client) AlertmanagerAlerts(ctx context.Context, base *url.URL) ([]*mode
return v.Data, nil
}

// BuildVersion returns Prometheus version from /api/v1/status/buildinfo Prometheus endpoint.
// For Prometheus versions < 2.14.0 it returns "0" as Prometheus version.
func (c *Client) BuildVersion(ctx context.Context, base *url.URL) (string, error) {
u := *base
u.Path = path.Join(u.Path, "/api/v1/status/buildinfo")

level.Debug(c.logger).Log("msg", "build version", "url", u.String())

span, ctx := tracing.StartSpan(ctx, "/prom_buildversion HTTP[client]")
defer span.Finish()

// We get status code 404 for prometheus versions lower than 2.14.0
body, code, err := c.req2xx(ctx, &u, http.MethodGet)
if err != nil {
if code == http.StatusNotFound {
return "0", nil
}
return "", err
}

var b struct {
Data struct {
Version string `json:"version"`
} `json:"data"`
}

if err = json.Unmarshal(body, &b); err != nil {
return "", errors.Wrap(err, "unmarshal build info API response")
}

return b.Data.Version, nil
Namanl2001 marked this conversation as resolved.
Show resolved Hide resolved
}

func formatTime(t time.Time) string {
return strconv.FormatFloat(float64(t.Unix())+float64(t.Nanosecond())/1e9, 'f', -1, 64)
}
Expand Down
49 changes: 46 additions & 3 deletions pkg/store/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"strings"
"sync"

"github.com/blang/semver/v4"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/gogo/protobuf/proto"
Expand Down Expand Up @@ -50,12 +51,16 @@ type PrometheusStore struct {
component component.StoreAPI
externalLabelsFn func() labels.Labels
timestamps func() (mint int64, maxt int64)
promVersion func() string

remoteReadAcceptableResponses []prompb.ReadRequest_ResponseType

framesRead prometheus.Histogram
}

// LabelValues call with matchers is supported for Prometheus versions >= 2.24.0 .
var baseVer, _ = semver.Make("2.24.0")

const initialBufSize = 32 * 1024 // 32KB seems like a good minimum starting size for sync pool size.

// NewPrometheusStore returns a new PrometheusStore that uses the given HTTP client
Expand All @@ -69,6 +74,7 @@ func NewPrometheusStore(
component component.StoreAPI,
externalLabelsFn func() labels.Labels,
timestamps func() (mint int64, maxt int64),
promVersion func() string,
) (*PrometheusStore, error) {
if logger == nil {
logger = log.NewNopLogger()
Expand All @@ -80,6 +86,7 @@ func NewPrometheusStore(
component: component,
externalLabelsFn: externalLabelsFn,
timestamps: timestamps,
promVersion: promVersion,
remoteReadAcceptableResponses: []prompb.ReadRequest_ResponseType{prompb.ReadRequest_STREAMED_XOR_CHUNKS, prompb.ReadRequest_SAMPLES},
buffers: sync.Pool{New: func() interface{} {
b := make([]byte, 0, initialBufSize)
Expand Down Expand Up @@ -496,9 +503,45 @@ func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValue
return &storepb.LabelValuesResponse{Values: []string{l}}, nil
}

vals, err := p.client.LabelValuesInGRPC(ctx, p.base, r.Label, nil, r.Start, r.End)
if err != nil {
return nil, err
var (
sers []map[string]string
err error
)

lvc := false // LabelValuesCall
vals := []string{}
v := p.promVersion()

version, err := semver.Parse(v)
if err == nil && version.GTE(baseVer) {
lvc = true
}

if len(r.Matchers) == 0 || lvc {
vals, err = p.client.LabelValuesInGRPC(ctx, p.base, r.Label, r.Matchers, r.Start, r.End)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we making gRPC call?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

YES

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The naming makes me confused, too. We are making HTTP calls but return GRPC error.

if err != nil {
return nil, err
}
} else {
matchers, err := storepb.MatchersToPromMatchers(r.Matchers...)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
sers, err = p.client.SeriesInGRPC(ctx, p.base, matchers, r.Start, r.End)
if err != nil {
return nil, err
}

// Using set to handle duplicate values.
yeya24 marked this conversation as resolved.
Show resolved Hide resolved
labelValuesSet := make(map[string]struct{})
for _, s := range sers {
if val, exists := s[r.Label]; exists {
Namanl2001 marked this conversation as resolved.
Show resolved Hide resolved
labelValuesSet[val] = struct{}{}
}
}
for key := range labelValuesSet {
vals = append(vals, key)
}
}
sort.Strings(vals)
Namanl2001 marked this conversation as resolved.
Show resolved Hide resolved
return &storepb.LabelValuesResponse{Values: vals}, nil
Expand Down
49 changes: 41 additions & 8 deletions pkg/store/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func testPrometheusStoreSeriesE2e(t *testing.T, prefix string) {
limitMinT := int64(0)
proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar,
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
func() (int64, int64) { return limitMinT, -1 }) // Maxt does not matter.
func() (int64, int64) { return limitMinT, -1 }, nil) // Maxt does not matter.
testutil.Ok(t, err)

// Query all three samples except for the first one. Since we round up queried data
Expand Down Expand Up @@ -199,7 +199,7 @@ func TestPrometheusStore_SeriesLabels_e2e(t *testing.T) {

promStore, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar,
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
func() (int64, int64) { return math.MinInt64/1000 + 62135596801, math.MaxInt64/1000 - 62135596801 })
func() (int64, int64) { return math.MinInt64/1000 + 62135596801, math.MaxInt64/1000 - 62135596801 }, nil)
testutil.Ok(t, err)

for _, tcase := range []struct {
Expand Down Expand Up @@ -375,7 +375,7 @@ func TestPrometheusStore_LabelNames_e2e(t *testing.T) {
u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr()))
testutil.Ok(t, err)

proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, getExternalLabels, nil)
proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, getExternalLabels, nil, nil)
testutil.Ok(t, err)

resp, err := proxy.LabelNames(ctx, &storepb.LabelNamesRequest{
Expand Down Expand Up @@ -420,7 +420,11 @@ func TestPrometheusStore_LabelValues_e2e(t *testing.T) {
u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr()))
testutil.Ok(t, err)

proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, getExternalLabels, nil)
version, err := promclient.NewDefaultClient().BuildVersion(ctx, u)
Namanl2001 marked this conversation as resolved.
Show resolved Hide resolved
Namanl2001 marked this conversation as resolved.
Show resolved Hide resolved
testutil.Ok(t, err)

proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, getExternalLabels, nil,
func() string { return version })
testutil.Ok(t, err)

resp, err := proxy.LabelValues(ctx, &storepb.LabelValuesRequest{
Expand All @@ -441,6 +445,32 @@ func TestPrometheusStore_LabelValues_e2e(t *testing.T) {
testutil.Ok(t, err)
testutil.Equals(t, []string(nil), resp.Warnings)
testutil.Equals(t, []string{}, resp.Values)

// check label value matcher
resp, err = proxy.LabelValues(ctx, &storepb.LabelValuesRequest{
Label: "a",
Start: timestamp.FromTime(minTime),
End: timestamp.FromTime(maxTime),
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: "a", Value: "b"},
},
})
testutil.Ok(t, err)
testutil.Equals(t, []string(nil), resp.Warnings)
testutil.Equals(t, []string{"b"}, resp.Values)

// check another label value matcher
resp, err = proxy.LabelValues(ctx, &storepb.LabelValuesRequest{
Label: "a",
Start: timestamp.FromTime(minTime),
End: timestamp.FromTime(maxTime),
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: "a", Value: "d"},
},
})
testutil.Ok(t, err)
testutil.Equals(t, []string(nil), resp.Warnings)
testutil.Equals(t, []string{}, resp.Values)
}

// Test to check external label values retrieve.
Expand All @@ -466,7 +496,10 @@ func TestPrometheusStore_ExternalLabelValues_e2e(t *testing.T) {
u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr()))
testutil.Ok(t, err)

proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, getExternalLabels, nil)
version, err := promclient.NewDefaultClient().BuildVersion(ctx, u)
testutil.Ok(t, err)

proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, getExternalLabels, nil, func() string { return version })
testutil.Ok(t, err)

resp, err := proxy.LabelValues(ctx, &storepb.LabelValuesRequest{
Expand Down Expand Up @@ -512,7 +545,7 @@ func TestPrometheusStore_Series_MatchExternalLabel_e2e(t *testing.T) {

proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar,
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
func() (int64, int64) { return 0, math.MaxInt64 })
func() (int64, int64) { return 0, math.MaxInt64 }, nil)
testutil.Ok(t, err)
srv := newStoreSeriesServer(ctx)

Expand Down Expand Up @@ -557,7 +590,7 @@ func TestPrometheusStore_Info(t *testing.T) {

proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), nil, component.Sidecar,
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
func() (int64, int64) { return 123, 456 })
func() (int64, int64) { return 123, 456 }, nil)
testutil.Ok(t, err)

resp, err := proxy.Info(ctx, &storepb.InfoRequest{})
Expand Down Expand Up @@ -635,7 +668,7 @@ func TestPrometheusStore_Series_SplitSamplesIntoChunksWithMaxSizeOf120(t *testin

proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar,
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
func() (int64, int64) { return 0, math.MaxInt64 })
func() (int64, int64) { return 0, math.MaxInt64 }, nil)
testutil.Ok(t, err)

// We build chunks only for SAMPLES method. Make sure we ask for SAMPLES only.
Expand Down