Skip to content

Commit

Permalink
federated targets functionality (thanos-io#1375)
Browse files Browse the repository at this point in the history
Signed-off-by: Alexander Tunik <[email protected]>
  • Loading branch information
2nick committed Mar 22, 2021
1 parent 468d0d6 commit 0db9145
Show file tree
Hide file tree
Showing 27 changed files with 3,303 additions and 17 deletions.
36 changes: 36 additions & 0 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
grpcserver "github.com/thanos-io/thanos/pkg/server/grpc"
httpserver "github.com/thanos-io/thanos/pkg/server/http"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/targets"
"github.com/thanos-io/thanos/pkg/tls"
"github.com/thanos-io/thanos/pkg/ui"
)
Expand Down Expand Up @@ -108,6 +109,10 @@ func registerQuery(app *extkingpin.App) {
exemplarEndpoints := cmd.Flag("exemplar", "Experimental: Addresses of statically configured exemplars API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect exemplars API servers through respective DNS lookups.").
Hidden().PlaceHolder("<exemplar>").Strings()

// TODO(atunik): Hidden because we plan to extract discovery to separate API: https://github.com/thanos-io/thanos/issues/2600.
targetEndpoints := cmd.Flag("target", "Experimental: Addresses of statically configured target API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect target API servers through respective DNS lookups.").
Hidden().PlaceHolder("<target>").Strings()

strictStores := cmd.Flag("store-strict", "Addresses of only statically configured store API servers that are always used, even if the health check fails. Useful if you have a caching layer on top.").
PlaceHolder("<staticstore>").Strings()

Expand Down Expand Up @@ -135,6 +140,9 @@ func registerQuery(app *extkingpin.App) {
enableRulePartialResponse := cmd.Flag("rule.partial-response", "Enable partial response for rules endpoint. --no-rule.partial-response for disabling.").
Hidden().Default("true").Bool()

enableTargetPartialResponse := cmd.Flag("target.partial-response", "Enable partial response for targets endpoint. --no-target.partial-response for disabling.").
Hidden().Default("true").Bool()

enableMetricMetadataPartialResponse := cmd.Flag("metric-metadata.partial-response", "Enable partial response for metric metadata endpoint. --no-metric-metadata.partial-response for disabling.").
Hidden().Default("true").Bool()

Expand Down Expand Up @@ -178,6 +186,10 @@ func registerQuery(app *extkingpin.App) {
return errors.Wrap(err, "error while parsing config for request logging")
}

if dup := firstDuplicate(*targetEndpoints); dup != "" {
return errors.Errorf("Address %s is duplicated for --target flag.", dup)
}

var fileSD *file.Discovery
if len(*fileSDFiles) > 0 {
conf := &file.SDConfig{
Expand Down Expand Up @@ -232,11 +244,13 @@ func registerQuery(app *extkingpin.App) {
getFlagsMap(cmd.Flags()),
*stores,
*ruleEndpoints,
*targetEndpoints,
*metadataEndpoints,
*exemplarEndpoints,
*enableAutodownsampling,
*enableQueryPartialResponse,
*enableRulePartialResponse,
*enableTargetPartialResponse,
*enableMetricMetadataPartialResponse,
fileSD,
time.Duration(*dnsSDInterval),
Expand Down Expand Up @@ -290,11 +304,13 @@ func runQuery(
flagsMap map[string]string,
storeAddrs []string,
ruleAddrs []string,
targetAddrs []string,
metadataAddrs []string,
exemplarAddrs []string,
enableAutodownsampling bool,
enableQueryPartialResponse bool,
enableRulePartialResponse bool,
enableTargetPartialResponse bool,
enableMetricMetadataPartialResponse bool,
fileSD *file.Discovery,
dnsSDInterval time.Duration,
Expand Down Expand Up @@ -336,6 +352,12 @@ func runQuery(
dns.ResolverType(dnsSDResolver),
)

dnsTargetProvider := dns.NewProvider(
logger,
extprom.WrapRegistererWithPrefix("thanos_query_target_apis_", reg),
dns.ResolverType(dnsSDResolver),
)

dnsMetadataProvider := dns.NewProvider(
logger,
extprom.WrapRegistererWithPrefix("thanos_query_metadata_apis_", reg),
Expand Down Expand Up @@ -374,6 +396,13 @@ func runQuery(

return specs
},
func() (specs []query.TargetSpec) {
for _, addr := range dnsTargetProvider.Addresses() {
specs = append(specs, query.NewGRPCStoreSpec(addr, false))
}

return specs
},
func() (specs []query.MetadataSpec) {
for _, addr := range dnsMetadataProvider.Addresses() {
specs = append(specs, query.NewGRPCStoreSpec(addr, false))
Expand All @@ -393,6 +422,7 @@ func runQuery(
)
proxy = store.NewProxyStore(logger, reg, stores.Get, component.Query, selectorLset, storeResponseTimeout)
rulesProxy = rules.NewProxy(logger, stores.GetRulesClients)
targetsProxy = targets.NewProxy(logger, stores.GetTargetsClients)
metadataProxy = metadata.NewProxy(logger, stores.GetMetadataClients)
exemplarsProxy = exemplars.NewProxy(logger, stores.GetExemplarsClients)
queryableCreator = query.NewQueryableCreator(
Expand Down Expand Up @@ -481,6 +511,9 @@ func runQuery(
if err := dnsRuleProvider.Resolve(resolveCtx, ruleAddrs); err != nil {
level.Error(logger).Log("msg", "failed to resolve addresses for rulesAPIs", "err", err)
}
if err := dnsTargetProvider.Resolve(ctx, targetAddrs); err != nil {
level.Error(logger).Log("msg", "failed to resolve addresses for targetsAPIs", "err", err)
}
if err := dnsMetadataProvider.Resolve(resolveCtx, metadataAddrs); err != nil {
level.Error(logger).Log("msg", "failed to resolve addresses for metadataAPIs", "err", err)
}
Expand Down Expand Up @@ -534,11 +567,13 @@ func runQuery(
queryableCreator,
// NOTE: Will share the same replica label as the query for now.
rules.NewGRPCClientWithDedup(rulesProxy, queryReplicaLabels),
targets.NewGRPCClientWithDedup(targetsProxy, queryReplicaLabels),
metadata.NewGRPCClient(metadataProxy),
exemplars.NewGRPCClientWithDedup(exemplarsProxy, queryReplicaLabels),
enableAutodownsampling,
enableQueryPartialResponse,
enableRulePartialResponse,
enableTargetPartialResponse,
enableMetricMetadataPartialResponse,
queryReplicaLabels,
flagsMap,
Expand Down Expand Up @@ -581,6 +616,7 @@ func runQuery(
s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, comp, grpcProbe,
grpcserver.WithServer(store.RegisterStoreServer(proxy)),
grpcserver.WithServer(rules.RegisterRulesServer(rulesProxy)),
grpcserver.WithServer(targets.RegisterTargetsServer(targetsProxy)),
grpcserver.WithServer(metadata.RegisterMetadataServer(metadataProxy)),
grpcserver.WithServer(exemplars.RegisterExemplarsServer(exemplarsProxy)),
grpcserver.WithListen(grpcBindAddr),
Expand Down
2 changes: 2 additions & 0 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
httpserver "github.com/thanos-io/thanos/pkg/server/http"
"github.com/thanos-io/thanos/pkg/shipper"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/targets"
"github.com/thanos-io/thanos/pkg/tls"
"github.com/thanos-io/thanos/pkg/tracing"
)
Expand Down Expand Up @@ -230,6 +231,7 @@ func runSidecar(
s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, comp, grpcProbe,
grpcserver.WithServer(store.RegisterStoreServer(promStore)),
grpcserver.WithServer(rules.RegisterRulesServer(rules.NewPrometheus(conf.prometheus.url, c, m.Labels))),
grpcserver.WithServer(targets.RegisterTargetsServer(targets.NewPrometheus(conf.prometheus.url, c, m.Labels))),
grpcserver.WithServer(meta.RegisterMetadataServer(meta.NewPrometheus(conf.prometheus.url, c))),
grpcserver.WithServer(exemplars.RegisterExemplarsServer(exemplars.NewPrometheus(conf.prometheus.url, c, m.Labels))),
grpcserver.WithListen(conf.grpc.bindAddress),
Expand Down
Loading

0 comments on commit 0db9145

Please sign in to comment.