Skip to content

Commit

Permalink
discovery: use thanos resolver for endpoint groups (thanos-io#7565)
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Hoffmann <[email protected]>
  • Loading branch information
MichaHoffmann authored and jnyi committed Oct 16, 2024
1 parent 6392d5f commit a5a9d61
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 4 deletions.
16 changes: 12 additions & 4 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,16 @@ func runQuery(
}
}

// Register resolver for the "thanos:///" scheme for endpoint-groups
dns.RegisterGRPCResolver(
dns.NewProvider(
logger,
extprom.WrapRegistererWithPrefix("thanos_query_endpoint_groups_", reg),
dns.ResolverType(dnsSDResolver),
),
dnsSDInterval,
)

dnsEndpointProvider := dns.NewProvider(
logger,
extprom.WrapRegistererWithPrefix("thanos_query_endpoints_", reg),
Expand Down Expand Up @@ -921,14 +931,12 @@ func prepareEndpointSet(
}

for _, eg := range endpointGroupAddrs {
addr := fmt.Sprintf("dns:///%s", eg)
spec := query.NewGRPCEndpointSpec(addr, false, extgrpc.EndpointGroupGRPCOpts()...)
spec := query.NewGRPCEndpointSpec(fmt.Sprintf("thanos:///%s", eg), false, extgrpc.EndpointGroupGRPCOpts()...)
specs = append(specs, spec)
}

for _, eg := range strictEndpointGroups {
addr := fmt.Sprintf("dns:///%s", eg)
spec := query.NewGRPCEndpointSpec(addr, true, extgrpc.EndpointGroupGRPCOpts()...)
spec := query.NewGRPCEndpointSpec(fmt.Sprintf("thanos:///%s", eg), true, extgrpc.EndpointGroupGRPCOpts()...)
specs = append(specs, spec)
}

Expand Down
97 changes: 97 additions & 0 deletions pkg/discovery/dns/grpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package dns

import (
"context"
"sync"
"time"

grpcresolver "google.golang.org/grpc/resolver"
)

var (
_ grpcresolver.Builder = &builder{}
_ grpcresolver.Resolver = &resolver{}
)

type builder struct {
resolveInterval time.Duration
provider *Provider
}

func RegisterGRPCResolver(provider *Provider, interval time.Duration) {
grpcresolver.Register(&builder{
resolveInterval: interval,
provider: provider,
})
}

func (b *builder) Scheme() string { return "thanos" }

func (b *builder) Build(t grpcresolver.Target, cc grpcresolver.ClientConn, _ grpcresolver.BuildOptions) (grpcresolver.Resolver, error) {
ctx, cancel := context.WithCancel(context.Background())
r := &resolver{
provider: b.provider,
target: t.Endpoint(),
ctx: ctx,
cancel: cancel,
cc: cc,
interval: b.resolveInterval,
}
r.wg.Add(1)
go r.run()

return r, nil
}

type resolver struct {
provider *Provider

target string
ctx context.Context
cancel context.CancelFunc
cc grpcresolver.ClientConn
interval time.Duration

wg sync.WaitGroup
}

func (r *resolver) Close() {
r.cancel()
r.wg.Wait()
}

func (r *resolver) ResolveNow(_ grpcresolver.ResolveNowOptions) {}

func (r *resolver) resolve() error {
ctx, cancel := context.WithTimeout(r.ctx, r.interval)
defer cancel()
return r.provider.Resolve(ctx, []string{r.target})
}

func (r *resolver) addresses() []string {
return r.provider.AddressesForHost(r.target)
}

func (r *resolver) run() {
defer r.wg.Done()
for {
if err := r.resolve(); err != nil {
r.cc.ReportError(err)
} else {
state := grpcresolver.State{}
for _, addr := range r.addresses() {
raddr := grpcresolver.Address{Addr: addr}
state.Addresses = append(state.Addresses, raddr)
}
_ = r.cc.UpdateState(state)
}
select {
case <-r.ctx.Done():
return
case <-time.After(r.interval):
}
}
}

0 comments on commit a5a9d61

Please sign in to comment.