Skip to content

Commit

Permalink
test/e2e: add receiver hashring test
Browse files Browse the repository at this point in the history
  • Loading branch information
squat committed Jun 17, 2019
1 parent c3d3fb0 commit 251a6da
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 17 deletions.
18 changes: 9 additions & 9 deletions test/e2e/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,23 @@ type testConfig struct {
}

var (
firstPromPort = promHTTPPort(1)
remoteWriteEndpoint = fmt.Sprintf("http://%s/api/v1/receive", remoteWriteReceiveHTTP(1))
firstPromPort = promHTTPPort(1)

queryStaticFlagsSuite = newSpinupSuite().
Add(scraper(1, defaultPromConfig("prom-"+firstPromPort, 0))).
Add(scraper(2, defaultPromConfig("prom-ha", 0))).
Add(scraper(3, defaultPromConfig("prom-ha", 1))).
Add(querierWithStoreFlags(1, "replica", sidecarGRPC(1), sidecarGRPC(2), sidecarGRPC(3), remoteWriteReceiveGRPC(1))).
Add(querierWithStoreFlags(2, "replica", sidecarGRPC(1), sidecarGRPC(2), sidecarGRPC(3), remoteWriteReceiveGRPC(1))).
Add(receiver(1, defaultPromRemoteWriteConfig(remoteWriteEndpoint)))
Add(receiver(1, defaultPromRemoteWriteConfig(nodeExporterHTTP(1), remoteWriteEndpoint(1))))

queryFileSDSuite = newSpinupSuite().
Add(scraper(1, defaultPromConfig("prom-"+firstPromPort, 0))).
Add(scraper(2, defaultPromConfig("prom-ha", 0))).
Add(scraper(3, defaultPromConfig("prom-ha", 1))).
Add(querierWithFileSD(1, "replica", sidecarGRPC(1), sidecarGRPC(2), sidecarGRPC(3), remoteWriteReceiveGRPC(1))).
Add(querierWithFileSD(2, "replica", sidecarGRPC(1), sidecarGRPC(2), sidecarGRPC(3), remoteWriteReceiveGRPC(1))).
Add(receiver(1, defaultPromRemoteWriteConfig(remoteWriteEndpoint)))
Add(receiver(1, defaultPromRemoteWriteConfig(nodeExporterHTTP(1), remoteWriteEndpoint(1))))
)

func TestQuery(t *testing.T) {
Expand Down Expand Up @@ -139,9 +138,10 @@ func testQuerySimple(t *testing.T, conf testConfig) {

testutil.Equals(t, model.Metric{
"__name__": "up",
"instance": model.LabelValue("localhost:9100"),
"instance": model.LabelValue(nodeExporterHTTP(1)),
"job": "node",
"receive": "true",
"replica": model.LabelValue("1"),
}, res[3].Metric)

// Try query with deduplication.
Expand Down Expand Up @@ -191,7 +191,7 @@ func testQuerySimple(t *testing.T, conf testConfig) {
}, res[1].Metric)
testutil.Equals(t, model.Metric{
"__name__": "up",
"instance": model.LabelValue("localhost:9100"),
"instance": model.LabelValue(nodeExporterHTTP(1)),
"job": "node",
"receive": "true",
}, res[2].Metric)
Expand Down Expand Up @@ -219,13 +219,13 @@ scrape_configs:
`, name, replicas, firstPromPort)
}

func defaultPromRemoteWriteConfig(remoteWriteEndpoint string) string {
func defaultPromRemoteWriteConfig(nodeExporterHTTP, remoteWriteEndpoint string) string {
return fmt.Sprintf(`
scrape_configs:
- job_name: 'node'
static_configs:
- targets: ['localhost:9100']
- targets: ['%s']
remote_write:
- url: "%s"
`, remoteWriteEndpoint)
`, nodeExporterHTTP, remoteWriteEndpoint)
}
116 changes: 116 additions & 0 deletions test/e2e/receive_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package e2e_test

import (
"context"
"os"
"testing"
"time"

"github.com/go-kit/kit/log"
"github.com/improbable-eng/thanos/pkg/promclient"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/improbable-eng/thanos/pkg/testutil"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
)

