Skip to content

Commit

Permalink
Querier/Sidecar/StoreGW: Implement Info service and add --endpoint
Browse files Browse the repository at this point in the history
…flag in Querier (#4282)

* Add Info server in Sidecar and StoreGW

Signed-off-by: Hitanshu Mehta <[email protected]>

* Add `--endpoint` in querier

Signed-off-by: Hitanshu Mehta <[email protected]>

* Add E2E test for Info API

Signed-off-by: Hitanshu Mehta <[email protected]>

* Make min/max times global vaiables and some other minor changes

Signed-off-by: Hitanshu Mehta <[email protected]>

* Enhance e2e test

Signed-off-by: Hitanshu Mehta <[email protected]>

* Rename `InfoRequest` and `InfoResponse` to `InfoReq` and `InfoResp`

Signed-off-by: Hitanshu Mehta <[email protected]>

* Minor fixes and change package name for proto file for info service

Signed-off-by: Hitanshu Mehta <[email protected]>

* resolve merge conflicts

Signed-off-by: Hitanshu Mehta <[email protected]>

* add deprecation warning in description of old flags

Signed-off-by: Hitanshu Mehta <[email protected]>

* fix typo

Signed-off-by: Hitanshu Mehta <[email protected]>

* export new Timestamps method

Signed-off-by: Hitanshu Mehta <[email protected]>

* add functional options in intializer of info server

Signed-off-by: Hitanshu Mehta <[email protected]>

* change e2e tests to use efficientgo/e2e

Signed-off-by: Hitanshu Mehta <[email protected]>

* nits in info api e2e tests

Signed-off-by: Hitanshu Mehta <[email protected]>

* minor nits

Signed-off-by: Hitanshu Mehta <[email protected]>

* fix docs

Signed-off-by: Hitanshu Mehta <[email protected]>
  • Loading branch information
hitanshu-mehta authored Nov 9, 2021
1 parent 0ef2fda commit ac2cc26
Show file tree
Hide file tree
Showing 9 changed files with 454 additions and 21 deletions.
34 changes: 28 additions & 6 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,21 +101,24 @@ func registerQuery(app *extkingpin.App) {
selectorLabels := cmd.Flag("selector-label", "Query selector labels that will be exposed in info endpoint (repeated).").
PlaceHolder("<name>=\"<value>\"").Strings()

stores := cmd.Flag("store", "Addresses of statically configured store API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect store API servers through respective DNS lookups.").
endpoints := cmd.Flag("endpoint", "Addresses of statically configured Thanos API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect Thanos API servers through respective DNS lookups.").
PlaceHolder("<endpoint>").Strings()

stores := cmd.Flag("store", "Deprecation Warning - This flag is deprecated and replaced with `endpoint`. Addresses of statically configured store API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect store API servers through respective DNS lookups.").
PlaceHolder("<store>").Strings()

// TODO(bwplotka): Hidden because we plan to extract discovery to separate API: https://github.com/thanos-io/thanos/issues/2600.
ruleEndpoints := cmd.Flag("rule", "Experimental: Addresses of statically configured rules API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect rule API servers through respective DNS lookups.").
ruleEndpoints := cmd.Flag("rule", "Deprecation Warning - This flag is deprecated and replaced with `endpoint`. Experimental: Addresses of statically configured rules API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect rule API servers through respective DNS lookups.").
Hidden().PlaceHolder("<rule>").Strings()

metadataEndpoints := cmd.Flag("metadata", "Experimental: Addresses of statically configured metadata API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect metadata API servers through respective DNS lookups.").
metadataEndpoints := cmd.Flag("metadata", "Deprecation Warning - This flag is deprecated and replaced with `endpoint`. Experimental: Addresses of statically configured metadata API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect metadata API servers through respective DNS lookups.").
Hidden().PlaceHolder("<metadata>").Strings()

exemplarEndpoints := cmd.Flag("exemplar", "Experimental: Addresses of statically configured exemplars API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect exemplars API servers through respective DNS lookups.").
exemplarEndpoints := cmd.Flag("exemplar", "Deprecation Warning - This flag is deprecated and replaced with `endpoint`. Experimental: Addresses of statically configured exemplars API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect exemplars API servers through respective DNS lookups.").
Hidden().PlaceHolder("<exemplar>").Strings()

// TODO(atunik): Hidden because we plan to extract discovery to separate API: https://github.com/thanos-io/thanos/issues/2600.
targetEndpoints := cmd.Flag("target", "Experimental: Addresses of statically configured target API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect target API servers through respective DNS lookups.").
targetEndpoints := cmd.Flag("target", "Deprecation Warning - This flag is deprecated and replaced with `endpoint`. Experimental: Addresses of statically configured target API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect target API servers through respective DNS lookups.").
Hidden().PlaceHolder("<target>").Strings()

strictStores := cmd.Flag("store-strict", "Addresses of only statically configured store API servers that are always used, even if the health check fails. Useful if you have a caching layer on top.").
Expand Down Expand Up @@ -264,6 +267,7 @@ func registerQuery(app *extkingpin.App) {
*queryReplicaLabels,
selectorLset,
getFlagsMap(cmd.Flags()),
*endpoints,
*stores,
*ruleEndpoints,
*targetEndpoints,
Expand Down Expand Up @@ -329,6 +333,7 @@ func runQuery(
queryReplicaLabels []string,
selectorLset labels.Labels,
flagsMap map[string]string,
endpointAddrs []string,
storeAddrs []string,
ruleAddrs []string,
targetAddrs []string,
Expand Down Expand Up @@ -376,6 +381,12 @@ func runQuery(
}
}

dnsEndpointProvider := dns.NewProvider(
logger,
extprom.WrapRegistererWithPrefix("thanos_query_endpoints_", reg),
dns.ResolverType(dnsSDResolver),
)

dnsRuleProvider := dns.NewProvider(
logger,
extprom.WrapRegistererWithPrefix("thanos_query_rule_apis_", reg),
Expand Down Expand Up @@ -410,7 +421,14 @@ func runQuery(
specs = append(specs, query.NewGRPCEndpointSpec(addr, true))
}

for _, dnsProvider := range []*dns.Provider{dnsStoreProvider, dnsRuleProvider, dnsExemplarProvider, dnsMetadataProvider, dnsTargetProvider} {
for _, dnsProvider := range []*dns.Provider{
dnsStoreProvider,
dnsRuleProvider,
dnsExemplarProvider,
dnsMetadataProvider,
dnsTargetProvider,
dnsEndpointProvider,
} {
var tmpSpecs []query.EndpointSpec

for _, addr := range dnsProvider.Addresses() {
Expand Down Expand Up @@ -527,6 +545,10 @@ func runQuery(
if err := dnsExemplarProvider.Resolve(resolveCtx, exemplarAddrs); err != nil {
level.Error(logger).Log("msg", "failed to resolve addresses for exemplarsAPI", "err", err)
}
if err := dnsEndpointProvider.Resolve(resolveCtx, endpointAddrs); err != nil {
level.Error(logger).Log("msg", "failed to resolve addresses passed using endpoint flag", "err", err)

}
return nil
})
}, func(error) {
Expand Down
40 changes: 39 additions & 1 deletion cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,23 @@ import (
"github.com/thanos-io/thanos/pkg/extkingpin"
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/httpconfig"
"github.com/thanos-io/thanos/pkg/info"
"github.com/thanos-io/thanos/pkg/info/infopb"
"github.com/thanos-io/thanos/pkg/logging"
meta "github.com/thanos-io/thanos/pkg/metadata"
thanosmodel "github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/objstore/client"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/promclient"
"github.com/thanos-io/thanos/pkg/query"
"github.com/thanos-io/thanos/pkg/reloader"
"github.com/thanos-io/thanos/pkg/rules"
"github.com/thanos-io/thanos/pkg/runutil"
grpcserver "github.com/thanos-io/thanos/pkg/server/grpc"
httpserver "github.com/thanos-io/thanos/pkg/server/http"
"github.com/thanos-io/thanos/pkg/shipper"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/targets"
"github.com/thanos-io/thanos/pkg/tls"
"github.com/thanos-io/thanos/pkg/tracing"
Expand Down Expand Up @@ -241,12 +245,46 @@ func runSidecar(
return errors.Wrap(err, "setup gRPC server")
}

exemplarSrv := exemplars.NewPrometheus(conf.prometheus.url, c, m.Labels)

infoSrv := info.NewInfoServer(
component.Sidecar.String(),
info.WithLabelSet(func() []labelpb.ZLabelSet {
return promStore.LabelSet()
}),
info.WithStoreInfo(func() *infopb.StoreInfo {
mint, maxt := promStore.Timestamps()
return &infopb.StoreInfo{
MinTime: mint,
MaxTime: maxt,
}
}),
info.WithExemplarsInfo(func() *infopb.ExemplarsInfo {
// Currently Exemplars API does not expose metadata such as min/max time,
// so we are using default minimum and maximum possible values as min/max time.
return &infopb.ExemplarsInfo{
MinTime: query.MinTime,
MaxTime: query.MaxTime,
}
}),
info.WithRulesInfo(func() *infopb.RulesInfo {
return &infopb.RulesInfo{}
}),
info.WithTargetInfo(func() *infopb.TargetsInfo {
return &infopb.TargetsInfo{}
}),
info.WithMetricMetadataInfo(func() *infopb.MetricMetadataInfo {
return &infopb.MetricMetadataInfo{}
}),
)

s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, comp, grpcProbe,
grpcserver.WithServer(store.RegisterStoreServer(promStore)),
grpcserver.WithServer(rules.RegisterRulesServer(rules.NewPrometheus(conf.prometheus.url, c, m.Labels))),
grpcserver.WithServer(targets.RegisterTargetsServer(targets.NewPrometheus(conf.prometheus.url, c, m.Labels))),
grpcserver.WithServer(meta.RegisterMetadataServer(meta.NewPrometheus(conf.prometheus.url, c))),
grpcserver.WithServer(exemplars.RegisterExemplarsServer(exemplars.NewPrometheus(conf.prometheus.url, c, m.Labels))),
grpcserver.WithServer(exemplars.RegisterExemplarsServer(exemplarSrv)),
grpcserver.WithServer(info.RegisterInfoServer(infoSrv)),
grpcserver.WithListen(conf.grpc.bindAddress),
grpcserver.WithGracePeriod(time.Duration(conf.grpc.gracePeriod)),
grpcserver.WithTLSConfig(tlsCfg),
Expand Down
19 changes: 19 additions & 0 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import (
"github.com/thanos-io/thanos/pkg/extprom"
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
"github.com/thanos-io/thanos/pkg/gate"
"github.com/thanos-io/thanos/pkg/info"
"github.com/thanos-io/thanos/pkg/info/infopb"
"github.com/thanos-io/thanos/pkg/logging"
"github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/objstore/client"
Expand All @@ -40,6 +42,7 @@ import (
httpserver "github.com/thanos-io/thanos/pkg/server/http"
"github.com/thanos-io/thanos/pkg/store"
storecache "github.com/thanos-io/thanos/pkg/store/cache"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/tls"
"github.com/thanos-io/thanos/pkg/ui"
)
Expand Down Expand Up @@ -382,6 +385,21 @@ func runStore(
cancel()
})
}

infoSrv := info.NewInfoServer(
component.Store.String(),
info.WithLabelSet(func() []labelpb.ZLabelSet {
return bs.LabelSet()
}),
info.WithStoreInfo(func() *infopb.StoreInfo {
mint, maxt := bs.TimeRange()
return &infopb.StoreInfo{
MinTime: mint,
MaxTime: maxt,
}
}),
)

// Start query (proxy) gRPC StoreAPI.
{
tlsCfg, err := tls.NewServerConfig(log.With(logger, "protocol", "gRPC"), conf.grpcConfig.tlsSrvCert, conf.grpcConfig.tlsSrvKey, conf.grpcConfig.tlsSrvClientCA)
Expand All @@ -391,6 +409,7 @@ func runStore(

s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, conf.component, grpcProbe,
grpcserver.WithServer(store.RegisterStoreServer(bs)),
grpcserver.WithServer(info.RegisterInfoServer(infoSrv)),
grpcserver.WithListen(conf.grpcConfig.bindAddress),
grpcserver.WithGracePeriod(time.Duration(conf.grpcConfig.gracePeriod)),
grpcserver.WithTLSConfig(tlsCfg),
Expand Down
16 changes: 11 additions & 5 deletions docs/components/query.md
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,11 @@ Flags:
--enable-feature= ... Comma separated experimental feature names to
enable.The current list of features is
promql-negative-offset and promql-at-modifier.
--endpoint=<endpoint> ... Addresses of statically configured Thanos API
servers (repeatable). The scheme may be
prefixed with 'dns+' or 'dnssrv+' to detect
Thanos API servers through respective DNS
lookups.
--grpc-address="0.0.0.0:10901"
Listen ip:port address for gRPC endpoints
(StoreAPI). Make sure this address is routable
Expand Down Expand Up @@ -367,11 +372,12 @@ Flags:
--selector-label=<name>="<value>" ...
Query selector labels that will be exposed in
info endpoint (repeated).
--store=<store> ... Addresses of statically configured store API
servers (repeatable). The scheme may be
prefixed with 'dns+' or 'dnssrv+' to detect
store API servers through respective DNS
lookups.
--store=<store> ... Deprecation Warning - This flag is deprecated
and replaced with `endpoint`. Addresses of
statically configured store API servers
(repeatable). The scheme may be prefixed with
'dns+' or 'dnssrv+' to detect store API servers
through respective DNS lookups.
--store-strict=<staticstore> ...
Addresses of only statically configured store
API servers that are always used, even if the
Expand Down
122 changes: 122 additions & 0 deletions pkg/info/info.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package info

import (
"context"

"github.com/thanos-io/thanos/pkg/info/infopb"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"google.golang.org/grpc"
)

type InfoServer struct {
infopb.UnimplementedInfoServer

component string

getLabelSet func() []labelpb.ZLabelSet
getStoreInfo func() *infopb.StoreInfo
getExemplarsInfo func() *infopb.ExemplarsInfo
getRulesInfo func() *infopb.RulesInfo
getTargetsInfo func() *infopb.TargetsInfo
getMetricMetadataInfo func() *infopb.MetricMetadataInfo
}

func NewInfoServer(
component string,
options ...func(*InfoServer),
) *InfoServer {
srv := &InfoServer{
component: component,
}

for _, o := range options {
o(srv)
}

return srv
}

func WithLabelSet(getLabelSet func() []labelpb.ZLabelSet) func(*InfoServer) {
return func(s *InfoServer) {
s.getLabelSet = getLabelSet
}
}

func WithStoreInfo(getStoreInfo func() *infopb.StoreInfo) func(*InfoServer) {
return func(s *InfoServer) {
s.getStoreInfo = getStoreInfo
}
}

func WithRulesInfo(getRulesInfo func() *infopb.RulesInfo) func(*InfoServer) {
return func(s *InfoServer) {
s.getRulesInfo = getRulesInfo
}
}

func WithExemplarsInfo(getExemplarsInfo func() *infopb.ExemplarsInfo) func(*InfoServer) {
return func(s *InfoServer) {
s.getExemplarsInfo = getExemplarsInfo
}
}

func WithTargetInfo(getTargetsInfo func() *infopb.TargetsInfo) func(*InfoServer) {
return func(s *InfoServer) {
s.getTargetsInfo = getTargetsInfo
}
}

func WithMetricMetadataInfo(getMetricMetadataInfo func() *infopb.MetricMetadataInfo) func(*InfoServer) {
return func(s *InfoServer) {
s.getMetricMetadataInfo = getMetricMetadataInfo
}
}

// RegisterInfoServer register info server.
func RegisterInfoServer(infoSrv infopb.InfoServer) func(*grpc.Server) {
return func(s *grpc.Server) {
infopb.RegisterInfoServer(s, infoSrv)
}
}

func (srv *InfoServer) Info(ctx context.Context, req *infopb.InfoRequest) (*infopb.InfoResponse, error) {

if srv.getLabelSet == nil {
srv.getLabelSet = func() []labelpb.ZLabelSet { return nil }
}

if srv.getStoreInfo == nil {
srv.getStoreInfo = func() *infopb.StoreInfo { return nil }
}

if srv.getExemplarsInfo == nil {
srv.getExemplarsInfo = func() *infopb.ExemplarsInfo { return nil }
}

if srv.getRulesInfo == nil {
srv.getRulesInfo = func() *infopb.RulesInfo { return nil }
}

if srv.getTargetsInfo == nil {
srv.getTargetsInfo = func() *infopb.TargetsInfo { return nil }
}

if srv.getMetricMetadataInfo == nil {
srv.getMetricMetadataInfo = func() *infopb.MetricMetadataInfo { return nil }
}

resp := &infopb.InfoResponse{
LabelSets: srv.getLabelSet(),
ComponentType: srv.component,
Store: srv.getStoreInfo(),
Exemplars: srv.getExemplarsInfo(),
Rules: srv.getRulesInfo(),
Targets: srv.getTargetsInfo(),
MetricMetadata: srv.getMetricMetadataInfo(),
}

return resp, nil
}
18 changes: 11 additions & 7 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,16 @@ func (s *BucketStore) TimeRange() (mint, maxt int64) {
return mint, maxt
}

func (s *BucketStore) LabelSet() []labelpb.ZLabelSet {
labelSets := s.advLabelSets

if s.enableCompatibilityLabel && len(labelSets) > 0 {
labelSets = append(labelSets, labelpb.ZLabelSet{Labels: []labelpb.ZLabel{{Name: CompatibilityTypeLabelName, Value: "store"}}})
}

return labelSets
}

// Info implements the storepb.StoreServer interface.
func (s *BucketStore) Info(context.Context, *storepb.InfoRequest) (*storepb.InfoResponse, error) {
mint, maxt := s.TimeRange()
Expand All @@ -691,14 +701,8 @@ func (s *BucketStore) Info(context.Context, *storepb.InfoRequest) (*storepb.Info
}

s.mtx.RLock()
res.LabelSets = s.advLabelSets
res.LabelSets = s.LabelSet()
s.mtx.RUnlock()

if s.enableCompatibilityLabel && len(res.LabelSets) > 0 {
// This is for compatibility with Querier v0.7.0.
// See query.StoreCompatibilityTypeLabelName comment for details.
res.LabelSets = append(res.LabelSets, labelpb.ZLabelSet{Labels: []labelpb.ZLabel{{Name: CompatibilityTypeLabelName, Value: "store"}}})
}
return res, nil
}

Expand Down
Loading

0 comments on commit ac2cc26

Please sign in to comment.