From 1215c5facdf32aa6b22298d453770200f56965bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lucas=20Serv=C3=A9n=20Mar=C3=ADn?= Date: Thu, 30 May 2019 18:07:23 +0200 Subject: [PATCH] pkg/receive: forward metrics This commit enables metrics forwarding from one receive node to another. The receive nodes construct hashrings from the given sd-files and use these hashrings to select a node to which toforward a given time series. Time series are batched together to ensure that for any incoming write-request to a node, at most one outgoing write-request will be made every other node in the hashring. --- cmd/thanos/receive.go | 69 ++++++++++++ pkg/receive/handler.go | 91 ++++++++++++++- pkg/receive/hashring.go | 62 ++++++++++- pkg/receive/hashring_test.go | 209 +++++++++++++++++++++++++++++++++-- 4 files changed, 415 insertions(+), 16 deletions(-) diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index fa86a4c79db..7554066bddb 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -4,6 +4,8 @@ import ( "context" "fmt" "net" + "os" + "strings" "sync" "time" @@ -22,6 +24,8 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/discovery/file" + "github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/storage/tsdb" "github.com/prometheus/tsdb/labels" "google.golang.org/grpc" @@ -46,12 +50,41 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application, name stri retention := modelDuration(cmd.Flag("tsdb.retention", "How long to retain raw samples on local storage. 0d - disables this retention").Default("15d")) + fileSDFiles := cmd.Flag("receive.sd-files", "Path to file that contain addresses of receive peers. The path can be a glob pattern (repeatable)."). + PlaceHolder("").Strings() + + fileSDInterval := modelDuration(cmd.Flag("receive.sd-interval", "Refresh interval to re-read file SD files. (used as a fallback)"). + Default("5m")) + + local := cmd.Flag("receive.local-endpoint", "Endpoint of local receive node. Used to identify the local node in the hashring configuration.").String() + m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error { lset, err := parseFlagLabels(*labelStrs) if err != nil { return errors.Wrap(err, "parse labels") } + var fileSD *file.Discovery + if len(*fileSDFiles) > 0 { + conf := &file.SDConfig{ + Files: *fileSDFiles, + RefreshInterval: *fileSDInterval, + } + fileSD = file.NewDiscovery(conf, logger) + } + + // Local is empty, so try to generate a local endpoint + // based on the hostname and the listening port. + if *local == "" { + hostname, err := os.Hostname() + if hostname == "" || err != nil { + return errors.New("--receive.local-endpoint is empty and host could not be determined.") + } + parts := strings.Split(*remoteWriteAddress, ":") + port := parts[len(parts)-1] + *local = fmt.Sprintf("http://%s:%s/api/v1/receive", hostname, port) + } + return runReceive( g, logger, @@ -67,6 +100,8 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application, name stri objStoreConfig, lset, *retention, + fileSD, + *local, ) } } @@ -86,6 +121,8 @@ func runReceive( objStoreConfig *pathOrContent, lset labels.Labels, retention model.Duration, + fileSD *file.Discovery, + endpoint string, ) error { logger = log.With(logger, "component", "receive") level.Warn(logger).Log("msg", "setting up receive; the Thanos receive component is EXPERIMENTAL, it may break significantly without notice") @@ -97,6 +134,36 @@ func runReceive( MaxBlockDuration: model.Duration(time.Hour * 2), } + var hashring receive.Hashring + // Run File Service Discovery and check for updates. + if fileSD != nil { + ctxRun, cancelRun := context.WithCancel(context.Background()) + fileSDUpdates := make(chan []*targetgroup.Group) + go fileSD.Run(ctxRun, fileSDUpdates) + + // We need to wait for initial discovery to create the hashring. + groups := <-fileSDUpdates + hashring = receive.NewHashring(receive.ExactMatcher, groups) + g.Add(func() error { + for { + select { + // If the SD changes, we need to shutdown. + case update := <-fileSDUpdates: + if !receive.HashringsAreEquivalent(groups, update) { + level.Info(logger).Log("msg", "file service discovery changed") + return nil + } + case <-ctxRun.Done(): + return nil + } + } + }, func(error) { + cancelRun() + }) + } else { + hashring = receive.SingleNodeHashring(endpoint) + } + localStorage := &tsdb.ReadyStorage{} receiver := receive.NewWriter(log.With(logger, "component", "receive-writer"), localStorage) webHandler := receive.NewHandler(log.With(logger, "component", "receive-handler"), &receive.Options{ @@ -104,6 +171,8 @@ func runReceive( ListenAddress: remoteWriteAddress, Registry: reg, ReadyStorage: localStorage, + Hashring: hashring, + Endpoint: endpoint, }) // Start all components while we wait for TSDB to open but only load diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index c96cde5d763..8ac0d0cc018 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -1,6 +1,8 @@ package receive import ( + "bytes" + "context" "fmt" "io/ioutil" stdlog "log" @@ -17,12 +19,17 @@ import ( conntrack "github.com/mwitkow/go-conntrack" "github.com/opentracing-contrib/go-stdlib/nethttp" opentracing "github.com/opentracing/opentracing-go" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/common/route" promtsdb "github.com/prometheus/prometheus/storage/tsdb" ) +const ( + tenantHeader = "THANOS_TENANT" +) + var ( requestDuration = prometheus.NewHistogramVec( prometheus.HistogramOpts{ @@ -48,6 +55,8 @@ type Options struct { ListenAddress string Registry prometheus.Registerer ReadyStorage *promtsdb.ReadyStorage + Hashring Hashring + Endpoint string } // Handler serves a Prometheus remote write receiving HTTP endpoint. @@ -56,6 +65,7 @@ type Handler struct { logger log.Logger receiver *Writer router *route.Router + hashring Hashring options *Options listener net.Listener @@ -83,6 +93,7 @@ func NewHandler(logger log.Logger, o *Options) *Handler { router: router, readyStorage: o.ReadyStorage, receiver: o.Receiver, + hashring: o.Hashring, options: o, } @@ -160,8 +171,8 @@ func (h *Handler) Run() error { return httpSrv.Serve(h.listener) } -func (h *Handler) receive(w http.ResponseWriter, req *http.Request) { - compressed, err := ioutil.ReadAll(req.Body) +func (h *Handler) receive(w http.ResponseWriter, r *http.Request) { + compressed, err := ioutil.ReadAll(r.Body) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return @@ -180,8 +191,82 @@ func (h *Handler) receive(w http.ResponseWriter, req *http.Request) { return } - if err := h.receiver.Receive(&wreq); err != nil { + tenant := r.Header.Get(tenantHeader) + local, err := h.forward(r.Context(), tenant, &wreq) + if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } + // There may be no WriteRequest destined for the local node. + if local != nil { + if err := h.receiver.Receive(local); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + } +} + +// forward accepts a write request, batches its time series by +// corresponding endpoint, and forwards them in parallel. It returns a write +// request containing only the time series that correspond to +// local handler. +func (h *Handler) forward(ctx context.Context, tenant string, wreq *prompb.WriteRequest) (*prompb.WriteRequest, error) { + wreqs := make(map[string]*prompb.WriteRequest) + for i := range wreq.Timeseries { + endpoint, err := h.hashring.Get(tenant, &wreq.Timeseries[i]) + if err != nil { + return nil, err + } + if _, ok := wreqs[endpoint]; !ok { + wreqs[endpoint] = &prompb.WriteRequest{} + } + wr := wreqs[endpoint] + wr.Timeseries = append(wr.Timeseries, wreq.Timeseries[i]) + } + + ec := make(chan error) + defer close(ec) + var n int + var local *prompb.WriteRequest + for endpoint := range wreqs { + if endpoint == h.options.Endpoint { + local = wreqs[endpoint] + continue + } + n++ + go func(endpoint string) { + buf, err := proto.Marshal(wreqs[endpoint]) + if err != nil { + level.Error(h.logger).Log("msg", "proto marshal error", "err", err, "endpoint", endpoint) + ec <- err + return + } + req, err := http.NewRequest("POST", endpoint, bytes.NewBuffer(snappy.Encode(nil, buf))) + if err != nil { + level.Error(h.logger).Log("msg", "create request error", "err", err, "endpoint", endpoint) + ec <- err + return + } + req.Header.Add(tenantHeader, tenant) + if _, err := http.DefaultClient.Do(req.WithContext(ctx)); err != nil { + level.Error(h.logger).Log("msg", "forward request error", "err", err, "endpoint", endpoint) + ec <- err + return + } + ec <- nil + }(endpoint) + } + + var errs error + for ; n > 0; n-- { + if err := <-ec; err != nil { + if errs == nil { + errs = err + continue + } + errs = errors.Wrap(errs, err.Error()) + } + } + + return local, errs } diff --git a/pkg/receive/hashring.go b/pkg/receive/hashring.go index 11ab56040d3..2ed25fed20e 100644 --- a/pkg/receive/hashring.go +++ b/pkg/receive/hashring.go @@ -13,6 +13,10 @@ import ( const sep = '\xff' +// tenantLabel is used to group discovered targets by tenant. +const tenantLabel = "tenant" + +// Hashring finds the correct node to handle a given time series // for a specified tenant. // It returns the node and any error encountered. type Hashring interface { @@ -65,7 +69,15 @@ func hash(tenant string, ts *prompb.TimeSeries) uint64 { return xxhash.Sum64(b) } -// simpleHashring represents a group of hosts handling write requests. +// SingleNodeHashring always returns the same node. +type SingleNodeHashring string + +// Get implements the Hashring interface. +func (s SingleNodeHashring) Get(_ string, _ *prompb.TimeSeries) (string, error) { + return string(s), nil +} + +// simpleHashring represents a group of nodes handling write requests. type simpleHashring struct { targetgroup.Group } @@ -107,7 +119,53 @@ func NewHashring(matcher Matcher, groups []*targetgroup.Group) Hashring { matcher: matcher, } for _, g := range groups { - m.hashrings[g.Source] = &simpleHashring{*g} + l, ok := g.Labels[tenantLabel] + if !ok { + l = "" + } + t := string(l) + if _, ok := m.hashrings[t]; !ok { + m.hashrings[t] = &simpleHashring{} + } + h := m.hashrings[t].(*simpleHashring) + h.Targets = append(h.Targets, g.Targets...) } return m } + +// HashringsAreEquivalent indicates whether the hashrings created from +// two sets of groups would be equivalent. +func HashringsAreEquivalent(a, b []*targetgroup.Group) bool { + buildGroups := func(groups []*targetgroup.Group) map[string]map[string]struct{} { + m := map[string]map[string]struct{}{} + for _, g := range groups { + l, ok := g.Labels[tenantLabel] + if !ok { + l = "" + } + t := string(l) + if _, ok := m[t]; !ok { + m[t] = map[string]struct{}{} + } + for i := range g.Targets { + m[t][string(g.Targets[i][model.AddressLabel])] = struct{}{} + } + } + return m + } + ga, gb := buildGroups(a), buildGroups(b) + compare := func(a, b map[string]map[string]struct{}) bool { + for t := range a { + if _, ok := b[t]; !ok { + return false + } + for h := range a[t] { + if _, ok := b[t][h]; !ok { + return false + } + } + } + return true + } + return compare(ga, gb) && compare(gb, ga) +} diff --git a/pkg/receive/hashring_test.go b/pkg/receive/hashring_test.go index 9f5edddb8be..08626d81eef 100644 --- a/pkg/receive/hashring_test.go +++ b/pkg/receive/hashring_test.go @@ -78,7 +78,6 @@ func TestHashringGet(t *testing.T) { model.AddressLabel: "node1", }, }, - Source: "", }, { Targets: []model.LabelSet{ @@ -86,7 +85,7 @@ func TestHashringGet(t *testing.T) { model.AddressLabel: "node2", }, }, - Source: "tenant1", + Labels: model.LabelSet{tenantLabel: "tenant1"}, }, }, nodes: map[string]struct{}{"node2": struct{}{}}, @@ -101,7 +100,7 @@ func TestHashringGet(t *testing.T) { model.AddressLabel: "node1", }, }, - Source: "tenant1", + Labels: model.LabelSet{tenantLabel: "tenant1"}, }, { Targets: []model.LabelSet{ @@ -109,7 +108,7 @@ func TestHashringGet(t *testing.T) { model.AddressLabel: "node2", }, }, - Source: "tenant2", + Labels: model.LabelSet{tenantLabel: "tenant2"}, }, { Targets: []model.LabelSet{ @@ -117,7 +116,7 @@ func TestHashringGet(t *testing.T) { model.AddressLabel: "node3", }, }, - Source: "tenant3", + Labels: model.LabelSet{tenantLabel: "tenant3"}, }, }, nodes: map[string]struct{}{"node1": struct{}{}}, @@ -132,7 +131,7 @@ func TestHashringGet(t *testing.T) { model.AddressLabel: "node1", }, }, - Source: "tenant1", + Labels: model.LabelSet{tenantLabel: "tenant1"}, }, { Targets: []model.LabelSet{ @@ -140,7 +139,7 @@ func TestHashringGet(t *testing.T) { model.AddressLabel: "node2", }, }, - Source: "tenant2", + Labels: model.LabelSet{tenantLabel: "tenant2"}, }, { Targets: []model.LabelSet{ @@ -148,7 +147,7 @@ func TestHashringGet(t *testing.T) { model.AddressLabel: "node3", }, }, - Source: "tenant3", + Labels: model.LabelSet{tenantLabel: "tenant3"}, }, }, tenant: "tenant4", @@ -168,7 +167,7 @@ func TestHashringGet(t *testing.T) { model.AddressLabel: "node3", }, }, - Source: "tenant1", + Labels: model.LabelSet{tenantLabel: "tenant1"}, }, { Targets: []model.LabelSet{ @@ -182,7 +181,6 @@ func TestHashringGet(t *testing.T) { model.AddressLabel: "node6", }, }, - Source: "", }, }, nodes: map[string]struct{}{ @@ -207,7 +205,7 @@ func TestHashringGet(t *testing.T) { model.AddressLabel: "node3", }, }, - Source: "tenant1", + Labels: model.LabelSet{tenantLabel: "tenant1"}, }, { Targets: []model.LabelSet{ @@ -229,6 +227,35 @@ func TestHashringGet(t *testing.T) { "node6": struct{}{}, }, }, + { + name: "combine many groups", + cfg: []*targetgroup.Group{ + { + Targets: []model.LabelSet{}, + Labels: model.LabelSet{tenantLabel: "tenant1"}, + }, + { + Targets: []model.LabelSet{ + model.LabelSet{ + model.AddressLabel: "node1", + }, + model.LabelSet{ + model.AddressLabel: "node2", + }, + model.LabelSet{ + model.AddressLabel: "node3", + }, + }, + Labels: model.LabelSet{tenantLabel: "tenant1"}, + }, + }, + nodes: map[string]struct{}{ + "node1": struct{}{}, + "node2": struct{}{}, + "node3": struct{}{}, + }, + tenant: "tenant1", + }, } { hs := NewHashring(ExactMatcher, tc.cfg) h, err := hs.Get(tc.tenant, ts) @@ -247,3 +274,163 @@ func TestHashringGet(t *testing.T) { } } } + +func TestHashringsAreEquivalent(t *testing.T) { + for _, tc := range []struct { + name string + a []*targetgroup.Group + b []*targetgroup.Group + out bool + }{ + { + name: "empty", + out: true, + }, + { + name: "simple", + a: []*targetgroup.Group{ + { + Targets: []model.LabelSet{ + model.LabelSet{ + model.AddressLabel: "node1", + }, + }, + }, + }, + b: []*targetgroup.Group{ + { + Targets: []model.LabelSet{ + model.LabelSet{ + model.AddressLabel: "node1", + }, + }, + }, + }, + out: true, + }, + { + name: "complex", + a: []*targetgroup.Group{ + { + Targets: []model.LabelSet{ + model.LabelSet{ + model.AddressLabel: "node1", + }, + }, + }, + { + Targets: []model.LabelSet{}, + }, + { + Targets: []model.LabelSet{ + model.LabelSet{ + model.AddressLabel: "node2", + }, + }, + Labels: model.LabelSet{tenantLabel: "tenant1"}, + }, + }, + b: []*targetgroup.Group{ + { + Targets: []model.LabelSet{ + model.LabelSet{ + model.AddressLabel: "node2", + }, + }, + Labels: model.LabelSet{tenantLabel: "tenant1"}, + }, + { + Targets: []model.LabelSet{ + model.LabelSet{ + model.AddressLabel: "node1", + }, + }, + }, + }, + out: true, + }, + { + name: "different nodes", + a: []*targetgroup.Group{ + { + Targets: []model.LabelSet{ + model.LabelSet{ + model.AddressLabel: "node1", + }, + }, + }, + { + Targets: []model.LabelSet{ + model.LabelSet{ + model.AddressLabel: "node2", + }, + }, + Labels: model.LabelSet{tenantLabel: "tenant1"}, + }, + }, + b: []*targetgroup.Group{ + { + Targets: []model.LabelSet{ + model.LabelSet{ + model.AddressLabel: "node1", + }, + }, + }, + { + Targets: []model.LabelSet{ + model.LabelSet{ + model.AddressLabel: "node1", + }, + }, + Labels: model.LabelSet{tenantLabel: "tenant1"}, + }, + }, + out: false, + }, + { + name: "extra tenant", + a: []*targetgroup.Group{ + { + Targets: []model.LabelSet{ + model.LabelSet{ + model.AddressLabel: "node1", + }, + }, + }, + { + Targets: []model.LabelSet{ + model.LabelSet{ + model.AddressLabel: "node2", + }, + }, + Labels: model.LabelSet{tenantLabel: "tenant1"}, + }, + }, + b: []*targetgroup.Group{ + { + Targets: []model.LabelSet{ + model.LabelSet{ + model.AddressLabel: "node1", + }, + }, + }, + { + Targets: []model.LabelSet{ + model.LabelSet{ + model.AddressLabel: "node2", + }, + }, + Labels: model.LabelSet{tenantLabel: "tenant1"}, + }, + { + Labels: model.LabelSet{tenantLabel: "tenant2"}, + }, + }, + out: false, + }, + } { + if tc.out != HashringsAreEquivalent(tc.a, tc.b) { + t.Errorf("case %q: expected %t", tc.name, tc.out) + } + } +}