var (
receiveHashringSuite = newSpinupSuite().
Add(querierWithStoreFlags(1, "replica", remoteWriteReceiveGRPC(1), remoteWriteReceiveGRPC(2), remoteWriteReceiveGRPC(3))).
Add(receiver(1, defaultPromRemoteWriteConfig(nodeExporterHTTP(1), remoteWriteEndpoint(1)), remoteWriteEndpoint(1), remoteWriteEndpoint(2), remoteWriteEndpoint(3))).
Add(receiver(2, defaultPromRemoteWriteConfig(nodeExporterHTTP(2), remoteWriteEndpoint(2)), remoteWriteEndpoint(1), remoteWriteEndpoint(2), remoteWriteEndpoint(3))).
Add(receiver(3, defaultPromRemoteWriteConfig(nodeExporterHTTP(3), remoteWriteEndpoint(3)), remoteWriteEndpoint(1), remoteWriteEndpoint(2), remoteWriteEndpoint(3)))
)

func TestReceive(t *testing.T) {
for _, tt := range []testConfig{
{
"hashring",
receiveHashringSuite,
},
} {
t.Run(tt.name, func(t *testing.T) {
testReceive(t, tt)
})
}
}

// testReceive runs a setup of Prometheus servers, receive nodes, and query nodes and verifies that
// queries return data from the Prometheus servers. Additionally it verifies that remote-writes were routed through the correct receive node.
func testReceive(t *testing.T, conf testConfig) {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)

exit, err := conf.suite.Exec(t, ctx, conf.name)
if err != nil {
t.Errorf("spinup failed: %v", err)
cancel()
return
}

defer func() {
cancel()
<-exit
}()

var res model.Vector

w := log.NewSyncWriter(os.Stderr)
l := log.NewLogfmtLogger(w)
l = log.With(l, "conf-name", conf.name)

// Query without deduplication so we can check what replica the
// time series ended up on.
testutil.Ok(t, runutil.Retry(time.Second, ctx.Done(), func() error {
select {
case <-exit:
cancel()
return nil
default:
}

var (
err error
warnings []string
)
res, warnings, err = promclient.QueryInstant(ctx, nil, urlParse(t, "http://"+queryHTTP(1)), "up", time.Now(), promclient.QueryOptions{
Deduplicate: false,
})
if err != nil {
return err
}

if len(warnings) > 0 {
// we don't expect warnings.
return errors.Errorf("unexpected warnings %s", warnings)
}

expectedRes := 3
if len(res) != expectedRes {
return errors.Errorf("unexpected result size %d, expected %d", len(res), expectedRes)
}

return nil
}))

