From b72edacb330721b9b5d2246baa67b518dbf721df Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Sat, 28 May 2022 10:31:41 -0700 Subject: [PATCH 01/10] support series relabeling on Thanos receiver Signed-off-by: Ben Ye --- cmd/thanos/receive.go | 16 +- pkg/receive/handler.go | 26 +++ pkg/receive/handler_test.go | 371 +++++++++++++++++++++++++++++++++ test/e2e/e2ethanos/services.go | 14 ++ test/e2e/receive_test.go | 49 +++++ 5 files changed, 475 insertions(+), 1 deletion(-) diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 5d60a19ba1..f1cc29f4dc 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -5,6 +5,8 @@ package main import ( "context" + "github.com/prometheus/prometheus/model/relabel" + "gopkg.in/yaml.v2" "io/ioutil" "os" "path" @@ -173,6 +175,15 @@ func runReceive( return errors.Wrapf(err, "migrate legacy storage in %v to default tenant %v", conf.dataDir, conf.defaultTenantID) } + relabelContentYaml, err := conf.relabelConfigPath.Content() + if err != nil { + return errors.Wrap(err, "get content of relabel configuration") + } + var relabelConfig []*relabel.Config + if err := yaml.Unmarshal(relabelContentYaml, &relabelConfig); err != nil { + return errors.Wrap(err, "parsing relabel configuration") + } + dbs := receive.NewMultiTSDB( conf.dataDir, logger, @@ -729,7 +740,8 @@ type receiveConfig struct { ignoreBlockSize bool allowOutOfOrderUpload bool - reqLogConfig *extflag.PathOrContent + reqLogConfig *extflag.PathOrContent + relabelConfigPath *extflag.PathOrContent } func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) { @@ -783,6 +795,8 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) { rc.forwardTimeout = extkingpin.ModelDuration(cmd.Flag("receive-forward-timeout", "Timeout for each forward request.").Default("5s").Hidden()) + rc.relabelConfigPath = extflag.RegisterPathOrContent(cmd, "receive.relabel-config", "YAML file that contains relabelling configuration.", extflag.WithEnvSubstitution()) + rc.tsdbMinBlockDuration = extkingpin.ModelDuration(cmd.Flag("tsdb.min-block-duration", "Min duration for local TSDB blocks").Default("2h").Hidden()) rc.tsdbMaxBlockDuration = extkingpin.ModelDuration(cmd.Flag("tsdb.max-block-duration", "Max duration for local TSDB blocks").Default("2h").Hidden()) diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index c35c3bea41..8b8e926c8a 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -28,6 +28,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/route" + "github.com/prometheus/prometheus/model/relabel" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" "google.golang.org/grpc" @@ -38,6 +39,7 @@ import ( extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http" "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/server/http/middleware" + "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/store/storepb/prompb" "github.com/thanos-io/thanos/pkg/tracing" @@ -81,6 +83,7 @@ type Options struct { TLSConfig *tls.Config DialOpts []grpc.DialOption ForwardTimeout time.Duration + RelabelConfigs []*relabel.Config } // Handler serves a Prometheus remote write receiving HTTP endpoint. @@ -353,6 +356,13 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) { return } + // Apply relabeling configs. + h.relabel(&wreq) + if len(wreq.Timeseries) == 0 { + level.Debug(tLogger).Log("msg", "remote write request dropped due to relabeling.") + return + } + err = h.handleRequest(ctx, rep, tenant, &wreq) if err != nil { level.Debug(tLogger).Log("msg", "failed to handle request", "err", err) @@ -682,6 +692,22 @@ func (h *Handler) RemoteWrite(ctx context.Context, r *storepb.WriteRequest) (*st } } +// relabel relabels the time series labels in the remote write request. +func (h *Handler) relabel(wreq *prompb.WriteRequest) { + if len(h.options.RelabelConfigs) > 0 { + timeSeries := make([]prompb.TimeSeries, 0, len(wreq.Timeseries)) + for _, ts := range wreq.Timeseries { + lbls := relabel.Process(labelpb.ZLabelsToPromLabels(ts.Labels), h.options.RelabelConfigs...) + if lbls == nil { + continue + } + ts.Labels = labelpb.ZLabelsFromPromLabels(lbls) + timeSeries = append(timeSeries, ts) + } + wreq.Timeseries = timeSeries + } +} + // isConflict returns whether or not the given error represents a conflict. func isConflict(err error) bool { if err == nil { diff --git a/pkg/receive/handler_test.go b/pkg/receive/handler_test.go index 6c92fbabc8..9a6d649648 100644 --- a/pkg/receive/handler_test.go +++ b/pkg/receive/handler_test.go @@ -7,6 +7,8 @@ import ( "bytes" "context" "fmt" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/relabel" "io/ioutil" "math" "math/rand" @@ -1300,3 +1302,372 @@ func Heap(dir string) (err error) { defer runutil.CloseWithErrCapture(&err, f, "close") return pprof.WriteHeapProfile(f) } + +func TestRelabel(t *testing.T) { + for _, tcase := range []struct { + name string + relabel []*relabel.Config + writeRequest prompb.WriteRequest + expectedWriteRequest prompb.WriteRequest + }{ + { + name: "empty relabel configs", + writeRequest: prompb.WriteRequest{ + Timeseries: []prompb.TimeSeries{ + { + Labels: []labelpb.ZLabel{ + { + Name: "__name__", + Value: "test_metric", + }, + { + Name: "foo", + Value: "bar", + }, + }, + Samples: []prompb.Sample{ + { + Timestamp: 0, + Value: 1, + }, + }, + }, + }, + }, + expectedWriteRequest: prompb.WriteRequest{ + Timeseries: []prompb.TimeSeries{ + { + Labels: []labelpb.ZLabel{ + { + Name: "__name__", + Value: "test_metric", + }, + { + Name: "foo", + Value: "bar", + }, + }, + Samples: []prompb.Sample{ + { + Timestamp: 0, + Value: 1, + }, + }, + }, + }, + }, + }, + { + name: "has relabel configs but no relabelling applied", + relabel: []*relabel.Config{ + { + SourceLabels: model.LabelNames{"zoo"}, + TargetLabel: "bar", + Regex: relabel.MustNewRegexp("bar"), + Action: relabel.Replace, + Replacement: "baz", + }, + }, + writeRequest: prompb.WriteRequest{ + Timeseries: []prompb.TimeSeries{ + { + Labels: []labelpb.ZLabel{ + { + Name: "__name__", + Value: "test_metric", + }, + { + Name: "foo", + Value: "bar", + }, + }, + Samples: []prompb.Sample{ + { + Timestamp: 0, + Value: 1, + }, + }, + }, + }, + }, + expectedWriteRequest: prompb.WriteRequest{ + Timeseries: []prompb.TimeSeries{ + { + Labels: []labelpb.ZLabel{ + { + Name: "__name__", + Value: "test_metric", + }, + { + Name: "foo", + Value: "bar", + }, + }, + Samples: []prompb.Sample{ + { + Timestamp: 0, + Value: 1, + }, + }, + }, + }, + }, + }, + { + name: "relabel rewrite existing labels", + relabel: []*relabel.Config{ + { + TargetLabel: "foo", + Action: relabel.Replace, + Regex: relabel.MustNewRegexp(""), + Replacement: "test", + }, + { + TargetLabel: "__name__", + Action: relabel.Replace, + Regex: relabel.MustNewRegexp(""), + Replacement: "foo", + }, + }, + writeRequest: prompb.WriteRequest{ + Timeseries: []prompb.TimeSeries{ + { + Labels: []labelpb.ZLabel{ + { + Name: "__name__", + Value: "test_metric", + }, + { + Name: "foo", + Value: "bar", + }, + }, + Samples: []prompb.Sample{ + { + Timestamp: 0, + Value: 1, + }, + }, + }, + }, + }, + expectedWriteRequest: prompb.WriteRequest{ + Timeseries: []prompb.TimeSeries{ + { + Labels: []labelpb.ZLabel{ + { + Name: "__name__", + Value: "foo", + }, + { + Name: "foo", + Value: "test", + }, + }, + Samples: []prompb.Sample{ + { + Timestamp: 0, + Value: 1, + }, + }, + }, + }, + }, + }, + { + name: "relabel drops label", + relabel: []*relabel.Config{ + { + Action: relabel.LabelDrop, + Regex: relabel.MustNewRegexp("foo"), + }, + }, + writeRequest: prompb.WriteRequest{ + Timeseries: []prompb.TimeSeries{ + { + Labels: []labelpb.ZLabel{ + { + Name: "__name__", + Value: "test_metric", + }, + { + Name: "foo", + Value: "bar", + }, + }, + Samples: []prompb.Sample{ + { + Timestamp: 0, + Value: 1, + }, + }, + }, + }, + }, + expectedWriteRequest: prompb.WriteRequest{ + Timeseries: []prompb.TimeSeries{ + { + Labels: []labelpb.ZLabel{ + { + Name: "__name__", + Value: "test_metric", + }, + }, + Samples: []prompb.Sample{ + { + Timestamp: 0, + Value: 1, + }, + }, + }, + }, + }, + }, + { + name: "relabel drops time series", + relabel: []*relabel.Config{ + { + SourceLabels: model.LabelNames{"foo"}, + Action: relabel.Drop, + Regex: relabel.MustNewRegexp("bar"), + }, + }, + writeRequest: prompb.WriteRequest{ + Timeseries: []prompb.TimeSeries{ + { + Labels: []labelpb.ZLabel{ + { + Name: "__name__", + Value: "test_metric", + }, + { + Name: "foo", + Value: "bar", + }, + }, + Samples: []prompb.Sample{ + { + Timestamp: 0, + Value: 1, + }, + }, + }, + }, + }, + expectedWriteRequest: prompb.WriteRequest{ + Timeseries: []prompb.TimeSeries{}, + }, + }, + { + name: "relabel rewrite existing exemplar series labels", + relabel: []*relabel.Config{ + { + Action: relabel.LabelDrop, + Regex: relabel.MustNewRegexp("foo"), + }, + }, + writeRequest: prompb.WriteRequest{ + Timeseries: []prompb.TimeSeries{ + { + Labels: []labelpb.ZLabel{ + { + Name: "__name__", + Value: "test_metric", + }, + { + Name: "foo", + Value: "bar", + }, + }, + Exemplars: []prompb.Exemplar{ + { + Labels: []labelpb.ZLabel{ + { + Name: "traceID", + Value: "foo", + }, + }, + Value: 1, + Timestamp: 1, + }, + }, + }, + }, + }, + expectedWriteRequest: prompb.WriteRequest{ + Timeseries: []prompb.TimeSeries{ + { + Labels: []labelpb.ZLabel{ + { + Name: "__name__", + Value: "test_metric", + }, + }, + Exemplars: []prompb.Exemplar{ + { + Labels: []labelpb.ZLabel{ + { + Name: "traceID", + Value: "foo", + }, + }, + Value: 1, + Timestamp: 1, + }, + }, + }, + }, + }, + }, + { + name: "relabel drops exemplars", + relabel: []*relabel.Config{ + { + SourceLabels: model.LabelNames{"foo"}, + Action: relabel.Drop, + Regex: relabel.MustNewRegexp("bar"), + }, + }, + writeRequest: prompb.WriteRequest{ + Timeseries: []prompb.TimeSeries{ + { + Labels: []labelpb.ZLabel{ + { + Name: "__name__", + Value: "test_metric", + }, + { + Name: "foo", + Value: "bar", + }, + }, + Exemplars: []prompb.Exemplar{ + { + Labels: []labelpb.ZLabel{ + { + Name: "traceID", + Value: "foo", + }, + }, + Value: 1, + Timestamp: 1, + }, + }, + }, + }, + }, + expectedWriteRequest: prompb.WriteRequest{ + Timeseries: []prompb.TimeSeries{}, + }, + }, + } { + t.Run(tcase.name, func(t *testing.T) { + h := NewHandler(nil, &Options{ + RelabelConfigs: tcase.relabel, + }) + + h.relabel(&tcase.writeRequest) + testutil.Equals(t, tcase.expectedWriteRequest, tcase.writeRequest) + }) + } +} diff --git a/test/e2e/e2ethanos/services.go b/test/e2e/e2ethanos/services.go index 8260f392e7..587d9e05c3 100644 --- a/test/e2e/e2ethanos/services.go +++ b/test/e2e/e2ethanos/services.go @@ -365,6 +365,7 @@ type ReceiveBuilder struct { maxExemplars int ingestion bool hashringConfigs []receive.HashringConfig + relabelConfigs []*relabel.Config replication int image string } @@ -403,6 +404,11 @@ func (r *ReceiveBuilder) WithRouting(replication int, hashringConfigs ...receive return r } +func (r *ReceiveBuilder) WithRelabelConfigs(relabelConfigs []*relabel.Config) *ReceiveBuilder { + r.relabelConfigs = relabelConfigs + return r +} + // Init creates a Thanos Receive instance. // If ingestion is enabled it will be configured for ingesting samples. // If routing is configured (i.e. hashring configuration is provided) it routes samples to other receivers. @@ -448,6 +454,14 @@ func (r *ReceiveBuilder) Init() e2e.InstrumentedRunnable { args["--receive.replication-factor"] = strconv.Itoa(r.replication) } + if len(r.relabelConfigs) > 0 { + relabelConfigBytes, err := yaml.Marshal(r.relabelConfigs) + if err != nil { + return e2e.NewErrInstrumentedRunnable(r.Name(), errors.Wrapf(err, "generate relabel configs: %v", relabelConfigBytes)) + } + args["--receive.relabel-config"] = string(relabelConfigBytes) + } + return r.f.Init(wrapWithDefaults(e2e.StartOptions{ Image: r.image, Command: e2e.NewCommand("receive", e2e.BuildArgs(args)...), diff --git a/test/e2e/receive_test.go b/test/e2e/receive_test.go index 9432dbeb28..2fc3b5205d 100644 --- a/test/e2e/receive_test.go +++ b/test/e2e/receive_test.go @@ -5,6 +5,7 @@ package e2e_test import ( "context" + "github.com/prometheus/prometheus/model/relabel" "log" "net/http" "net/http/httputil" @@ -558,4 +559,52 @@ func TestReceive(t *testing.T) { }, }) }) + + t.Run("relabel", func(t *testing.T) { + t.Parallel() + e, err := e2e.NewDockerEnvironment("e2e_receive_relabel") + testutil.Ok(t, err) + t.Cleanup(e2ethanos.CleanScenario(t, e)) + + // Setup Router Ingestor. + i := e2ethanos.NewReceiveBuilder(e, "ingestor"). + WithIngestionEnabled(). + WithRelabelConfigs([]*relabel.Config{ + { + TargetLabel: "job", + Action: relabel.LabelDrop, + Regex: relabel.MustNewRegexp("myself"), + }, + { + TargetLabel: "prometheus", + Action: relabel.LabelDrop, + Regex: relabel.MustNewRegexp("prom1"), + }, + }).Init() + + testutil.Ok(t, e2e.StartAndWaitReady(i)) + + // Setup Prometheus + prom := e2ethanos.NewPrometheus(e, "1", e2ethanos.DefaultPromConfig("prom1", 0, e2ethanos.RemoteWriteEndpoint(i.InternalEndpoint("remote-write")), "", e2ethanos.LocalPrometheusTarget), "", e2ethanos.DefaultPrometheusImage()) + testutil.Ok(t, e2e.StartAndWaitReady(prom)) + + q := e2ethanos.NewQuerierBuilder(e, "1", i.InternalEndpoint("grpc")).Init() + testutil.Ok(t, e2e.StartAndWaitReady(q)) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + t.Cleanup(cancel) + + testutil.Ok(t, q.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"thanos_store_nodes_grpc_connections"}, e2e.WaitMissingMetrics())) + + // We expect the data from each Prometheus instance to be replicated twice across our ingesting instances + queryAndAssertSeries(t, ctx, q.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, time.Now, promclient.QueryOptions{ + Deduplicate: false, + }, []model.Metric{ + { + "receive": "receive-ingestor", + "replica": "0", + "tenant_id": "default-tenant", + }, + }) + }) } From 7cbd2e7becad2caa754e81cc0615c9425e2422d7 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Sat, 28 May 2022 10:47:57 -0700 Subject: [PATCH 02/10] add changelog Signed-off-by: Ben Ye --- CHANGELOG.md | 1 + cmd/thanos/receive.go | 4 ++-- docs/components/receive.md | 7 +++++++ 3 files changed, 10 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0a66874971..17ffd8be3b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ### Added - [#5352](https://github.com/thanos-io/thanos/pull/5352) Cache: Add cache metrics to groupcache. +- [#5391](https://github.com/thanos-io/thanos/pull/5391) Receive: Add relabeling support. ### Changed diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index f1cc29f4dc..1b5811748f 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -5,8 +5,6 @@ package main import ( "context" - "github.com/prometheus/prometheus/model/relabel" - "gopkg.in/yaml.v2" "io/ioutil" "os" "path" @@ -24,7 +22,9 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/relabel" "github.com/prometheus/prometheus/tsdb" + "gopkg.in/yaml.v2" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/component" diff --git a/docs/components/receive.md b/docs/components/receive.md index 435e1e15df..5a843cfbd2 100644 --- a/docs/components/receive.md +++ b/docs/components/receive.md @@ -140,6 +140,13 @@ Flags: configuration. If it's empty AND hashring configuration was provided, it means that receive will run in RoutingOnly mode. + --receive.relabel-config= + Alternative to 'receive.relabel-config-file' + flag (mutually exclusive). Content of YAML file + that contains relabelling configuration. + --receive.relabel-config-file= + Path to YAML file that contains relabelling + configuration. --receive.replica-header="THANOS-REPLICA" HTTP header specifying the replica number of a write request. From 54abfedc82d237fd69fee4f44fbcb5da1a0e8117 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Sat, 28 May 2022 12:01:43 -0700 Subject: [PATCH 03/10] fix lint Signed-off-by: Ben Ye --- pkg/receive/handler_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/receive/handler_test.go b/pkg/receive/handler_test.go index 9a6d649648..6bc0bcf8b0 100644 --- a/pkg/receive/handler_test.go +++ b/pkg/receive/handler_test.go @@ -7,8 +7,6 @@ import ( "bytes" "context" "fmt" - "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/model/relabel" "io/ioutil" "math" "math/rand" @@ -28,8 +26,10 @@ import ( "github.com/golang/snappy" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/relabel" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" "google.golang.org/grpc" From 2526c162970096db7d7cc184cbbaa7bb89b2fb7c Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Sat, 28 May 2022 12:06:27 -0700 Subject: [PATCH 04/10] update lint Signed-off-by: Ben Ye --- test/e2e/receive_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/e2e/receive_test.go b/test/e2e/receive_test.go index 2fc3b5205d..13b3007833 100644 --- a/test/e2e/receive_test.go +++ b/test/e2e/receive_test.go @@ -5,7 +5,6 @@ package e2e_test import ( "context" - "github.com/prometheus/prometheus/model/relabel" "log" "net/http" "net/http/httputil" @@ -14,6 +13,8 @@ import ( "github.com/efficientgo/e2e" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/relabel" + "github.com/thanos-io/thanos/pkg/promclient" "github.com/thanos-io/thanos/pkg/receive" "github.com/thanos-io/thanos/pkg/testutil" From 7a33da10683e84c1293f6113a28e5d0e96ccc54f Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Sat, 28 May 2022 13:32:32 -0700 Subject: [PATCH 05/10] fix e2e test Signed-off-by: Ben Ye --- test/e2e/receive_test.go | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/test/e2e/receive_test.go b/test/e2e/receive_test.go index 13b3007833..a2104aac67 100644 --- a/test/e2e/receive_test.go +++ b/test/e2e/receive_test.go @@ -572,14 +572,8 @@ func TestReceive(t *testing.T) { WithIngestionEnabled(). WithRelabelConfigs([]*relabel.Config{ { - TargetLabel: "job", - Action: relabel.LabelDrop, - Regex: relabel.MustNewRegexp("myself"), - }, - { - TargetLabel: "prometheus", - Action: relabel.LabelDrop, - Regex: relabel.MustNewRegexp("prom1"), + Action: relabel.LabelDrop, + Regex: relabel.MustNewRegexp("prometheus"), }, }).Init() @@ -602,6 +596,7 @@ func TestReceive(t *testing.T) { Deduplicate: false, }, []model.Metric{ { + "job": "myself", "receive": "receive-ingestor", "replica": "0", "tenant_id": "default-tenant", From fb00a3bebbea7f134425f4f72da7ada6ac008913 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Sat, 28 May 2022 14:09:35 -0700 Subject: [PATCH 06/10] fix relabel config pass Signed-off-by: Ben Ye --- cmd/thanos/receive.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 1b5811748f..b580b69ae9 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -205,6 +205,7 @@ func runReceive( DefaultTenantID: conf.defaultTenantID, ReplicaHeader: conf.replicaHeader, ReplicationFactor: conf.replicationFactor, + RelabelConfigs: relabelConfig, ReceiverMode: receiveMode, Tracer: tracer, TLSConfig: rwTLSConfig, From 65c3d267cc34f7cdb67a79e6767a0d75a84a604f Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Sat, 28 May 2022 19:22:41 -0700 Subject: [PATCH 07/10] cleanup white space Signed-off-by: Ben Ye --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 17ffd8be3b..70e335f1f7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,7 +17,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ### Added - [#5352](https://github.com/thanos-io/thanos/pull/5352) Cache: Add cache metrics to groupcache. -- [#5391](https://github.com/thanos-io/thanos/pull/5391) Receive: Add relabeling support. +- [#5391](https://github.com/thanos-io/thanos/pull/5391) Receive: Add relabeling support. ### Changed From 64502debc81f08a44ba3a94ae021a43d2b0d7447 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Mon, 30 May 2022 10:53:35 -0700 Subject: [PATCH 08/10] address review comments Signed-off-by: Ben Ye --- cmd/thanos/receive.go | 4 ++-- docs/components/receive.md | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index b580b69ae9..6ed11a2835 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -181,7 +181,7 @@ func runReceive( } var relabelConfig []*relabel.Config if err := yaml.Unmarshal(relabelContentYaml, &relabelConfig); err != nil { - return errors.Wrap(err, "parsing relabel configuration") + return errors.Wrap(err, "parse relabel configuration") } dbs := receive.NewMultiTSDB( @@ -796,7 +796,7 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) { rc.forwardTimeout = extkingpin.ModelDuration(cmd.Flag("receive-forward-timeout", "Timeout for each forward request.").Default("5s").Hidden()) - rc.relabelConfigPath = extflag.RegisterPathOrContent(cmd, "receive.relabel-config", "YAML file that contains relabelling configuration.", extflag.WithEnvSubstitution()) + rc.relabelConfigPath = extflag.RegisterPathOrContent(cmd, "receive.relabel-config", "YAML file that contains relabeling configuration.", extflag.WithEnvSubstitution()) rc.tsdbMinBlockDuration = extkingpin.ModelDuration(cmd.Flag("tsdb.min-block-duration", "Min duration for local TSDB blocks").Default("2h").Hidden()) diff --git a/docs/components/receive.md b/docs/components/receive.md index 5a843cfbd2..16dfd8a42f 100644 --- a/docs/components/receive.md +++ b/docs/components/receive.md @@ -143,9 +143,9 @@ Flags: --receive.relabel-config= Alternative to 'receive.relabel-config-file' flag (mutually exclusive). Content of YAML file - that contains relabelling configuration. + that contains relabeling configuration. --receive.relabel-config-file= - Path to YAML file that contains relabelling + Path to YAML file that contains relabeling configuration. --receive.replica-header="THANOS-REPLICA" HTTP header specifying the replica number of a From 3dd7fb530bd5d6632594c8bca33e56b792dfebc4 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Mon, 30 May 2022 10:54:41 -0700 Subject: [PATCH 09/10] address comments Signed-off-by: Ben Ye --- pkg/receive/handler.go | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index 8b8e926c8a..0cf8509ea5 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -694,18 +694,19 @@ func (h *Handler) RemoteWrite(ctx context.Context, r *storepb.WriteRequest) (*st // relabel relabels the time series labels in the remote write request. func (h *Handler) relabel(wreq *prompb.WriteRequest) { - if len(h.options.RelabelConfigs) > 0 { - timeSeries := make([]prompb.TimeSeries, 0, len(wreq.Timeseries)) - for _, ts := range wreq.Timeseries { - lbls := relabel.Process(labelpb.ZLabelsToPromLabels(ts.Labels), h.options.RelabelConfigs...) - if lbls == nil { - continue - } - ts.Labels = labelpb.ZLabelsFromPromLabels(lbls) - timeSeries = append(timeSeries, ts) + if len(h.options.RelabelConfigs) == 0 { + return + } + timeSeries := make([]prompb.TimeSeries, 0, len(wreq.Timeseries)) + for _, ts := range wreq.Timeseries { + lbls := relabel.Process(labelpb.ZLabelsToPromLabels(ts.Labels), h.options.RelabelConfigs...) + if lbls == nil { + continue } - wreq.Timeseries = timeSeries + ts.Labels = labelpb.ZLabelsFromPromLabels(lbls) + timeSeries = append(timeSeries, ts) } + wreq.Timeseries = timeSeries } // isConflict returns whether or not the given error represents a conflict. From fd07785759abbaf6a9b3b82c294bd17960a3d3d2 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Mon, 30 May 2022 22:20:56 -0700 Subject: [PATCH 10/10] update comment Signed-off-by: Ben Ye --- test/e2e/receive_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/test/e2e/receive_test.go b/test/e2e/receive_test.go index a2104aac67..53d1ebeeab 100644 --- a/test/e2e/receive_test.go +++ b/test/e2e/receive_test.go @@ -590,8 +590,7 @@ func TestReceive(t *testing.T) { t.Cleanup(cancel) testutil.Ok(t, q.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"thanos_store_nodes_grpc_connections"}, e2e.WaitMissingMetrics())) - - // We expect the data from each Prometheus instance to be replicated twice across our ingesting instances + // Label `prometheus` should be dropped. queryAndAssertSeries(t, ctx, q.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, time.Now, promclient.QueryOptions{ Deduplicate: false, }, []model.Metric{