From 1dda980fc0e51a58a995422bcacefc3d9dd57740 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lucas=20Serv=C3=A9n=20Mar=C3=ADn?= Date: Thu, 13 Jun 2019 17:18:44 +0200 Subject: [PATCH] pkg/receive: forward metrics This commit enables metrics forwarding from one receive node to another. The receive nodes construct hashrings from the given sd-files and use these hashrings to select a node to which to forward a given time series. Time series are batched together to ensure that for any incoming write-request to a node, at most one outgoing write-request will be made to every other node in the hashring. --- cmd/thanos/receive.go | 93 +++++++++++++++- go.mod | 1 + go.sum | 31 ++++++ pkg/receive/config.go | 206 +++++++++++++++++++++++++++++++++++ pkg/receive/handler.go | 143 +++++++++++++++++++++--- pkg/receive/hashring.go | 107 ++++++++++++++---- pkg/receive/hashring_test.go | 146 ++++++------------------- 7 files changed, 576 insertions(+), 151 deletions(-) create mode 100644 pkg/receive/config.go diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index b7aa8e8c3cc..e0562a1d6b7 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -4,6 +4,8 @@ import ( "context" "fmt" "net" + "os" + "strings" "time" "github.com/go-kit/kit/log" @@ -45,12 +47,42 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application, name stri retention := modelDuration(cmd.Flag("tsdb.retention", "How long to retain raw samples on local storage. 0d - disables this retention").Default("15d")) + hashringsFile := cmd.Flag("receive.hashrings-file", "Path to file that contains the hashring configuration."). + PlaceHolder("").String() + + refreshInterval := modelDuration(cmd.Flag("receive.hashrings-file-refresh-interval", "Refresh interval to re-read the hashring configuration file. (used as a fallback)"). + Default("5m")) + + local := cmd.Flag("receive.local-endpoint", "Endpoint of local receive node. 
Used to identify the local node in the hashring configuration.").String() + + tenantHeader := cmd.Flag("receive.tenant-header", "HTTP header to determine tenant for write requests.").Default("THANOS-TENANT").String() + m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error { lset, err := parseFlagLabels(*labelStrs) if err != nil { return errors.Wrap(err, "parse labels") } + var cw *receive.ConfigWatcher + if *hashringsFile != "" { + cw, err = receive.NewConfigWatcher(*hashringsFile, *refreshInterval, logger) + if err != nil { + return err + } + } + + // Local is empty, so try to generate a local endpoint + // based on the hostname and the listening port. + if *local == "" { + hostname, err := os.Hostname() + if hostname == "" || err != nil { + return errors.New("--receive.local-endpoint is empty and host could not be determined.") + } + parts := strings.Split(*remoteWriteAddress, ":") + port := parts[len(parts)-1] + *local = fmt.Sprintf("http://%s:%s/api/v1/receive", hostname, port) + } + return runReceive( g, logger, @@ -66,6 +98,9 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application, name stri objStoreConfig, lset, *retention, + cw, + *local, + *tenantHeader, ) } } @@ -85,6 +120,9 @@ func runReceive( objStoreConfig *pathOrContent, lset labels.Labels, retention model.Duration, + cw *receive.ConfigWatcher, + endpoint string, + tenantHeader string, ) error { logger = log.With(logger, "component", "receive") level.Warn(logger).Log("msg", "setting up receive; the Thanos receive component is EXPERIMENTAL, it may break significantly without notice") @@ -103,6 +141,8 @@ func runReceive( ListenAddress: remoteWriteAddress, Registry: reg, ReadyStorage: localStorage, + Endpoint: endpoint, + TenantHeader: tenantHeader, }) // Start all components while we wait for TSDB to open but only load @@ -129,7 +169,7 @@ func runReceive( startTimeMargin := int64(2 * 
time.Duration(tsdbCfg.MinBlockDuration).Seconds() * 1000) localStorage.Set(db, startTimeMargin) - webHandler.Ready() + webHandler.StorageReady() level.Info(logger).Log("msg", "server is ready to receive web requests.") close(dbOpen) <-cancel @@ -144,6 +184,57 @@ func runReceive( ) } + level.Debug(logger).Log("msg", "setting up hashring") + { + updates := make(chan receive.Hashring) + if cw != nil { + cw.RegisterMetrics(reg) + ctx, cancel := context.WithCancel(context.Background()) + g.Add(func() error { + receive.HashringFromConfig(ctx, updates, receive.ExactMatcher, cw) + return nil + }, func(error) { + cancel() + close(updates) + }) + } else { + cancel := make(chan struct{}) + g.Add(func() error { + updates <- receive.SingleNodeHashring(endpoint) + <-cancel + return nil + }, func(error) { + close(cancel) + close(updates) + }) + } + + cancel := make(chan struct{}) + g.Add( + func() error { + select { + case h := <-updates: + webHandler.Hashring(h) + case <-cancel: + return nil + } + select { + // If any new hashring is received, then mark the handler as unready, but keep it alive. 
+ case <-updates: + webHandler.Hashring(nil) + level.Info(logger).Log("msg", "hashring has changed; server is not ready to receive web requests.") + case <-cancel: + return nil + } + <-cancel + return nil + }, + func(err error) { + close(cancel) + }, + ) + } + level.Debug(logger).Log("msg", "setting up metric http listen-group") if err := metricHTTPListenGroup(g, logger, reg, httpMetricsBindAddr); err != nil { return err diff --git a/go.mod b/go.mod index 6f7176bb8ce..83a620f1f6f 100644 --- a/go.mod +++ b/go.mod @@ -47,6 +47,7 @@ require ( google.golang.org/api v0.3.2 google.golang.org/grpc v1.19.1 gopkg.in/alecthomas/kingpin.v2 v2.2.6 + gopkg.in/fsnotify.v1 v1.4.7 gopkg.in/fsnotify/fsnotify.v1 v1.4.7 // indirect gopkg.in/yaml.v2 v2.2.2 ) diff --git a/go.sum b/go.sum index bbd83727301..fcb04460274 100644 --- a/go.sum +++ b/go.sum @@ -1,12 +1,15 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0 h1:eOI3/cP2VTU6uZLDYAoic+eyzzB9YyGmJ7eIjl8rOPg= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +contrib.go.opencensus.io/exporter/ocagent v0.4.12 h1:jGFvw3l57ViIVEPKKEUXPcLYIXJmQxLUh6ey1eJhwyc= contrib.go.opencensus.io/exporter/ocagent v0.4.12/go.mod h1:450APlNTSR6FrvC3CTRqYosuDstRB9un7SOx2k/9ckA= github.com/Azure/azure-pipeline-go v0.1.8 h1:KmVRa8oFMaargVesEuuEoiLCQ4zCCwQ8QX/xg++KS20= github.com/Azure/azure-pipeline-go v0.1.8/go.mod h1:XA1kFWRVhSK+KNFiOhfv83Fv8L9achrP7OxIzeTn1Yg= +github.com/Azure/azure-sdk-for-go v23.2.0+incompatible h1:bch1RS060vGpHpY3zvQDV4rOiRw25J1zmR/B9a76aSA= github.com/Azure/azure-sdk-for-go v23.2.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= github.com/Azure/azure-storage-blob-go v0.0.0-20181022225951-5152f14ace1c h1:Y5ueznoCekgCWBytF1Q9lTpZ3tJeX37dQtCcGjMCLYI= github.com/Azure/azure-storage-blob-go v0.0.0-20181022225951-5152f14ace1c/go.mod h1:oGfmITT1V6x//CswqY2gtAHND+xIP64/qL7a5QJix0Y= +github.com/Azure/go-autorest 
v11.2.8+incompatible h1:Q2feRPMlcfVcqz3pF87PJzkm5lZrL+x6BDtzhODzNJM= github.com/Azure/go-autorest v11.2.8+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/NYTimes/gziphandler v1.1.1 h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cqUQ3I= @@ -28,12 +31,14 @@ github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hC github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da h1:8GUt8eRujhVEGZFFEjBj46YV4rDjvGrNxb0KMWYkL2I= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= +github.com/aws/aws-sdk-go v0.0.0-20180507225419-00862f899353 h1:qFKf58XUUvHaEz0zFkLJsQ4dzoAyrQ8QyhK4nHGHBI4= github.com/aws/aws-sdk-go v0.0.0-20180507225419-00862f899353/go.mod h1:ZRmQr0FajVIyZ4ZzBYKG5P3ZqPz9IHG41ZoMu1ADI3k= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/biogo/store v0.0.0-20160505134755-913427a1d5e8/go.mod h1:Iev9Q3MErcn+w3UOJD/DkEzllvugfdx7bGcMOFhvr/4= github.com/cenk/backoff v2.0.0+incompatible/go.mod h1:7FtoeaSnHoZnmZzz47cM35Y9nSW7tNyaidugnHTaFDE= +github.com/census-instrumentation/opencensus-proto v0.2.0 h1:LzQXZOgg4CQfE6bFvXGM30YZL1WW/M337pXml+GrcZ4= github.com/census-instrumentation/opencensus-proto v0.2.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/certifi/gocertifi v0.0.0-20180905225744-ee1a9a0726d2/go.mod h1:GJKEexRPVJrBSOjoqN5VNOIKJ5Q3RViH6eu3puDRwx4= github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= @@ -49,6 +54,7 @@ 
github.com/coreos/etcd v3.3.12+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgrijalva/jwt-go v0.0.0-20161101193935-9ed569b5d1ac h1:xrQJVwQCGqDvOO7/0+RyIq5J2M3Q4ZF7Ug/BMQtML1E= github.com/dgrijalva/jwt-go v0.0.0-20161101193935-9ed569b5d1ac/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= @@ -69,6 +75,7 @@ github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMo github.com/gernest/wow v0.1.0/go.mod h1:dEPabJRi5BneI1Nev1VWo0ZlcTWibHWp43qxKms4elY= github.com/getsentry/raven-go v0.1.2/go.mod h1:KungGk8q33+aIAZUIVWZDr2OfAEBsO49PX4NzFV5kcQ= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= +github.com/go-ini/ini v1.21.1 h1:+QXUYsI7Tfxc64oD6R5BxU/Aq+UwGkyjH4W/hMNG7bg= github.com/go-ini/ini v1.21.1/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8= github.com/go-kit/kit v0.8.0 h1:Wz+5lgoB0kkuqLEc6NVmwRknTKP6dTGbSqvhZtBI/j0= github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= @@ -93,17 +100,20 @@ github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c h1:964Od4U6p2jUkFxvCydnIczKteheJEzHRToSGK3Bnlw= github.com/google/btree 
v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/go-cmp v0.2.0 h1:+dTQ8DZQJz0Mb/HjFlkptS1FeQ4cWSnN941F8aEG4SQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-querystring v1.0.0 h1:Xkwi/a1rcvNg1PPYe5vI8GbeBY/jrVuDX5ASuANWTrk= github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck= +github.com/google/gofuzz v0.0.0-20150304233714-bbcb9da2d746 h1:M6d2zDTA4cKXT6OwFsJxlo5tWrAukj3KfvJ1zcBatnA= github.com/google/gofuzz v0.0.0-20150304233714-bbcb9da2d746/go.mod h1:HP5RmnzzSNb993RKQDq4+1A4ia9nllfqcQFTQJedwGI= github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/pprof v0.0.0-20180605153948-8b03ce837f34/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= github.com/googleapis/gax-go v2.0.2+incompatible h1:silFMLAnr330+NRuag/VjIGF7TLp/LBrV2CJKFLWEww= github.com/googleapis/gax-go v2.0.2+incompatible/go.mod h1:SFVmujtThgffbyetf+mdk2eWhX2bMyUtNHzFKcPA9HY= +github.com/googleapis/gnostic v0.0.0-20180520015035-48a0ecefe2e4 h1:yxHFSapGMUoyn+3v6LiJJxoJhvbDqIq8me0gAWehnSU= github.com/googleapis/gnostic v0.0.0-20180520015035-48a0ecefe2e4/go.mod h1:sJBsCZ4ayReDTBIg8b9dl28c5xFWyhBTVRp3pOg5EKY= github.com/gophercloud/gophercloud v0.0.0-20190301152420-fca40860790e h1:hQpY0g0UGsLKLDs8UJ6xpA2gNCkEdEbvxSPqLItXCpI= github.com/gophercloud/gophercloud v0.0.0-20190301152420-fca40860790e/go.mod h1:vxM41WHh5uqHVBMZHzuwNOHh8XEoIEcSTewFxm1c5g8= @@ -112,20 +122,25 @@ github.com/gopherjs/gopherjs v0.0.0-20181103185306-d547d1d9531e h1:JKmoR8x90Iww1 github.com/gopherjs/gopherjs v0.0.0-20181103185306-d547d1d9531e/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg= github.com/gorilla/mux v1.6.2/go.mod 
h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= +github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7 h1:pdN6V1QBWetyv/0+wjACpqVH+eVULgEjkurDLq3goeM= github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 h1:Iju5GlWwrvL6UBg4zJJt3btmonfrMlCDdsejg4CZE7c= github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= github.com/grpc-ecosystem/go-grpc-prometheus v0.0.0-20181025070259-68e3a13e4117 h1:v9uUYPE4RHQHA0C9XfpAX9uzWQvgIDYjPh6m/mQgrzs= github.com/grpc-ecosystem/go-grpc-prometheus v0.0.0-20181025070259-68e3a13e4117/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= +github.com/grpc-ecosystem/grpc-gateway v1.8.5 h1:2+KSC78XiO6Qy0hIjfc1OD9H+hsaJdJlb8Kqsd41CTE= github.com/grpc-ecosystem/grpc-gateway v1.8.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645/go.mod h1:6iZfnjpejD4L/4DwD7NryNaJyCQdzwWwH2MWhCA90Kw= +github.com/hashicorp/consul v1.4.4 h1:DR1+5EGgnPsd/LIsK3c9RDvajcsV5GOkGQBSNd3dpn8= github.com/hashicorp/consul v1.4.4/go.mod h1:mFrjN1mfidgJfYP1xrJCF+AfRhr6Eaqhb2+sfyn/OOI= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-cleanhttp v0.5.1 h1:dH3aiDG9Jvb5r5+bYHsikaOUIpcM0xvgMXVoDkXMzJM= github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= github.com/hashicorp/go-immutable-radix v1.0.0 h1:AKDB1HM5PWEA7i4nhcpwOrO2byshxBjXVn/J/3+z5/0= github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM= github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk= +github.com/hashicorp/go-rootcerts v0.0.0-20160503143440-6bb64b370b90 
h1:VBj0QYQ0u2MCJzBfeYXGexnAl17GsH1yidnoxCqqD9E= github.com/hashicorp/go-rootcerts v0.0.0-20160503143440-6bb64b370b90/go.mod h1:o4zcYY1e0GEZI6eSEr+43QDYmuGglw1qSO6qdHUHCgg= github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU= github.com/hashicorp/go-sockaddr v1.0.2/go.mod h1:rB4wwRAUzs07qva3c5SdrY/NEtAUjGlgmH/UkBUC97A= @@ -140,12 +155,15 @@ github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64= github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0mNTz8vQ= github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I= +github.com/hashicorp/serf v0.8.2 h1:YZ7UKsJv+hKjqGVUUbtE3HNj79Eln2oQ75tniF6iPt0= github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/influxdata/influxdb v0.0.0-20170331210902-15e594fc09f1/go.mod h1:qZna6X/4elxqT3yI9iZYdZrWWdeFOOprn86kgg4+IzY= github.com/jackc/fake v0.0.0-20150926172116-812a484cc733/go.mod h1:WrMFNQdiFJ80sQsxDoMokWK1W5TQtxBFNpzWTD84ibQ= github.com/jackc/pgx v3.2.0+incompatible/go.mod h1:0ZGrqGqkRlliWnWB4zKnWtjbSWbGkVEFm4TeybAXq+I= +github.com/jmespath/go-jmespath v0.0.0-20160803190731-bd40a432e4c7 h1:SMvOWPJCES2GdFracYbBQh93GXac8fq7HeN6JnpduB8= github.com/jmespath/go-jmespath v0.0.0-20160803190731-bd40a432e4c7/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= +github.com/json-iterator/go v0.0.0-20180612202835-f2b4162afba3 h1:/UewZcckqhvnnS0C6r3Sher2hSEbVmM6Ogpcjen08+Y= github.com/json-iterator/go v0.0.0-20180612202835-f2b4162afba3/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/jtolds/gls v4.2.1+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= @@ -192,9 +210,12 
@@ github.com/mitchellh/go-wordwrap v1.0.0/go.mod h1:ZXFpozHsX6DPmq2I0TCekCxypsnAUb github.com/mitchellh/gox v0.4.0/go.mod h1:Sd9lOJ0+aimLBi73mGofS1ycjY8lL3uZM3JPS42BGNg= github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0QubkSMEySY= github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= +github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mitchellh/reflectwalk v1.0.0/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 h1:Esafd1046DLDQ0W1YjYsBW+p8U2u7vzgW2SQVmlNazg= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/montanaflynn/stats v0.0.0-20180911141734-db72e6cae808/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= github.com/mozillazg/go-cos v0.12.0 h1:b9hUd5HjrDe10BUfkyiLYI1+z4M2kAgKasktszx9pO4= @@ -224,6 +245,7 @@ github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFSt github.com/openzipkin/zipkin-go v0.1.6/go.mod h1:QgAqvLzwWbR/WpD4A3cGpPtJrZXNIiJc5AZX7/PBEpw= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c h1:Lgl0gzECD8GnQ5QCWA8o6BtfL6mDH5rQgM4/fX3avOs= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= +github.com/peterbourgon/diskv v0.0.0-20180312054125-0646ccaebea1 h1:k/dnb0bixQwWsDLxwr6/w7rtZCVDKdbQnGQkeZGYsws= github.com/peterbourgon/diskv v0.0.0-20180312054125-0646ccaebea1/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= 
github.com/peterbourgon/g2s v0.0.0-20170223122336-d4e7ad98afea/go.mod h1:1VcHEd3ro4QMoHfiNl/j7Jkln9+KQuorp0PItHMJYNg= github.com/petermattis/goid v0.0.0-20170504144140-0ded85884ba5/go.mod h1:jvVRKCrJTQWu0XVbaOlby/2lO20uSCHEMzzplHXte1o= @@ -259,6 +281,7 @@ github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6So github.com/rubyist/circuitbreaker v2.2.1+incompatible/go.mod h1:Ycs3JgJADPuzJDwffe12k6BZT8hxVi6lFK+gWYJLN4A= github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= +github.com/samuel/go-zookeeper v0.0.0-20161028232340-1d7be4effb13 h1:4AQBn5RJY4WH8t8TLEMZUsWeXHAUcoao42TCAfpEJJE= github.com/samuel/go-zookeeper v0.0.0-20161028232340-1d7be4effb13/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E= github.com/sasha-s/go-deadlock v0.0.0-20161201235124-341000892f3d/go.mod h1:StQn567HiB1fF2yJ44N9au7wOhrPS3iZqiDbRupzT10= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= @@ -344,6 +367,7 @@ golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2 h1:z99zHgr7hKfrUcX/KsoJk5FJfjTceCKIp96+biqP4To= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/time v0.0.0-20170424234030-8be79e1e0910 h1:bCMaBn7ph495H+x72gEvgcv+mDRd9dElbzo/mVCMxX4= golang.org/x/time v0.0.0-20170424234030-8be79e1e0910/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -370,10 +394,12 @@ gopkg.in/check.v1 
v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/check.v1 v1.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/fsnotify/fsnotify.v1 v1.3.1/go.mod h1:Fyux9zXlo4rWoMSIzpn9fDAYjalPqJ/K1qJ27s+7ltE= gopkg.in/fsnotify/fsnotify.v1 v1.4.7 h1:XNNYLJHt73EyYiCZi6+xjupS9CpvmiDgjPTAjrBlQbo= gopkg.in/fsnotify/fsnotify.v1 v1.4.7/go.mod h1:Fyux9zXlo4rWoMSIzpn9fDAYjalPqJ/K1qJ27s+7ltE= +gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/ini.v1 v1.42.0 h1:7N3gPTt50s8GuLortA00n8AqRTk75qOP98+mTPpgzRk= gopkg.in/ini.v1 v1.42.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= @@ -386,9 +412,14 @@ gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +k8s.io/api v0.0.0-20181213150558-05914d821849 h1:WZFcFPXmLR7g5CxQNmjWv0mg8qulJLxDghbzS4pQtzY= k8s.io/api v0.0.0-20181213150558-05914d821849/go.mod h1:iuAfoD4hCxJ8Onx9kaTIt30j7jUFS00AXQi6QMi99vA= +k8s.io/apimachinery v0.0.0-20181127025237-2b1284ed4c93 h1:tT6oQBi0qwLbbZSfDkdIsb23EwaLY85hoAV4SpXfdao= k8s.io/apimachinery v0.0.0-20181127025237-2b1284ed4c93/go.mod h1:ccL7Eh7zubPUSh9A3USN90/OzHNSVN6zxzde07TDCL0= +k8s.io/client-go v2.0.0-alpha.0.0.20181121191925-a47917edff34+incompatible 
h1:7JnS1I1KbtbearjSCrycUhHSob+KjG6HDWY1GhjkAIU= k8s.io/client-go v2.0.0-alpha.0.0.20181121191925-a47917edff34+incompatible/go.mod h1:7vJpHMYJwNQCWgzmNV+VYUl1zCObLyodBc8nIyt8L5s= +k8s.io/klog v0.1.0 h1:I5HMfc/DtuVaGR1KPwUrTc476K8NCqNBldC7H4dYEzk= k8s.io/klog v0.1.0/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk= k8s.io/kube-openapi v0.0.0-20180629012420-d83b052f768a/go.mod h1:BXM9ceUBTj2QnfH2MK1odQs778ajze1RxcmP6S8RVVc= +sigs.k8s.io/yaml v1.1.0 h1:4A07+ZFc2wgJwo8YNlQpr1rVlgUDlxXHhPJciaPY5gs= sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o= diff --git a/pkg/receive/config.go b/pkg/receive/config.go new file mode 100644 index 00000000000..e7a47f3b9da --- /dev/null +++ b/pkg/receive/config.go @@ -0,0 +1,206 @@ +package receive + +import ( + "context" + "encoding/json" + "io/ioutil" + "os" + "reflect" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "gopkg.in/fsnotify.v1" +) + +// HashringConfig represents the configuration for a hashring +// a receive node knows about. +type HashringConfig struct { + Hashring string `json:"hashring"` + Tenants []string `json:"tenants"` + Endpoints []string `json:"endpoints"` +} + +// ConfigWatcher is able to watch a file containing a hashring configuration +// for updates. +type ConfigWatcher struct { + path string + interval time.Duration + logger log.Logger + watcher *fsnotify.Watcher + + changesCounter prometheus.Counter + errorCounter prometheus.Counter + refreshCounter prometheus.Counter + + // last is the last known configuration. + last []HashringConfig +} + +// NewConfigWatcher creates a new ConfigWatcher. 
+func NewConfigWatcher(path string, interval model.Duration, logger log.Logger) (*ConfigWatcher, error) {
+	if logger == nil {
+		logger = log.NewNopLogger()
+	}
+
+	watcher, err := fsnotify.NewWatcher()
+	if err != nil {
+		return nil, errors.Wrap(err, "creating file watcher")
+	}
+	if err := watcher.Add(path); err != nil {
+		return nil, errors.Wrap(err, "adding path to file watcher")
+	}
+	return &ConfigWatcher{
+		path:     path,
+		interval: time.Duration(interval),
+		logger:   logger,
+		watcher:  watcher,
+		changesCounter: prometheus.NewCounter(
+			prometheus.CounterOpts{
+				Name: "thanos_receive_hashrings_file_changes_total",
+				Help: "The number of times the hashrings configuration file has changed.",
+			}),
+		errorCounter: prometheus.NewCounter(
+			prometheus.CounterOpts{
+				Name: "thanos_receive_hashrings_file_errors_total",
+				Help: "The number of errors watching the hashrings configuration file.",
+			}),
+		refreshCounter: prometheus.NewCounter(
+			prometheus.CounterOpts{
+				Name: "thanos_receive_hashrings_file_refreshes_total",
+				Help: "The number of refreshes of the hashrings configuration file.",
+			}),
+	}, nil
+}
+
+// RegisterMetrics registers the configuration watcher's metrics
+// with the given Prometheus registerer.
+func (cw *ConfigWatcher) RegisterMetrics(r prometheus.Registerer) {
+	r.MustRegister(
+		cw.changesCounter,
+		cw.errorCounter,
+		cw.refreshCounter,
+	)
+}
+
+// Run starts the ConfigWatcher and sends all updates on the specified channel
+// until the given context is cancelled.
+func (cw *ConfigWatcher) Run(ctx context.Context, ch chan<- []HashringConfig) {
+	defer cw.stop()
+
+	cw.refresh(ctx, ch)
+
+	ticker := time.NewTicker(cw.interval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+
+		case event := <-cw.watcher.Events:
+			// fsnotify sometimes sends a bunch of events without name or operation.
+			// It's unclear what they are and why they are sent - filter them out.
+			if len(event.Name) == 0 {
+				break
+			}
+			// Everything but a chmod requires rereading.
+			if event.Op^fsnotify.Chmod == 0 {
+				break
+			}
+			// Changes to a file can spawn various sequences of events with
+			// different combinations of operations. For all practical purposes
+			// this is inaccurate.
+			// The most reliable solution is to reload everything if anything happens.
+			cw.refresh(ctx, ch)
+
+		case <-ticker.C:
+			// Setting a new watch after an update might fail. Make sure we don't lose
+			// those files forever.
+			cw.refresh(ctx, ch)
+
+		case err := <-cw.watcher.Errors:
+			if err != nil {
+				cw.errorCounter.Inc()
+				level.Error(cw.logger).Log("msg", "error watching file", "err", err)
+			}
+		}
+	}
+}
+
+// readFile reads the configured file and decodes its JSON content; on error the result must be ignored.
+func (cw *ConfigWatcher) readFile() ([]HashringConfig, error) {
+	fd, err := os.Open(cw.path)
+	if err != nil {
+		return nil, err
+	}
+	defer func() {
+		if err := fd.Close(); err != nil {
+			level.Error(cw.logger).Log("msg", "failed to close file", "err", err, "path", cw.path)
+		}
+	}()
+
+	content, err := ioutil.ReadAll(fd)
+	if err != nil {
+		return nil, err
+	}
+
+	var config []HashringConfig
+	err = json.Unmarshal(content, &config)
+	return config, err
+}
+
+// refresh reads the configured file and sends the hashring configuration on the channel.
+func (cw *ConfigWatcher) refresh(ctx context.Context, ch chan<- []HashringConfig) {
+	cw.refreshCounter.Inc()
+	config, err := cw.readFile()
+	if err != nil {
+		cw.errorCounter.Inc()
+		level.Error(cw.logger).Log("msg", "failed to read configuration file", "err", err, "path", cw.path)
+		return
+	}
+
+	// If there was no change to the configuration, return early.
+	if reflect.DeepEqual(cw.last, config) {
+		return
+	}
+	cw.changesCounter.Inc()
+	// Save the last known configuration.
+	cw.last = config
+
+	select {
+	case <-ctx.Done():
+		return
+	case ch <- config:
+		return
+	}
+}
+
+// stop shuts down the config watcher.
+func (cw *ConfigWatcher) stop() { + level.Debug(cw.logger).Log("msg", "stopping hashring configuration watcher...", "path", cw.path) + + done := make(chan struct{}) + defer close(done) + + // Closing the watcher will deadlock unless all events and errors are drained. + go func() { + for { + select { + case <-cw.watcher.Errors: + case <-cw.watcher.Events: + // Drain all events and errors. + case <-done: + return + } + } + }() + if err := cw.watcher.Close(); err != nil { + level.Error(cw.logger).Log("msg", "error closing file watcher", "path", cw.path, "err", err) + } + + level.Debug(cw.logger).Log("msg", "hashring configuration watcher stopped") +} diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index c96cde5d763..a2ddea29a38 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -1,6 +1,8 @@ package receive import ( + "bytes" + "context" "fmt" "io/ioutil" stdlog "log" @@ -17,6 +19,7 @@ import ( conntrack "github.com/mwitkow/go-conntrack" "github.com/opentracing-contrib/go-stdlib/nethttp" opentracing "github.com/opentracing/opentracing-go" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/common/route" @@ -48,6 +51,8 @@ type Options struct { ListenAddress string Registry prometheus.Registerer ReadyStorage *promtsdb.ReadyStorage + Endpoint string + TenantHeader string } // Handler serves a Prometheus remote write receiving HTTP endpoint. @@ -56,10 +61,13 @@ type Handler struct { logger log.Logger receiver *Writer router *route.Router + hashring Hashring options *Options listener net.Listener - ready uint32 // ready is uint32 rather than boolean to be able to use atomic functions. + // These fields are uint32 rather than boolean to be able to use atomic functions. 
+ storageReady uint32 + hashringReady uint32 } func instrumentHandler(handlerName string, handler http.HandlerFunc) http.HandlerFunc { @@ -92,15 +100,28 @@ func NewHandler(logger log.Logger, o *Options) *Handler { return h } -// Ready sets Handler to be ready. -func (h *Handler) Ready() { - atomic.StoreUint32(&h.ready, 1) +// StorageReady marks the storage as ready. +func (h *Handler) StorageReady() { + atomic.StoreUint32(&h.storageReady, 1) +} + +// Hashring sets the hashring for the handler and marks the hashring as ready. +// If the hashring is nil, then the hashring is marked as not ready. +func (h *Handler) Hashring(hashring Hashring) { + if hashring == nil { + atomic.StoreUint32(&h.hashringReady, 0) + h.hashring = nil + return + } + h.hashring = hashring + atomic.StoreUint32(&h.hashringReady, 1) } // Verifies whether the server is ready or not. func (h *Handler) isReady() bool { - ready := atomic.LoadUint32(&h.ready) - return ready > 0 + sr := atomic.LoadUint32(&h.storageReady) + hr := atomic.LoadUint32(&h.hashringReady) + return sr > 0 && hr > 0 } // Checks if server is ready, calls f if it is, returns 503 if it is not. @@ -124,11 +145,6 @@ func (h *Handler) Close() { runutil.CloseWithLogOnErr(h.logger, h.listener, "receive HTTP listener") } -// Checks if server is ready, calls f if it is, returns 503 if it is not. -func (h *Handler) testReadyHandler(f http.Handler) http.HandlerFunc { - return h.testReady(f.ServeHTTP) -} - // Run serves the HTTP endpoints. 
func (h *Handler) Run() error { level.Info(h.logger).Log("msg", "Start listening for connections", "address", h.options.ListenAddress) @@ -160,8 +176,8 @@ func (h *Handler) Run() error { return httpSrv.Serve(h.listener) } -func (h *Handler) receive(w http.ResponseWriter, req *http.Request) { - compressed, err := ioutil.ReadAll(req.Body) +func (h *Handler) receive(w http.ResponseWriter, r *http.Request) { + compressed, err := ioutil.ReadAll(r.Body) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return @@ -180,8 +196,107 @@ func (h *Handler) receive(w http.ResponseWriter, req *http.Request) { return } - if err := h.receiver.Receive(&wreq); err != nil { + tenant := r.Header.Get(h.options.TenantHeader) + local, err := h.forward(r.Context(), tenant, &wreq) + if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } + // There may be no WriteRequest destined for the local node. + if local != nil { + if err := h.receiver.Receive(local); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + } +} + +// forward accepts a write request, batches its time series by +// corresponding endpoint, and forwards them in parallel. It returns a write +// request containing only the time series that correspond to +// local handler. For a given write request, at most one outgoing +// write request will be made to every other node in the hashring. +// The function only returns when all requests have finished, +// or the context is canceled. +func (h *Handler) forward(ctx context.Context, tenant string, wreq *prompb.WriteRequest) (*prompb.WriteRequest, error) { + wreqs := make(map[string]*prompb.WriteRequest) + // Batch all of the time series in the write request + // into several smaller write requests that are + // grouped by target endpoint. 
This ensures that + // for any incoming write request to a node, + // at most one outgoing write request will be made + // to every other node in the hashring, rather than + // one request per time series. + for i := range wreq.Timeseries { + endpoint, err := h.hashring.Get(tenant, &wreq.Timeseries[i]) + if err != nil { + return nil, err + } + if _, ok := wreqs[endpoint]; !ok { + wreqs[endpoint] = &prompb.WriteRequest{} + } + wr := wreqs[endpoint] + wr.Timeseries = append(wr.Timeseries, wreq.Timeseries[i]) + } + + ec := make(chan error) + defer close(ec) + // We don't want to use a sync.WaitGroup here because that + // introduces an unnecessary second synchronization mechanism, + // the first being the error chan. Plus, it saves us a goroutine + // as in order to collect errors while doing wg.Wait, we would + // need a separate error collection goroutine. + var n int + var local *prompb.WriteRequest + for endpoint := range wreqs { + // If the endpoint for the write request is the + // local node, then don't make a request. + // Save it for later so it can be returned. + if endpoint == h.options.Endpoint { + local = wreqs[endpoint] + continue + } + n++ + go func(endpoint string) { + buf, err := proto.Marshal(wreqs[endpoint]) + if err != nil { + level.Error(h.logger).Log("msg", "proto marshal error", "err", err, "endpoint", endpoint) + ec <- err + return + } + req, err := http.NewRequest("POST", endpoint, bytes.NewBuffer(snappy.Encode(nil, buf))) + if err != nil { + level.Error(h.logger).Log("msg", "create request error", "err", err, "endpoint", endpoint) + ec <- err + return + } + req.Header.Add(h.options.TenantHeader, tenant) + // Actually make the request against the endpoint + // we determined should handle these time series.
+ if _, err := http.DefaultClient.Do(req.WithContext(ctx)); err != nil { + level.Error(h.logger).Log("msg", "forward request error", "err", err, "endpoint", endpoint) + ec <- err + return + } + ec <- nil + }(endpoint) + } + + // Collect any errors from forwarding the time series. + // Rather than doing a wg.Wait here, we decrement a counter + // for every error received on the chan. This simplifies + // error collection and avoids data races with a separate + // error collection goroutine. + var errs error + for ; n > 0; n-- { + if err := <-ec; err != nil { + if errs == nil { + errs = err + continue + } + errs = errors.Wrap(errs, err.Error()) + } + } + + return local, errs } diff --git a/pkg/receive/hashring.go b/pkg/receive/hashring.go index 11ab56040d3..54fbaff9bfe 100644 --- a/pkg/receive/hashring.go +++ b/pkg/receive/hashring.go @@ -1,18 +1,19 @@ package receive import ( + "context" "errors" "sort" + "sync" "github.com/improbable-eng/thanos/pkg/store/prompb" "github.com/cespare/xxhash" - "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/discovery/targetgroup" ) const sep = '\xff' +// Hashring finds the correct node to handle a given time series // for a specified tenant. // It returns the node and any error encountered. type Hashring interface { @@ -65,49 +66,109 @@ func hash(tenant string, ts *prompb.TimeSeries) uint64 { return xxhash.Sum64(b) } -// simpleHashring represents a group of hosts handling write requests. -type simpleHashring struct { - targetgroup.Group +// SingleNodeHashring always returns the same node. +type SingleNodeHashring string + +// Get implements the Hashring interface. +func (s SingleNodeHashring) Get(_ string, _ *prompb.TimeSeries) (string, error) { + return string(s), nil } +// simpleHashring represents a group of nodes handling write requests. +type simpleHashring []string + // Get returns a target to handle the given tenant and time series. 
-func (s *simpleHashring) Get(tenant string, ts *prompb.TimeSeries) (string, error) { +func (s simpleHashring) Get(tenant string, ts *prompb.TimeSeries) (string, error) { // Always return nil here to implement the Hashring interface. - return string(s.Targets[hash(tenant, ts)%uint64(len(s.Targets))][model.AddressLabel]), nil + return s[hash(tenant, ts)%uint64(len(s))], nil } // matchingHashring represents a set of hashrings. // Which hashring to use is determined by the matcher. type matchingHashring struct { cache map[string]Hashring - hashrings map[string]Hashring + hashrings []Hashring matcher Matcher + cfg []HashringConfig + + // We need a mutex to guard concurrent access + // to the cache map, as this is both written to + // and read from. + mu sync.RWMutex } // Get returns a target to handle the given tenant and time series. -func (m matchingHashring) Get(tenant string, ts *prompb.TimeSeries) (string, error) { - if h, ok := m.cache[tenant]; ok { +func (m *matchingHashring) Get(tenant string, ts *prompb.TimeSeries) (string, error) { + m.mu.RLock() + h, ok := m.cache[tenant] + m.mu.RUnlock() + if ok { return h.Get(tenant, ts) } - for name := range m.hashrings { - if m.matcher.Match(tenant, name) { - m.cache[tenant] = m.hashrings[name] - return m.hashrings[name].Get(tenant, ts) + var found bool + // If the tenant is not in the cache, then we need to check + // every tenant in the configuration. + for i, h := range m.cfg { + // If the hashring has no tenants, then it is + // considered a default hashring and matches everything. + if len(h.Tenants) == 0 { + found = true + } + // Match against every tenant in the hashring. + for _, t := range h.Tenants { + if m.matcher.Match(tenant, t) { + found = true + break + } + } + // If this hashring matched, cache it for the tenant + // so that later lookups can skip the matcher.
+ if found { + m.mu.Lock() + m.cache[tenant] = m.hashrings[i] + m.mu.Unlock() + return m.hashrings[i].Get(tenant, ts) } } return "", errors.New("no matching hashring to handle tenant") } -// NewHashring creates a multi-tenant hashring for a given slice of -// groups. Which tenant's hashring to use is determined by the Matcher. -func NewHashring(matcher Matcher, groups []*targetgroup.Group) Hashring { - m := matchingHashring{ - cache: make(map[string]Hashring), - hashrings: make(map[string]Hashring), - matcher: matcher, +// newMatchingHashring creates a multi-tenant hashring for a given slice of +// groups. +// Which nodes are assigned to a hashring is controlled +// by the given tenant label. +// Which tenant's hashring to use is determined by the Matcher. +func newMatchingHashring(matcher Matcher, cfg []HashringConfig) Hashring { + m := &matchingHashring{ + cache: make(map[string]Hashring), + matcher: matcher, + cfg: cfg, } - for _, g := range groups { - m.hashrings[g.Source] = &simpleHashring{*g} + + for _, h := range cfg { + m.hashrings = append(m.hashrings, simpleHashring(h.Endpoints)) } return m } + +// HashringFromConfig creates multi-tenant hashrings from a +// hashring configuration file watcher. +// The configuration file is watched for updates. +// Hashrings are returned on the updates channel. +// Which nodes are assigned to a hashring is controlled +// by the given tenant label. +// Which tenant's hashring to use is determined by the Matcher. 
+func HashringFromConfig(ctx context.Context, updates chan<- Hashring, matcher Matcher, cw *ConfigWatcher) { + cfgUpdates := make(chan []HashringConfig) + defer close(cfgUpdates) + go cw.Run(ctx, cfgUpdates) + + for { + select { + case cfg := <-cfgUpdates: + updates <- newMatchingHashring(matcher, cfg) + case <-ctx.Done(): + return + } + } +} diff --git a/pkg/receive/hashring_test.go b/pkg/receive/hashring_test.go index 9f5edddb8be..a3d53031982 100644 --- a/pkg/receive/hashring_test.go +++ b/pkg/receive/hashring_test.go @@ -4,8 +4,6 @@ import ( "testing" "github.com/improbable-eng/thanos/pkg/store/prompb" - "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/discovery/targetgroup" ) func TestHash(t *testing.T) { @@ -47,77 +45,52 @@ func TestHashringGet(t *testing.T) { for _, tc := range []struct { name string - cfg []*targetgroup.Group + cfg []HashringConfig nodes map[string]struct{} tenant string }{ { name: "empty", - cfg: []*targetgroup.Group{}, + cfg: nil, tenant: "tenant1", }, { name: "simple", - cfg: []*targetgroup.Group{ + cfg: []HashringConfig{ { - Targets: []model.LabelSet{ - model.LabelSet{ - model.AddressLabel: "node1", - }, - }, + Endpoints: []string{"node1"}, }, }, nodes: map[string]struct{}{"node1": struct{}{}}, }, { name: "specific", - cfg: []*targetgroup.Group{ + cfg: []HashringConfig{ { - Targets: []model.LabelSet{ - model.LabelSet{ - model.AddressLabel: "node1", - }, - }, - Source: "", + Endpoints: []string{"node2"}, + Tenants: []string{"tenant2"}, }, { - Targets: []model.LabelSet{ - model.LabelSet{ - model.AddressLabel: "node2", - }, - }, - Source: "tenant1", + Endpoints: []string{"node1"}, }, }, nodes: map[string]struct{}{"node2": struct{}{}}, - tenant: "tenant1", + tenant: "tenant2", }, { name: "many tenants", - cfg: []*targetgroup.Group{ + cfg: []HashringConfig{ { - Targets: []model.LabelSet{ - model.LabelSet{ - model.AddressLabel: "node1", - }, - }, - Source: "tenant1", + Endpoints: []string{"node1"}, + Tenants: 
[]string{"tenant1"}, }, { - Targets: []model.LabelSet{ - model.LabelSet{ - model.AddressLabel: "node2", - }, - }, - Source: "tenant2", + Endpoints: []string{"node2"}, + Tenants: []string{"tenant2"}, }, { - Targets: []model.LabelSet{ - model.LabelSet{ - model.AddressLabel: "node3", - }, - }, - Source: "tenant3", + Endpoints: []string{"node3"}, + Tenants: []string{"tenant3"}, }, }, nodes: map[string]struct{}{"node1": struct{}{}}, @@ -125,64 +98,31 @@ func TestHashringGet(t *testing.T) { }, { name: "many tenants error", - cfg: []*targetgroup.Group{ + cfg: []HashringConfig{ { - Targets: []model.LabelSet{ - model.LabelSet{ - model.AddressLabel: "node1", - }, - }, - Source: "tenant1", + Endpoints: []string{"node1"}, + Tenants: []string{"tenant1"}, }, { - Targets: []model.LabelSet{ - model.LabelSet{ - model.AddressLabel: "node2", - }, - }, - Source: "tenant2", + Endpoints: []string{"node2"}, + Tenants: []string{"tenant2"}, }, { - Targets: []model.LabelSet{ - model.LabelSet{ - model.AddressLabel: "node3", - }, - }, - Source: "tenant3", + Endpoints: []string{"node3"}, + Tenants: []string{"tenant3"}, }, }, tenant: "tenant4", }, { name: "many nodes", - cfg: []*targetgroup.Group{ + cfg: []HashringConfig{ { - Targets: []model.LabelSet{ - model.LabelSet{ - model.AddressLabel: "node1", - }, - model.LabelSet{ - model.AddressLabel: "node2", - }, - model.LabelSet{ - model.AddressLabel: "node3", - }, - }, - Source: "tenant1", + Endpoints: []string{"node1", "node2", "node3"}, + Tenants: []string{"tenant1"}, }, { - Targets: []model.LabelSet{ - model.LabelSet{ - model.AddressLabel: "node4", - }, - model.LabelSet{ - model.AddressLabel: "node5", - }, - model.LabelSet{ - model.AddressLabel: "node6", - }, - }, - Source: "", + Endpoints: []string{"node4", "node5", "node6"}, }, }, nodes: map[string]struct{}{ @@ -193,34 +133,14 @@ func TestHashringGet(t *testing.T) { tenant: "tenant1", }, { - name: "many nodes 2", - cfg: []*targetgroup.Group{ + name: "many nodes default", + cfg: 
[]HashringConfig{ { - Targets: []model.LabelSet{ - model.LabelSet{ - model.AddressLabel: "node1", - }, - model.LabelSet{ - model.AddressLabel: "node2", - }, - model.LabelSet{ - model.AddressLabel: "node3", - }, - }, - Source: "tenant1", + Endpoints: []string{"node1", "node2", "node3"}, + Tenants: []string{"tenant1"}, }, { - Targets: []model.LabelSet{ - model.LabelSet{ - model.AddressLabel: "node4", - }, - model.LabelSet{ - model.AddressLabel: "node5", - }, - model.LabelSet{ - model.AddressLabel: "node6", - }, - }, + Endpoints: []string{"node4", "node5", "node6"}, }, }, nodes: map[string]struct{}{ @@ -230,7 +150,7 @@ func TestHashringGet(t *testing.T) { }, }, } { - hs := NewHashring(ExactMatcher, tc.cfg) + hs := newMatchingHashring(ExactMatcher, tc.cfg) h, err := hs.Get(tc.tenant, ts) if tc.nodes != nil { if err != nil {