Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sidecar: Added matchers support #3940

Merged
merged 16 commits into from
May 7, 2021
58 changes: 53 additions & 5 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,14 +209,41 @@ func runSidecar(
cancel()
})
}
{
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
// We retry infinitely until we reach and fetch BuildVersion from our Prometheus.
err := runutil.Retry(2*time.Second, ctx.Done(), func() error {
if err := m.BuildVersion(ctx); err != nil {
level.Warn(logger).Log(
"msg", "failed to fetch prometheur version. Is Prometheus running? Retrying",
Namanl2001 marked this conversation as resolved.
Show resolved Hide resolved
"err", err,
)
return err
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am still thinking about this logic.
This doesn't work well when Prometheus version is < 2.14.0, where /api/v1/status/buildinfo doesn't exist. In this case, we just get 404s but we are still retrying indefinitely. This seems wrong to me.

We can only retry for a short time or we check the return error to stop retrying.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets retry for lets say N times (maybe N=5) and exit if retry fails?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also for Prometheus <2.14.0 this function would fail, so we also need to decide how to fetch build version for <2.14.0

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed in contributor hours:
Let’s check for 404, if it’s present, set 0 value for promVersion


level.Info(logger).Log(
"msg", "successfully loaded prometheus version",
"prometheus_version", m.Version(),
)
return nil
})
if err != nil {
return errors.Wrap(err, "failed to get prometheus version")
}
return nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you run this in a goroutine and it ends, the sidecar program also stops running because of the run group. Please make sure your code doesn't break the functionality. You can see the tests failed.

I think it is unnecessary to have a separate goroutine for checking the Prometheus version as it is unlikely to change.
We can run it in the main thread once? WDYT? @yashrsharma44 @Namanl2001 @bwplotka

Copy link
Contributor Author

@Namanl2001 Namanl2001 Apr 19, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've got a doubt that we already have more than one actor in this run group but why there is no such problem with them?

I think it is unnecessary to have a separate goroutine for checking the Prometheus version as it is unlikely to change.

Do we add things in a separate goroutine which are likely to change?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}, func(error) {
cancel()
})
}

