diff --git a/test/e2e/query_test.go b/test/e2e/query_test.go index 64e8533eb4f..643650eea23 100644 --- a/test/e2e/query_test.go +++ b/test/e2e/query_test.go @@ -22,8 +22,7 @@ type testConfig struct { } var ( - firstPromPort = promHTTPPort(1) - remoteWriteEndpoint = fmt.Sprintf("http://%s/api/v1/receive", remoteWriteReceiveHTTP(1)) + firstPromPort = promHTTPPort(1) queryStaticFlagsSuite = newSpinupSuite(). Add(scraper(1, defaultPromConfig("prom-"+firstPromPort, 0))). @@ -31,7 +30,7 @@ var ( Add(scraper(3, defaultPromConfig("prom-ha", 1))). Add(querierWithStoreFlags(1, "replica", sidecarGRPC(1), sidecarGRPC(2), sidecarGRPC(3), remoteWriteReceiveGRPC(1))). Add(querierWithStoreFlags(2, "replica", sidecarGRPC(1), sidecarGRPC(2), sidecarGRPC(3), remoteWriteReceiveGRPC(1))). - Add(receiver(1, defaultPromRemoteWriteConfig(remoteWriteEndpoint))) + Add(receiver(1, defaultPromRemoteWriteConfig(nodeExporterHTTP(1), remoteWriteEndpoint(1)))) queryFileSDSuite = newSpinupSuite(). Add(scraper(1, defaultPromConfig("prom-"+firstPromPort, 0))). @@ -39,7 +38,7 @@ var ( Add(scraper(3, defaultPromConfig("prom-ha", 1))). Add(querierWithFileSD(1, "replica", sidecarGRPC(1), sidecarGRPC(2), sidecarGRPC(3), remoteWriteReceiveGRPC(1))). Add(querierWithFileSD(2, "replica", sidecarGRPC(1), sidecarGRPC(2), sidecarGRPC(3), remoteWriteReceiveGRPC(1))). - Add(receiver(1, defaultPromRemoteWriteConfig(remoteWriteEndpoint))) + Add(receiver(1, defaultPromRemoteWriteConfig(nodeExporterHTTP(1), remoteWriteEndpoint(1)))) ) func TestQuery(t *testing.T) { @@ -139,9 +138,10 @@ func testQuerySimple(t *testing.T, conf testConfig) { testutil.Equals(t, model.Metric{ "__name__": "up", - "instance": model.LabelValue("localhost:9100"), + "instance": model.LabelValue(nodeExporterHTTP(1)), "job": "node", "receive": "true", + "replica": model.LabelValue("1"), }, res[3].Metric) // Try query with deduplication. @@ -191,7 +191,7 @@ func testQuerySimple(t *testing.T, conf testConfig) { }, res[1].Metric) testutil.Equals(t, model.Metric{ "__name__": "up", - "instance": model.LabelValue("localhost:9100"), + "instance": model.LabelValue(nodeExporterHTTP(1)), "job": "node", "receive": "true", }, res[2].Metric) @@ -219,13 +219,13 @@ scrape_configs: `, name, replicas, firstPromPort) } -func defaultPromRemoteWriteConfig(remoteWriteEndpoint string) string { +func defaultPromRemoteWriteConfig(nodeExporterHTTP, remoteWriteEndpoint string) string { return fmt.Sprintf(` scrape_configs: - job_name: 'node' static_configs: - - targets: ['localhost:9100'] + - targets: ['%s'] remote_write: - url: "%s" -`, remoteWriteEndpoint) +`, nodeExporterHTTP, remoteWriteEndpoint) } diff --git a/test/e2e/receive_test.go b/test/e2e/receive_test.go new file mode 100644 index 00000000000..e9e4bbed001 --- /dev/null +++ b/test/e2e/receive_test.go @@ -0,0 +1,116 @@ +package e2e_test + +import ( + "context" + "os" + "testing" + "time" + + "github.com/go-kit/kit/log" + "github.com/improbable-eng/thanos/pkg/promclient" + "github.com/improbable-eng/thanos/pkg/runutil" + "github.com/improbable-eng/thanos/pkg/testutil" + "github.com/pkg/errors" + "github.com/prometheus/common/model" +) + +var ( + receiveHashringSuite = newSpinupSuite(). + Add(querierWithStoreFlags(1, "replica", remoteWriteReceiveGRPC(1), remoteWriteReceiveGRPC(2), remoteWriteReceiveGRPC(3))). + Add(receiver(1, defaultPromRemoteWriteConfig(nodeExporterHTTP(1), remoteWriteEndpoint(1)), remoteWriteEndpoint(1), remoteWriteEndpoint(2), remoteWriteEndpoint(3))). + Add(receiver(2, defaultPromRemoteWriteConfig(nodeExporterHTTP(2), remoteWriteEndpoint(2)), remoteWriteEndpoint(1), remoteWriteEndpoint(2), remoteWriteEndpoint(3))). + Add(receiver(3, defaultPromRemoteWriteConfig(nodeExporterHTTP(3), remoteWriteEndpoint(3)), remoteWriteEndpoint(1), remoteWriteEndpoint(2), remoteWriteEndpoint(3))) +) + +func TestReceive(t *testing.T) { + for _, tt := range []testConfig{ + { + "hashring", + receiveHashringSuite, + }, + } { + t.Run(tt.name, func(t *testing.T) { + testReceive(t, tt) + }) + } +} + +// testReceive runs a setup of Prometheus servers, receive nodes, and query nodes and verifies that +// queries return data from the Prometheus servers. Additionally it verifies that remote-writes were routed through the correct receive node. +func testReceive(t *testing.T, conf testConfig) { + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + + exit, err := conf.suite.Exec(t, ctx, conf.name) + if err != nil { + t.Errorf("spinup failed: %v", err) + cancel() + return + } + + defer func() { + cancel() + <-exit + }() + + var res model.Vector + + w := log.NewSyncWriter(os.Stderr) + l := log.NewLogfmtLogger(w) + l = log.With(l, "conf-name", conf.name) + + // Query without deduplication so we can check what replica the + // time series ended up on. + testutil.Ok(t, runutil.Retry(time.Second, ctx.Done(), func() error { + select { + case <-exit: + cancel() + return nil + default: + } + + var ( + err error + warnings []string + ) + res, warnings, err = promclient.QueryInstant(ctx, nil, urlParse(t, "http://"+queryHTTP(1)), "up", time.Now(), promclient.QueryOptions{ + Deduplicate: false, + }) + if err != nil { + return err + } + + if len(warnings) > 0 { + // we don't expect warnings. + return errors.Errorf("unexpected warnings %s", warnings) + } + + expectedRes := 3 + if len(res) != expectedRes { + return errors.Errorf("unexpected result size %d, expected %d", len(res), expectedRes) + } + + return nil + })) + + testutil.Equals(t, model.Metric{ + "__name__": "up", + "instance": model.LabelValue(nodeExporterHTTP(1)), + "job": "node", + "receive": "true", + "replica": model.LabelValue("2"), + }, res[0].Metric) + testutil.Equals(t, model.Metric{ + "__name__": "up", + "instance": model.LabelValue(nodeExporterHTTP(2)), + "job": "node", + "receive": "true", + "replica": model.LabelValue("3"), + }, res[1].Metric) + testutil.Equals(t, model.Metric{ + "__name__": "up", + "instance": model.LabelValue(nodeExporterHTTP(3)), + "job": "node", + "receive": "true", + "replica": model.LabelValue("1"), + }, res[2].Metric) +} diff --git a/test/e2e/spinup_test.go b/test/e2e/spinup_test.go index 8782d812043..73a17fc2bce 100644 --- a/test/e2e/spinup_test.go +++ b/test/e2e/spinup_test.go @@ -30,6 +30,8 @@ var ( promHTTP = func(i int) string { return fmt.Sprintf("localhost:%s", promHTTPPort(i)) } promRemoteWriteHTTP = func(i int) string { return fmt.Sprintf("localhost:%s", promHTTPPort(100+i)) } + nodeExporterHTTP = func(i int) string { return fmt.Sprintf("localhost:%d", 9100+i) } + sidecarGRPC = func(i int) string { return fmt.Sprintf("127.0.0.1:%d", 19090+i) } sidecarHTTP = func(i int) string { return fmt.Sprintf("127.0.0.1:%d", 19190+i) } @@ -39,6 +41,7 @@ var ( rulerGRPC = func(i int) string { return fmt.Sprintf("127.0.0.1:%d", 19790+i) } rulerHTTP = func(i int) string { return fmt.Sprintf("127.0.0.1:%d", 19890+i) } + remoteWriteEndpoint = func(i int) string { return fmt.Sprintf("http://%s/api/v1/receive", remoteWriteReceiveHTTP(i)) } remoteWriteReceiveHTTP = func(i int) string { return fmt.Sprintf("127.0.0.1:%d", 18690+i) } remoteWriteReceiveGRPC = func(i int) string { return fmt.Sprintf("127.0.0.1:%d", 18790+i) } remoteWriteReceiveMetricHTTP = func(i int) string { return fmt.Sprintf("127.0.0.1:%d", 18890+i) } @@ -122,7 +125,10 @@ func scraper(i int, config string) cmdScheduleFunc { } } -func receiver(i int, config string) cmdScheduleFunc { +func receiver(i int, config string, receiveAddresses ...string) cmdScheduleFunc { + if len(receiveAddresses) == 0 { + receiveAddresses = []string{remoteWriteEndpoint(1)} + } return func(workDir string) ([]Exec, error) { promDir := fmt.Sprintf("%s/data/remote-write-prom%d", workDir, i) if err := os.MkdirAll(promDir, 0777); err != nil { @@ -140,37 +146,51 @@ func receiver(i int, config string) cmdScheduleFunc { "--log.level", "info", "--web.listen-address", promRemoteWriteHTTP(i), ))) + + receiveFileSDDir := fmt.Sprintf("%s/data/receiveFileSd%d", workDir, i) + if err := os.MkdirAll(receiveFileSDDir, 0777); err != nil { + return nil, errors.Wrap(err, "create receive dir failed") + } + + if err := ioutil.WriteFile(receiveFileSDDir+"/filesd.json", []byte(generateFileSD(receiveAddresses)), 0666); err != nil { + return nil, errors.Wrap(err, "creating receive config failed") + } + return append(cmds, newCmdExec(exec.Command("thanos", "receive", "--debug.name", fmt.Sprintf("remote-write-receive-%d", i), "--grpc-address", remoteWriteReceiveGRPC(i), "--http-address", remoteWriteReceiveMetricHTTP(i), "--remote-write.address", remoteWriteReceiveHTTP(i), "--labels", "receive=\"true\"", + "--labels", fmt.Sprintf(`replica="%d"`, i), "--tsdb.path", promDir, - "--log.level", "debug"))), nil + "--log.level", "debug", + "--receive.local-endpoint", remoteWriteEndpoint(i), + "--receive.sd-files", path.Join(receiveFileSDDir, "filesd.json"), + "--receive.sd-interval", "5s"))), nil } } -func querierWithStoreFlags(i int, replicaLabel string, storesAddresses ...string) cmdScheduleFunc { +func querierWithStoreFlags(i int, replicaLabel string, storeAddresses ...string) cmdScheduleFunc { return func(_ string) ([]Exec, error) { args := defaultQuerierFlags(i, replicaLabel) - for _, addr := range storesAddresses { + for _, addr := range storeAddresses { args = append(args, "--store", addr) } return []Exec{newCmdExec(exec.Command("thanos", args...))}, nil } } -func querierWithFileSD(i int, replicaLabel string, storesAddresses ...string) cmdScheduleFunc { +func querierWithFileSD(i int, replicaLabel string, storeAddresses ...string) cmdScheduleFunc { return func(workDir string) ([]Exec, error) { queryFileSDDir := fmt.Sprintf("%s/data/queryFileSd%d", workDir, i) if err := os.MkdirAll(queryFileSDDir, 0777); err != nil { - return nil, errors.Wrap(err, "create prom dir failed") + return nil, errors.Wrap(err, "create query dir failed") } - if err := ioutil.WriteFile(queryFileSDDir+"/filesd.json", []byte(generateFileSD(storesAddresses)), 0666); err != nil { - return nil, errors.Wrap(err, "creating prom config failed") + if err := ioutil.WriteFile(queryFileSDDir+"/filesd.json", []byte(generateFileSD(storeAddresses)), 0666); err != nil { + return nil, errors.Wrap(err, "creating query SD config failed") } args := append(