Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Federated targets functionality #3350

Merged
merged 1 commit into from
Apr 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re

## Unreleased

- [#3350](https://github.com/thanos-io/thanos/pull/3350) Query/Sidecar: Added targets API support. You can now configure you Querier to fetch Prometheus targets from leaf Prometheus-es!
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be under # Added section I suppose? 🤗

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a bigger, we can clean up later.


### Added

- [#3977](https://github.com/thanos-io/thanos/pull/3903) Expose exemplars for `http_request_duration_seconds` histogram if tracing is enabled.
Expand Down
36 changes: 36 additions & 0 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
grpcserver "github.com/thanos-io/thanos/pkg/server/grpc"
httpserver "github.com/thanos-io/thanos/pkg/server/http"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/targets"
"github.com/thanos-io/thanos/pkg/tls"
"github.com/thanos-io/thanos/pkg/ui"
)
Expand Down Expand Up @@ -108,6 +109,10 @@ func registerQuery(app *extkingpin.App) {
exemplarEndpoints := cmd.Flag("exemplar", "Experimental: Addresses of statically configured exemplars API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect exemplars API servers through respective DNS lookups.").
Hidden().PlaceHolder("<exemplar>").Strings()

// TODO(atunik): Hidden because we plan to extract discovery to separate API: https://github.com/thanos-io/thanos/issues/2600.
targetEndpoints := cmd.Flag("target", "Experimental: Addresses of statically configured target API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect target API servers through respective DNS lookups.").
Hidden().PlaceHolder("<target>").Strings()

strictStores := cmd.Flag("store-strict", "Addresses of only statically configured store API servers that are always used, even if the health check fails. Useful if you have a caching layer on top.").
PlaceHolder("<staticstore>").Strings()

Expand Down Expand Up @@ -135,6 +140,9 @@ func registerQuery(app *extkingpin.App) {
enableRulePartialResponse := cmd.Flag("rule.partial-response", "Enable partial response for rules endpoint. --no-rule.partial-response for disabling.").
Hidden().Default("true").Bool()

enableTargetPartialResponse := cmd.Flag("target.partial-response", "Enable partial response for targets endpoint. --no-target.partial-response for disabling.").
Hidden().Default("true").Bool()

enableMetricMetadataPartialResponse := cmd.Flag("metric-metadata.partial-response", "Enable partial response for metric metadata endpoint. --no-metric-metadata.partial-response for disabling.").
Hidden().Default("true").Bool()

Expand Down Expand Up @@ -178,6 +186,10 @@ func registerQuery(app *extkingpin.App) {
return errors.Wrap(err, "error while parsing config for request logging")
}

if dup := firstDuplicate(*targetEndpoints); dup != "" {
return errors.Errorf("Address %s is duplicated for --target flag.", dup)
}

var fileSD *file.Discovery
if len(*fileSDFiles) > 0 {
conf := &file.SDConfig{
Expand Down Expand Up @@ -232,11 +244,13 @@ func registerQuery(app *extkingpin.App) {
getFlagsMap(cmd.Flags()),
*stores,
*ruleEndpoints,
*targetEndpoints,
*metadataEndpoints,
*exemplarEndpoints,
*enableAutodownsampling,
*enableQueryPartialResponse,
*enableRulePartialResponse,
*enableTargetPartialResponse,
*enableMetricMetadataPartialResponse,
fileSD,
time.Duration(*dnsSDInterval),
Expand Down Expand Up @@ -290,11 +304,13 @@ func runQuery(
flagsMap map[string]string,
storeAddrs []string,
ruleAddrs []string,
targetAddrs []string,
metadataAddrs []string,
exemplarAddrs []string,
enableAutodownsampling bool,
enableQueryPartialResponse bool,
enableRulePartialResponse bool,
enableTargetPartialResponse bool,
enableMetricMetadataPartialResponse bool,
fileSD *file.Discovery,
dnsSDInterval time.Duration,
Expand Down Expand Up @@ -336,6 +352,12 @@ func runQuery(
dns.ResolverType(dnsSDResolver),
)

dnsTargetProvider := dns.NewProvider(
logger,
extprom.WrapRegistererWithPrefix("thanos_query_target_apis_", reg),
dns.ResolverType(dnsSDResolver),
)

dnsMetadataProvider := dns.NewProvider(
logger,
extprom.WrapRegistererWithPrefix("thanos_query_metadata_apis_", reg),
Expand Down Expand Up @@ -374,6 +396,13 @@ func runQuery(

return specs
},
func() (specs []query.TargetSpec) {
for _, addr := range dnsTargetProvider.Addresses() {
specs = append(specs, query.NewGRPCStoreSpec(addr, false))
}

return specs
},
func() (specs []query.MetadataSpec) {
for _, addr := range dnsMetadataProvider.Addresses() {
specs = append(specs, query.NewGRPCStoreSpec(addr, false))
Expand All @@ -393,6 +422,7 @@ func runQuery(
)
proxy = store.NewProxyStore(logger, reg, stores.Get, component.Query, selectorLset, storeResponseTimeout)
rulesProxy = rules.NewProxy(logger, stores.GetRulesClients)
targetsProxy = targets.NewProxy(logger, stores.GetTargetsClients)
metadataProxy = metadata.NewProxy(logger, stores.GetMetadataClients)
exemplarsProxy = exemplars.NewProxy(logger, stores.GetExemplarsClients)
queryableCreator = query.NewQueryableCreator(
Expand Down Expand Up @@ -481,6 +511,9 @@ func runQuery(
if err := dnsRuleProvider.Resolve(resolveCtx, ruleAddrs); err != nil {
level.Error(logger).Log("msg", "failed to resolve addresses for rulesAPIs", "err", err)
}
if err := dnsTargetProvider.Resolve(ctx, targetAddrs); err != nil {
level.Error(logger).Log("msg", "failed to resolve addresses for targetsAPIs", "err", err)
}
if err := dnsMetadataProvider.Resolve(resolveCtx, metadataAddrs); err != nil {
level.Error(logger).Log("msg", "failed to resolve addresses for metadataAPIs", "err", err)
}
Expand Down Expand Up @@ -534,11 +567,13 @@ func runQuery(
queryableCreator,
// NOTE: Will share the same replica label as the query for now.
rules.NewGRPCClientWithDedup(rulesProxy, queryReplicaLabels),
targets.NewGRPCClientWithDedup(targetsProxy, queryReplicaLabels),
metadata.NewGRPCClient(metadataProxy),
exemplars.NewGRPCClientWithDedup(exemplarsProxy, queryReplicaLabels),
enableAutodownsampling,
enableQueryPartialResponse,
enableRulePartialResponse,
enableTargetPartialResponse,
enableMetricMetadataPartialResponse,
queryReplicaLabels,
flagsMap,
Expand Down Expand Up @@ -581,6 +616,7 @@ func runQuery(
s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, comp, grpcProbe,
grpcserver.WithServer(store.RegisterStoreServer(proxy)),
grpcserver.WithServer(rules.RegisterRulesServer(rulesProxy)),
grpcserver.WithServer(targets.RegisterTargetsServer(targetsProxy)),
grpcserver.WithServer(metadata.RegisterMetadataServer(metadataProxy)),
grpcserver.WithServer(exemplars.RegisterExemplarsServer(exemplarsProxy)),
grpcserver.WithListen(grpcBindAddr),
Expand Down
2 changes: 2 additions & 0 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
httpserver "github.com/thanos-io/thanos/pkg/server/http"
"github.com/thanos-io/thanos/pkg/shipper"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/targets"
"github.com/thanos-io/thanos/pkg/tls"
"github.com/thanos-io/thanos/pkg/tracing"
)
Expand Down Expand Up @@ -230,6 +231,7 @@ func runSidecar(
s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, comp, grpcProbe,
grpcserver.WithServer(store.RegisterStoreServer(promStore)),
grpcserver.WithServer(rules.RegisterRulesServer(rules.NewPrometheus(conf.prometheus.url, c, m.Labels))),
grpcserver.WithServer(targets.RegisterTargetsServer(targets.NewPrometheus(conf.prometheus.url, c, m.Labels))),
grpcserver.WithServer(meta.RegisterMetadataServer(meta.NewPrometheus(conf.prometheus.url, c))),
grpcserver.WithServer(exemplars.RegisterExemplarsServer(exemplars.NewPrometheus(conf.prometheus.url, c, m.Labels))),
grpcserver.WithListen(conf.grpc.bindAddress),
Expand Down
40 changes: 40 additions & 0 deletions pkg/api/query/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ import (
"github.com/thanos-io/thanos/pkg/rules/rulespb"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/targets"
"github.com/thanos-io/thanos/pkg/targets/targetspb"
"github.com/thanos-io/thanos/pkg/tracing"
)

Expand All @@ -75,12 +77,14 @@ type QueryAPI struct {
// queryEngine returns appropriate promql.Engine for a query with a given step.
queryEngine func(int64) *promql.Engine
ruleGroups rules.UnaryClient
targets targets.UnaryClient
metadatas metadata.UnaryClient
exemplars exemplars.UnaryClient

enableAutodownsampling bool
enableQueryPartialResponse bool
enableRulePartialResponse bool
enableTargetPartialResponse bool
enableMetricMetadataPartialResponse bool
enableExemplarPartialResponse bool
disableCORS bool
Expand All @@ -100,11 +104,13 @@ func NewQueryAPI(
qe func(int64) *promql.Engine,
c query.QueryableCreator,
ruleGroups rules.UnaryClient,
targets targets.UnaryClient,
metadatas metadata.UnaryClient,
exemplars exemplars.UnaryClient,
enableAutodownsampling bool,
enableQueryPartialResponse bool,
enableRulePartialResponse bool,
enableTargetPartialResponse bool,
enableMetricMetadataPartialResponse bool,
replicaLabels []string,
flagsMap map[string]string,
Expand All @@ -121,12 +127,14 @@ func NewQueryAPI(
queryableCreate: c,
gate: gate,
ruleGroups: ruleGroups,
targets: targets,
metadatas: metadatas,
exemplars: exemplars,

enableAutodownsampling: enableAutodownsampling,
enableQueryPartialResponse: enableQueryPartialResponse,
enableRulePartialResponse: enableRulePartialResponse,
enableTargetPartialResponse: enableTargetPartialResponse,
enableMetricMetadataPartialResponse: enableMetricMetadataPartialResponse,
replicaLabels: replicaLabels,
storeSet: storeSet,
Expand Down Expand Up @@ -161,6 +169,8 @@ func (qapi *QueryAPI) Register(r *route.Router, tracer opentracing.Tracer, logge

r.Get("/rules", instr("rules", NewRulesHandler(qapi.ruleGroups, qapi.enableRulePartialResponse)))

r.Get("/targets", instr("targets", NewTargetsHandler(qapi.targets, qapi.enableTargetPartialResponse)))

r.Get("/metadata", instr("metadata", NewMetricMetadataHandler(qapi.metadatas, qapi.enableMetricMetadataPartialResponse)))

r.Get("/query_exemplars", instr("exemplars", NewExemplarsHandler(qapi.exemplars, qapi.enableExemplarPartialResponse)))
Expand Down Expand Up @@ -662,6 +672,36 @@ func (qapi *QueryAPI) stores(_ *http.Request) (interface{}, []error, *api.ApiErr
return statuses, nil, nil
}

func NewTargetsHandler(client targets.UnaryClient, enablePartialResponse bool) func(*http.Request) (interface{}, []error, *api.ApiError) {
ps := storepb.PartialResponseStrategy_ABORT
if enablePartialResponse {
ps = storepb.PartialResponseStrategy_WARN
}

return func(r *http.Request) (interface{}, []error, *api.ApiError) {
stateParam := r.URL.Query().Get("state")
state, ok := targetspb.TargetsRequest_State_value[strings.ToUpper(stateParam)]
if !ok {
if stateParam != "" {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: errors.Errorf("invalid targets parameter state='%v'", stateParam)}
}
state = int32(targetspb.TargetsRequest_ANY)
}

req := &targetspb.TargetsRequest{
State: targetspb.TargetsRequest_State(state),
PartialResponseStrategy: ps,
}

t, warnings, err := client.Targets(r.Context(), req)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorInternal, Err: errors.Wrap(err, "retrieving targets")}
}

return t, warnings, nil
}
}

// NewRulesHandler created handler compatible with HTTP /api/v1/rules https://prometheus.io/docs/prometheus/latest/querying/api/#rules
// which uses gRPC Unary Rules API.
func NewRulesHandler(client rules.UnaryClient, enablePartialResponse bool) func(*http.Request) (interface{}, []error, *api.ApiError) {
Expand Down
17 changes: 17 additions & 0 deletions pkg/promclient/promclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/thanos-io/thanos/pkg/rules/rulespb"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/targets/targetspb"
"github.com/thanos-io/thanos/pkg/tracing"
"google.golang.org/grpc/codes"
yaml "gopkg.in/yaml.v2"
Expand Down Expand Up @@ -782,3 +783,19 @@ func (c *Client) ExemplarsInGRPC(ctx context.Context, base *url.URL, query strin

return m.Data, nil
}

func (c *Client) TargetsInGRPC(ctx context.Context, base *url.URL, stateTargets string) (*targetspb.TargetDiscovery, error) {
u := *base
u.Path = path.Join(u.Path, "/api/v1/targets")

if stateTargets != "" {
q := u.Query()
q.Add("state", stateTargets)
u.RawQuery = q.Encode()
}

var v struct {
Data *targetspb.TargetDiscovery `json:"data"`
}
return v.Data, c.get2xxResultWithGRPCErrors(ctx, "/targets HTTP[client]", &u, &v)
}
Loading