testutil.Equals(t, model.Metric{
"__name__": "up",
"instance": model.LabelValue(nodeExporterHTTP(1)),
"job": "node",
"receive": "true",
"replica": model.LabelValue("2"),
}, res[0].Metric)
testutil.Equals(t, model.Metric{
"__name__": "up",
"instance": model.LabelValue(nodeExporterHTTP(2)),
"job": "node",
"receive": "true",
"replica": model.LabelValue("3"),
}, res[1].Metric)
testutil.Equals(t, model.Metric{
"__name__": "up",
"instance": model.LabelValue(nodeExporterHTTP(3)),
"job": "node",
"receive": "true",
"replica": model.LabelValue("1"),
}, res[2].Metric)
}
48 changes: 40 additions & 8 deletions test/e2e/spinup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ var (
promHTTP = func(i int) string { return fmt.Sprintf("localhost:%s", promHTTPPort(i)) }
promRemoteWriteHTTP = func(i int) string { return fmt.Sprintf("localhost:%s", promHTTPPort(100+i)) }

nodeExporterHTTP = func(i int) string { return fmt.Sprintf("localhost:%d", 9100+i) }

sidecarGRPC = func(i int) string { return fmt.Sprintf("127.0.0.1:%d", 19090+i) }
sidecarHTTP = func(i int) string { return fmt.Sprintf("127.0.0.1:%d", 19190+i) }

Expand All @@ -39,6 +41,7 @@ var (
rulerGRPC = func(i int) string { return fmt.Sprintf("127.0.0.1:%d", 19790+i) }
rulerHTTP = func(i int) string { return fmt.Sprintf("127.0.0.1:%d", 19890+i) }

remoteWriteEndpoint = func(i int) string { return fmt.Sprintf("http://%s/api/v1/receive", remoteWriteReceiveHTTP(i)) }
remoteWriteReceiveHTTP = func(i int) string { return fmt.Sprintf("127.0.0.1:%d", 18690+i) }
remoteWriteReceiveGRPC = func(i int) string { return fmt.Sprintf("127.0.0.1:%d", 18790+i) }
remoteWriteReceiveMetricHTTP = func(i int) string { return fmt.Sprintf("127.0.0.1:%d", 18890+i) }
Expand Down Expand Up @@ -122,7 +125,10 @@ func scraper(i int, config string) cmdScheduleFunc {
}
}

func receiver(i int, config string) cmdScheduleFunc {
func receiver(i int, config string, receiveAddresses ...string) cmdScheduleFunc {
if len(receiveAddresses) == 0 {
receiveAddresses = []string{remoteWriteEndpoint(1)}
}
return func(workDir string) ([]Exec, error) {
promDir := fmt.Sprintf("%s/data/remote-write-prom%d", workDir, i)
if err := os.MkdirAll(promDir, 0777); err != nil {
Expand All @@ -140,37 +146,51 @@ func receiver(i int, config string) cmdScheduleFunc {
"--log.level", "info",
"--web.listen-address", promRemoteWriteHTTP(i),
)))

hashringsFileDir := fmt.Sprintf("%s/data/receiveFile%d", workDir, i)
if err := os.MkdirAll(hashringsFileDir, 0777); err != nil {
return nil, errors.Wrap(err, "create receive dir failed")
}

if err := ioutil.WriteFile(path.Join(hashringsFileDir, "hashrings.json"), []byte(generateHashringsFile(receiveAddresses)), 0666); err != nil {
return nil, errors.Wrap(err, "creating receive config failed")
}

return append(cmds, newCmdExec(exec.Command("thanos", "receive",
"--debug.name", fmt.Sprintf("remote-write-receive-%d", i),
"--grpc-address", remoteWriteReceiveGRPC(i),
"--http-address", remoteWriteReceiveMetricHTTP(i),
"--remote-write.address", remoteWriteReceiveHTTP(i),
"--labels", "receive=\"true\"",
"--labels", fmt.Sprintf(`replica="%d"`, i),
"--tsdb.path", promDir,
"--log.level", "debug"))), nil
"--log.level", "debug",
"--receive.local-endpoint", remoteWriteEndpoint(i),
"--receive.hashrings-file", path.Join(hashringsFileDir, "hashrings.json"),
"--receive.hashrings-file-refresh-interval", "5s"))), nil
}
}

func querierWithStoreFlags(i int, replicaLabel string, storesAddresses ...string) cmdScheduleFunc {
func querierWithStoreFlags(i int, replicaLabel string, storeAddresses ...string) cmdScheduleFunc {
return func(_ string) ([]Exec, error) {
args := defaultQuerierFlags(i, replicaLabel)

for _, addr := range storesAddresses {
for _, addr := range storeAddresses {
args = append(args, "--store", addr)
}
return []Exec{newCmdExec(exec.Command("thanos", args...))}, nil
}
}

func querierWithFileSD(i int, replicaLabel string, storesAddresses ...string) cmdScheduleFunc {
func querierWithFileSD(i int, replicaLabel string, storeAddresses ...string) cmdScheduleFunc {
return func(workDir string) ([]Exec, error) {
queryFileSDDir := fmt.Sprintf("%s/data/queryFileSd%d", workDir, i)
if err := os.MkdirAll(queryFileSDDir, 0777); err != nil {
return nil, errors.Wrap(err, "create prom dir failed")
return nil, errors.Wrap(err, "create query dir failed")
}

if err := ioutil.WriteFile(queryFileSDDir+"/filesd.json", []byte(generateFileSD(storesAddresses)), 0666); err != nil {
return nil, errors.Wrap(err, "creating prom config failed")
if err := ioutil.WriteFile(queryFileSDDir+"/filesd.json", []byte(generateFileSD(storeAddresses)), 0666); err != nil {
return nil, errors.Wrap(err, "creating query SD config failed")
}

args := append(
Expand Down Expand Up @@ -534,6 +554,18 @@ func generateFileSD(addresses []string) string {
return conf
}

func generateHashringsFile(addresses []string) string {
conf := "[ { \"endpoints\": ["
for index, addr := range addresses {
conf += fmt.Sprintf("\"%s\"", addr)
if index+1 < len(addresses) {
conf += ","
}
}
conf += "] } ]"
return conf
}

func defaultQuerierFlags(i int, replicaLabel string) []string {
return []string{
"query",
Expand Down

0 comments on commit 251a6da

Please sign in to comment.