{
t := exthttp.NewTransport()
t.MaxIdleConnsPerHost = conf.connection.maxIdleConnsPerHost
t.MaxIdleConns = conf.connection.maxIdleConns
c := promclient.NewClient(&http.Client{Transport: tracing.HTTPTripperware(logger, t)}, logger, thanoshttp.ThanosUserAgent)

promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps)
promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps, m.Version)
if err != nil {
return errors.Wrap(err, "create Prometheus store")
}
Expand Down Expand Up @@ -346,10 +373,11 @@ func validatePrometheus(ctx context.Context, client *promclient.Client, logger l
type promMetadata struct {
promURL *url.URL

mtx sync.Mutex
mint int64
maxt int64
labels labels.Labels
mtx sync.Mutex
mint int64
maxt int64
labels labels.Labels
promVersion string

limitMinTime thanosmodel.TimeOrDurationValue

Expand Down Expand Up @@ -395,6 +423,26 @@ func (s *promMetadata) Timestamps() (mint int64, maxt int64) {
return s.mint, s.maxt
}

func (s *promMetadata) BuildVersion(ctx context.Context) error {
ver, err := s.client.BuildVersion(ctx, s.promURL)
if err != nil {
return err
}

s.mtx.Lock()
defer s.mtx.Unlock()

s.promVersion = ver
return nil
}

func (s *promMetadata) Version() string {
s.mtx.Lock()
defer s.mtx.Unlock()

return s.promVersion
}

type sidecarConfig struct {
http httpConfig
grpc grpcConfig
Expand Down
28 changes: 28 additions & 0 deletions pkg/promclient/promclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,34 @@ func (c *Client) AlertmanagerAlerts(ctx context.Context, base *url.URL) ([]*mode
return v.Data, nil
}

// BuildVersion returns Prometheus version from /api/v1/status/buildinfo Prometheus endpoint.
func (c *Client) BuildVersion(ctx context.Context, base *url.URL) (string, error) {
u := *base
u.Path = path.Join(u.Path, "/api/v1/status/buildinfo")

level.Debug(c.logger).Log("msg", "build version", "url", u.String())

span, ctx := tracing.StartSpan(ctx, "/prom_buildversion HTTP[client]")
defer span.Finish()

body, _, err := c.req2xx(ctx, &u, http.MethodGet)
if err != nil {
return "", err
}

var b struct {
Data struct {
Version string `json:"version"`
} `json:"data"`
}

if err = json.Unmarshal(body, &b); err != nil {
return "", errors.Wrap(err, "unmarshal build info API response")
}

return b.Data.Version, nil
Namanl2001 marked this conversation as resolved.
Show resolved Hide resolved
}

func formatTime(t time.Time) string {
return strconv.FormatFloat(float64(t.Unix())+float64(t.Nanosecond())/1e9, 'f', -1, 64)
}
Expand Down
38 changes: 35 additions & 3 deletions pkg/store/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type PrometheusStore struct {
component component.StoreAPI
externalLabelsFn func() labels.Labels
timestamps func() (mint int64, maxt int64)
promVersion func() string

remoteReadAcceptableResponses []prompb.ReadRequest_ResponseType

Expand All @@ -69,6 +70,7 @@ func NewPrometheusStore(
component component.StoreAPI,
externalLabelsFn func() labels.Labels,
timestamps func() (mint int64, maxt int64),
promVersion func() string,
) (*PrometheusStore, error) {
if logger == nil {
logger = log.NewNopLogger()
Expand All @@ -80,6 +82,7 @@ func NewPrometheusStore(
component: component,
externalLabelsFn: externalLabelsFn,
timestamps: timestamps,
promVersion: promVersion,
remoteReadAcceptableResponses: []prompb.ReadRequest_ResponseType{prompb.ReadRequest_STREAMED_XOR_CHUNKS, prompb.ReadRequest_SAMPLES},
buffers: sync.Pool{New: func() interface{} {
b := make([]byte, 0, initialBufSize)
Expand Down Expand Up @@ -496,9 +499,38 @@ func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValue
return &storepb.LabelValuesResponse{Values: []string{l}}, nil
}

vals, err := p.client.LabelValuesInGRPC(ctx, p.base, r.Label, nil, r.Start, r.End)
if err != nil {
return nil, err
var (
vals []string
sers []map[string]string
err error
)

version := p.promVersion()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

p.promVersion might return some garbage value if the m.BuildVersion doesn't run correctly, so let's make sure version is a correct version string, and not some garbage value.

if version > "2.24" {
Namanl2001 marked this conversation as resolved.
Show resolved Hide resolved
yeya24 marked this conversation as resolved.
Show resolved Hide resolved
vals, err = p.client.LabelValuesInGRPC(ctx, p.base, r.Label, r.Matchers, r.Start, r.End)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we making gRPC call?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

YES

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The naming makes me confused, too. We are making HTTP calls but return GRPC error.

if err != nil {
return nil, err
}
} else {
matchers, err := storepb.MatchersToPromMatchers(r.Matchers...)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
sers, err = p.client.SeriesInGRPC(ctx, p.base, matchers, r.Start, r.End)
if err != nil {
return nil, err
}

// using set to handle duplicate values.
Namanl2001 marked this conversation as resolved.
Show resolved Hide resolved
labelValuesSet := make(map[string]struct{})
for _, s := range sers {
if val, exists := s[r.Label]; exists {
Namanl2001 marked this conversation as resolved.
Show resolved Hide resolved
labelValuesSet[val] = struct{}{}
}
}
for key := range labelValuesSet {
vals = append(vals, key)
}
}
sort.Strings(vals)
Namanl2001 marked this conversation as resolved.
Show resolved Hide resolved
return &storepb.LabelValuesResponse{Values: vals}, nil
Expand Down
43 changes: 35 additions & 8 deletions pkg/store/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func testPrometheusStoreSeriesE2e(t *testing.T, prefix string) {
limitMinT := int64(0)
proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar,
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
func() (int64, int64) { return limitMinT, -1 }) // Maxt does not matter.
func() (int64, int64) { return limitMinT, -1 }, nil) // Maxt does not matter.
testutil.Ok(t, err)

// Query all three samples except for the first one. Since we round up queried data
Expand Down Expand Up @@ -199,7 +199,7 @@ func TestPrometheusStore_SeriesLabels_e2e(t *testing.T) {

promStore, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar,
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
func() (int64, int64) { return math.MinInt64/1000 + 62135596801, math.MaxInt64/1000 - 62135596801 })
func() (int64, int64) { return math.MinInt64/1000 + 62135596801, math.MaxInt64/1000 - 62135596801 }, nil)
testutil.Ok(t, err)

for _, tcase := range []struct {
Expand Down Expand Up @@ -375,7 +375,7 @@ func TestPrometheusStore_LabelNames_e2e(t *testing.T) {
u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr()))
testutil.Ok(t, err)

proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, getExternalLabels, nil)
proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, getExternalLabels, nil, nil)
testutil.Ok(t, err)

resp, err := proxy.LabelNames(ctx, &storepb.LabelNamesRequest{
Expand Down Expand Up @@ -420,7 +420,8 @@ func TestPrometheusStore_LabelValues_e2e(t *testing.T) {
u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr()))
testutil.Ok(t, err)

proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, getExternalLabels, nil)
proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, getExternalLabels, nil,
func() string { return "2.25" })
testutil.Ok(t, err)

resp, err := proxy.LabelValues(ctx, &storepb.LabelValuesRequest{
Expand All @@ -441,6 +442,32 @@ func TestPrometheusStore_LabelValues_e2e(t *testing.T) {
testutil.Ok(t, err)
testutil.Equals(t, []string(nil), resp.Warnings)
testutil.Equals(t, []string{}, resp.Values)

// check label value matcher
resp, err = proxy.LabelValues(ctx, &storepb.LabelValuesRequest{
Label: "a",
Start: timestamp.FromTime(minTime),
End: timestamp.FromTime(maxTime),
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: "a", Value: "b"},
},
})
testutil.Ok(t, err)
testutil.Equals(t, []string(nil), resp.Warnings)
testutil.Equals(t, []string{"b"}, resp.Values)

// check another label value matcher
resp, err = proxy.LabelValues(ctx, &storepb.LabelValuesRequest{
Label: "a",
Start: timestamp.FromTime(minTime),
End: timestamp.FromTime(maxTime),
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: "a", Value: "d"},
},
})
testutil.Ok(t, err)
testutil.Equals(t, []string(nil), resp.Warnings)
testutil.Equals(t, []string{}, resp.Values)
}

// Test to check external label values retrieve.
Expand All @@ -466,7 +493,7 @@ func TestPrometheusStore_ExternalLabelValues_e2e(t *testing.T) {
u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr()))
testutil.Ok(t, err)

proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, getExternalLabels, nil)
proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, getExternalLabels, nil, nil)
testutil.Ok(t, err)

resp, err := proxy.LabelValues(ctx, &storepb.LabelValuesRequest{
Expand Down Expand Up @@ -512,7 +539,7 @@ func TestPrometheusStore_Series_MatchExternalLabel_e2e(t *testing.T) {

proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar,
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
func() (int64, int64) { return 0, math.MaxInt64 })
func() (int64, int64) { return 0, math.MaxInt64 }, nil)
testutil.Ok(t, err)
srv := newStoreSeriesServer(ctx)

Expand Down Expand Up @@ -557,7 +584,7 @@ func TestPrometheusStore_Info(t *testing.T) {

proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), nil, component.Sidecar,
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
func() (int64, int64) { return 123, 456 })
func() (int64, int64) { return 123, 456 }, nil)
testutil.Ok(t, err)

resp, err := proxy.Info(ctx, &storepb.InfoRequest{})
Expand Down Expand Up @@ -635,7 +662,7 @@ func TestPrometheusStore_Series_SplitSamplesIntoChunksWithMaxSizeOf120(t *testin

proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar,
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
func() (int64, int64) { return 0, math.MaxInt64 })
func() (int64, int64) { return 0, math.MaxInt64 }, nil)
testutil.Ok(t, err)

// We build chunks only for SAMPLES method. Make sure we ask for SAMPLES only.
Expand Down