diff --git a/src/integration/prometheus/prometheus.go b/src/integration/prometheus/prometheus.go new file mode 100644 index 0000000000..63027b2538 --- /dev/null +++ b/src/integration/prometheus/prometheus.go @@ -0,0 +1,821 @@ +// Copyright (c) 2021 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +// Package prometheus contains resources for starting a docker-backed +// Prometheus. 
+package prometheus
+
+import (
+	"errors"
+	"fmt"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/prometheus/common/model"
+	"github.com/stretchr/testify/require"
+	"go.uber.org/zap"
+
+	"github.com/m3db/m3/src/integration/resources"
+	"github.com/m3db/m3/src/integration/resources/docker"
+	"github.com/m3db/m3/src/query/generated/proto/prompb"
+	"github.com/m3db/m3/src/query/storage"
+	"github.com/m3db/m3/src/x/headers"
+	xtime "github.com/m3db/m3/src/x/time"
+)
+
+const (
+	// TestPrometheusDBNodeConfig is the test config for the dbnode.
+	TestPrometheusDBNodeConfig = `
+db: {}
+`
+
+	// TestPrometheusCoordinatorConfig is the test config for the coordinator.
+	// The limit, restrict-tags and lookback values below are relied upon by
+	// the tests in this file.
+	TestPrometheusCoordinatorConfig = `
+limits:
+  perQuery:
+    maxFetchedSeries: 100
+
+query:
+  restrictTags:
+    match:
+      - name: restricted_metrics_type
+        type: NOTEQUAL
+        value: hidden
+    strip:
+      - restricted_metrics_type
+
+lookbackDuration: 10m
+`
+)
+
+// RunTest contains the logic for running the prometheus test. prom must be
+// backed by a *docker.Prometheus; the test fails immediately otherwise.
+func RunTest(t *testing.T, m3 resources.M3Resources, prom resources.ExternalResources) {
+	logger, err := zap.NewDevelopment()
+	require.NoError(t, err)
+
+	logger.Info("running prometheus tests")
+	// Fail the test with a readable message instead of panicking when a
+	// different ExternalResources implementation is passed in.
+	p, ok := prom.(*docker.Prometheus)
+	require.True(t, ok, "expected *docker.Prometheus external resource, got %T", prom)
+
+	testPrometheusRemoteRead(t, p, logger)
+	testPrometheusRemoteWriteMultiNamespaces(t, p, logger)
+	testPrometheusRemoteWriteEmptyLabelNameReturns400(t, m3.Coordinator(), logger)
+	testPrometheusRemoteWriteEmptyLabelValueReturns400(t, m3.Coordinator(), logger)
+	testPrometheusRemoteWriteDuplicateLabelReturns400(t, m3.Coordinator(), logger)
+	testPrometheusRemoteWriteTooOldReturns400(t, m3.Coordinator(), logger)
+	testPrometheusRemoteWriteRestrictMetricsType(t, m3.Coordinator(), logger)
+	testQueryLookbackApplied(t, m3.Coordinator(), logger)
+	testQueryLimitsApplied(t, m3.Coordinator(), logger)
+}
+
+// testPrometheusRemoteRead checks that a query issued against Prometheus
+// eventually reports more than 100 remote storage samples.
+func testPrometheusRemoteRead(t *testing.T, p *docker.Prometheus, logger *zap.Logger) {
+	// Ensure Prometheus can proxy a Prometheus query
+	logger.Info("testing prometheus remote read")
+	verifyPrometheusQuery(t, p, "prometheus_remote_storage_samples_total", 100)
+}
+
+// testPrometheusRemoteWriteMultiNamespaces checks that the tagged-write
+// success counter is above zero for both the unaggregated and the aggregated
+// namespace.
+func testPrometheusRemoteWriteMultiNamespaces(
+	t *testing.T,
+	p *docker.Prometheus,
+	logger *zap.Logger,
+) {
+	logger.Info("testing remote write to multiple namespaces")
+
+	// Make sure we're proxying writes to the unaggregated namespace
+	query := fmt.Sprintf(
+		"database_write_tagged_success{namespace=\"%v\"}", resources.UnaggName,
+	)
+	verifyPrometheusQuery(t, p, query, 0)
+
+	// Make sure we're proxying writes to the aggregated namespace
+	query = fmt.Sprintf(
+		"database_write_tagged_success{namespace=\"%v\"}", resources.AggName,
+	)
+	verifyPrometheusQuery(t, p, query, 0)
+}
+
+// testPrometheusRemoteWriteEmptyLabelNameReturns400 writes a sample carrying a
+// label with an empty name and expects the write error to mention "400".
+func testPrometheusRemoteWriteEmptyLabelNameReturns400(
+	t *testing.T,
+	coordinator resources.Coordinator,
+	logger *zap.Logger,
+) {
+	logger.Info("test write empty name for a label returns HTTP 400")
+	err := coordinator.WriteProm("foo_metric", map[string]string{
+		"non_empty_name": "foo",
+		"":               "bar",
+	}, []prompb.Sample{
+		{
+			Value:     42,
+			Timestamp: storage.TimeToPromTimestamp(xtime.Now()),
+		},
+	}, nil)
+	require.Error(t, err)
+	require.Contains(t, err.Error(), "400")
+}
+
+// testPrometheusRemoteWriteEmptyLabelValueReturns400 writes a sample carrying
+// a label with an empty value and expects the write error to mention "400".
+func testPrometheusRemoteWriteEmptyLabelValueReturns400(
+	t *testing.T,
+	coordinator resources.Coordinator,
+	logger *zap.Logger,
+) {
+	logger.Info("test write empty value for a label returns HTTP 400")
+	err := coordinator.WriteProm("foo_metric", map[string]string{
+		"foo":            "bar",
+		"non_empty_name": "",
+	}, []prompb.Sample{
+		{
+			Value:     42,
+			Timestamp: storage.TimeToPromTimestamp(xtime.Now()),
+		},
+	}, nil)
+	require.Error(t, err)
+	require.Contains(t, err.Error(), "400")
+}
+
+// testPrometheusRemoteWriteDuplicateLabelReturns400 writes a sample whose
+// label list repeats the same label name and expects the write error to
+// mention "400". Labels are passed as a slice because a map cannot hold the
+// duplicate key.
+func testPrometheusRemoteWriteDuplicateLabelReturns400(
+	t *testing.T,
+	coordinator resources.Coordinator,
+	logger *zap.Logger,
+) {
+	logger.Info("test write with duplicate labels returns HTTP 400")
+	err := coordinator.WritePromWithLabels("foo_metric", []prompb.Label{
+		{
+			Name:  []byte("dupe_name"),
+			Value: []byte("foo"),
+		},
+		{
+			Name:  []byte("non_dupe_name"),
+			Value: []byte("bar"),
+		},
+		{
+			Name:  []byte("dupe_name"),
+			Value: []byte("baz"),
+		},
+	}, []prompb.Sample{
+		{
+			Value:     42,
+			Timestamp: storage.TimeToPromTimestamp(xtime.Now()),
+		},
+	}, nil)
+	require.Error(t, err)
+	require.Contains(t, err.Error(), "400")
+}
+
+// testPrometheusRemoteWriteTooOldReturns400 writes a sample timestamped one
+// hour in the past and expects the write error to mention "400".
+func testPrometheusRemoteWriteTooOldReturns400(
+	t *testing.T,
+	coordinator resources.Coordinator,
+	logger *zap.Logger,
+) {
+	logger.Info("test write into the past returns HTTP 400")
+	err := coordinator.WriteProm("foo_metric", nil, []prompb.Sample{
+		{
+			Value:     3.142,
+			Timestamp: storage.TimeToPromTimestamp(xtime.Now().Add(-1 * time.Hour)),
+		},
+	}, nil)
+	require.Error(t, err)
+	require.Contains(t, err.Error(), "400")
+}
+
+// testPrometheusRemoteWriteRestrictMetricsType checks that writes succeed when
+// the metrics type header is set to "unaggregated", and when it is set to
+// "aggregated" together with a storage policy header.
+func testPrometheusRemoteWriteRestrictMetricsType(
+	t *testing.T,
+	coordinator resources.Coordinator,
+	logger *zap.Logger,
+) {
+	logger.Info("test write with unaggregated metrics type works as expected")
+	err := coordinator.WriteProm("bar_metric", nil, []prompb.Sample{
+		{
+			Value:     42.42,
+			Timestamp: storage.TimeToPromTimestamp(xtime.Now()),
+		},
+	}, resources.Headers{
+		headers.MetricsTypeHeader: []string{"unaggregated"},
+	})
+	require.NoError(t, err)
+
+	logger.Info("test write with aggregated metrics type works as expected")
+	err = coordinator.WriteProm("bar_metric", nil, []prompb.Sample{
+		{
+			Value:     84.84,
+			Timestamp: storage.TimeToPromTimestamp(xtime.Now()),
+		},
+	}, resources.Headers{
+		headers.MetricsTypeHeader:          []string{"aggregated"},
+		headers.MetricsStoragePolicyHeader: []string{"15s:6h"},
+	})
+	require.NoError(t, err)
+}
+
+// testQueryLookbackApplied writes a sample nine minutes in the past and then
+// checks that a range query over the last ten minutes still returns a point
+// within two steps (30s) of now.
+func testQueryLookbackApplied(
+	t *testing.T,
+	coordinator resources.Coordinator,
+	logger *zap.Logger,
+) {
+	// NB: this test depends on the config in m3coordinator.yml for this test
+	// and the following config value "lookbackDuration: 10m".
+	logger.Info("test lookback config respected")
+
+	err := coordinator.WriteProm("lookback_test", nil, []prompb.Sample{
+		{
+			Value:     42.42,
+			Timestamp: storage.TimeToPromTimestamp(xtime.Now().Add(-9 * time.Minute)),
+		},
+	}, resources.Headers{
+		headers.MetricsTypeHeader: []string{"unaggregated"},
+	})
+	require.NoError(t, err)
+
+	requireRangeQuerySuccess(t,
+		coordinator,
+		resources.RangeQueryRequest{
+			Query: "lookback_test",
+			Start: time.Now().Add(-10 * time.Minute),
+			End:   time.Now(),
+			Step:  15 * time.Second,
+		},
+		nil,
+		func(res model.Matrix) error {
+			if len(res) == 0 || len(res[0].Values) == 0 {
+				return errors.New("no samples found")
+			}
+
+			latestTS := res[0].Values[len(res[0].Values)-1].Timestamp.Time()
+			nowMinusTwoSteps := time.Now().Add(-30 * time.Second)
+			if latestTS.After(nowMinusTwoSteps) {
+				return nil
+			}
+
+			return errors.New("latest timestamp is not within two steps from now")
+		},
+	)
+}
+
+// testQueryLimitsApplied exercises the coordinator's per-request limit headers
+// (series, docs, returned datapoints, returned series, returned series
+// metadata and time range) with require-exhaustive both enabled and disabled,
+// asserting on result counts for the permissive cases and on the error text
+// for the rejecting cases.
+func testQueryLimitsApplied(
+	t *testing.T,
+	coordinator resources.Coordinator,
+	logger *zap.Logger,
+) {
+	logger.Info("test query series limit with coordinator limit header " +
+		"(default errors without RequireExhaustive disabled)")
+	requireError(t, func() error {
+		_, err := coordinator.InstantQuery(resources.QueryRequest{
+			Query: "{metrics_storage=\"m3db_remote\"}",
+		}, resources.Headers{
+			headers.LimitMaxSeriesHeader: []string{"10"},
+		})
+		return err
+	}, "query exceeded limit")
+
+	logger.Info("test query series limit with require-exhaustive headers false")
+	requireInstantQuerySuccess(t,
+		coordinator,
+		resources.QueryRequest{
+			Query: "database_write_tagged_success",
+		},
+		resources.Headers{
+			headers.LimitMaxSeriesHeader:         []string{"2"},
+			headers.LimitRequireExhaustiveHeader: []string{"false"},
+		},
+		func(res model.Vector) error {
+			if len(res) != 2 {
+				return fmt.Errorf("expected two results. received %d", len(res))
+			}
+
+			return nil
+		})
+
+	logger.Info("test query series limit with require-exhaustive headers true " +
+		"(below limit therefore no error)")
+	requireInstantQuerySuccess(t,
+		coordinator,
+		resources.QueryRequest{
+			Query: "database_write_tagged_success",
+		},
+		resources.Headers{
+			headers.LimitMaxSeriesHeader:         []string{"4"},
+			headers.LimitRequireExhaustiveHeader: []string{"true"},
+		},
+		func(res model.Vector) error {
+			if len(res) != 3 {
+				return fmt.Errorf("expected three results. received %d", len(res))
+			}
+
+			return nil
+		})
+
+	logger.Info("test query series limit with require-exhaustive headers " +
+		"true (above limit therefore error)")
+	requireError(t, func() error {
+		_, err := coordinator.InstantQuery(resources.QueryRequest{
+			Query: "database_write_tagged_success",
+		}, resources.Headers{
+			headers.LimitMaxSeriesHeader:         []string{"3"},
+			headers.LimitRequireExhaustiveHeader: []string{"true"},
+		})
+		return err
+	}, "query exceeded limit")
+
+	requireError(t, func() error {
+		_, err := coordinator.InstantQuery(resources.QueryRequest{
+			Query: "database_write_tagged_success",
+		}, resources.Headers{
+			headers.LimitMaxSeriesHeader:         []string{"3"},
+			headers.LimitRequireExhaustiveHeader: []string{"true"},
+		})
+		return err
+	}, "400")
+
+	logger.Info("test query docs limit with require-exhaustive headers false")
+	requireInstantQuerySuccess(t,
+		coordinator,
+		resources.QueryRequest{
+			Query: "database_write_tagged_success",
+		},
+		resources.Headers{
+			headers.LimitMaxDocsHeader:           []string{"1"},
+			headers.LimitRequireExhaustiveHeader: []string{"false"},
+		},
+		func(res model.Vector) error {
+			// NB(nate): docs limit is imprecise so will not match exact number of series
+			// returned
+			if len(res) != 3 {
+				return fmt.Errorf("expected three results. received %d", len(res))
+			}
+
+			return nil
+		})
+
+	logger.Info("test query docs limit with require-exhaustive headers true " +
+		"(below limit therefore no error)")
+	requireInstantQuerySuccess(t,
+		coordinator,
+		resources.QueryRequest{
+			Query: "database_write_tagged_success",
+		},
+		resources.Headers{
+			headers.LimitMaxDocsHeader:           []string{"4"},
+			headers.LimitRequireExhaustiveHeader: []string{"true"},
+		},
+		func(res model.Vector) error {
+			// NB(nate): docs limit is imprecise so will not match exact number of series
+			// returned
+			if len(res) != 3 {
+				return fmt.Errorf("expected three results. received %d", len(res))
+			}
+
+			return nil
+		})
+
+	logger.Info("test query docs limit with require-exhaustive headers " +
+		"true (above limit therefore error)")
+	requireError(t, func() error {
+		_, err := coordinator.InstantQuery(resources.QueryRequest{
+			Query: "database_write_tagged_success",
+		}, resources.Headers{
+			headers.LimitMaxDocsHeader:           []string{"1"},
+			headers.LimitRequireExhaustiveHeader: []string{"true"},
+		})
+		return err
+	}, "query exceeded limit")
+
+	requireError(t, func() error {
+		_, err := coordinator.InstantQuery(resources.QueryRequest{
+			Query: "database_write_tagged_success",
+		}, resources.Headers{
+			headers.LimitMaxDocsHeader:           []string{"1"},
+			headers.LimitRequireExhaustiveHeader: []string{"true"},
+		})
+		return err
+	}, "400")
+
+	logger.Info("test query returned datapoints limit - zero limit disabled")
+	requireRangeQuerySuccess(t,
+		coordinator,
+		resources.RangeQueryRequest{
+			Query: "database_write_tagged_success",
+			Start: time.Now().Add(-100 * time.Minute),
+			End:   time.Now(),
+			Step:  15 * time.Second,
+		},
+		resources.Headers{
+			headers.LimitMaxReturnedDatapointsHeader: []string{"0"},
+		},
+		func(res model.Matrix) error {
+			if len(res) != 3 {
+				return fmt.Errorf("expected three results. received %d", len(res))
+			}
+
+			return nil
+		})
+
+	logger.Info("test query returned series limit - zero limit disabled")
+	requireRangeQuerySuccess(t,
+		coordinator,
+		resources.RangeQueryRequest{
+			Query: "database_write_tagged_success",
+			Start: time.Now().Add(-100 * time.Minute),
+			End:   time.Now(),
+			Step:  15 * time.Second,
+		},
+		resources.Headers{
+			headers.LimitMaxReturnedSeriesHeader: []string{"0"},
+		},
+		func(res model.Matrix) error {
+			if len(res) != 3 {
+				return fmt.Errorf("expected three results. received %d", len(res))
+			}
+
+			return nil
+		})
+
+	logger.Info("test query returned series limit - above limit disabled")
+	requireRangeQuerySuccess(t,
+		coordinator,
+		resources.RangeQueryRequest{
+			Query: "database_write_tagged_success",
+			Start: time.Now().Add(-100 * time.Minute),
+			End:   time.Now(),
+			Step:  15 * time.Second,
+		},
+		resources.Headers{
+			headers.LimitMaxReturnedSeriesHeader: []string{"4"},
+		},
+		func(res model.Matrix) error {
+			if len(res) != 3 {
+				return fmt.Errorf("expected three results. received %d", len(res))
+			}
+
+			return nil
+		})
+
+	logger.Info("test query returned series limit - at limit")
+	requireRangeQuerySuccess(t,
+		coordinator,
+		resources.RangeQueryRequest{
+			Query: "database_write_tagged_success",
+			Start: time.Now().Add(-100 * time.Minute),
+			End:   time.Now(),
+			Step:  15 * time.Second,
+		},
+		resources.Headers{
+			headers.LimitMaxReturnedSeriesHeader: []string{"3"},
+		},
+		func(res model.Matrix) error {
+			if len(res) != 3 {
+				return fmt.Errorf("expected three results. received %d", len(res))
+			}
+
+			return nil
+		})
+
+	logger.Info("test query returned series limit - below limit")
+	requireRangeQuerySuccess(t,
+		coordinator,
+		resources.RangeQueryRequest{
+			Query: "database_write_tagged_success",
+			Start: time.Now().Add(-100 * time.Minute),
+			End:   time.Now(),
+			Step:  15 * time.Second,
+		},
+		resources.Headers{
+			headers.LimitMaxReturnedSeriesHeader: []string{"2"},
+		},
+		func(res model.Matrix) error {
+			if len(res) != 2 {
+				return fmt.Errorf("expected two results. received %d", len(res))
+			}
+
+			return nil
+		})
+
+	// Test writes to prep for testing returned series metadata limits
+	for i := 0; i < 3; i++ {
+		err := coordinator.WriteProm("metadata_test_series", map[string]string{
+			"metadata_test_label": fmt.Sprintf("series_label_%d", i),
+		}, []prompb.Sample{
+			{
+				Value:     42.42,
+				Timestamp: storage.TimeToPromTimestamp(xtime.Now()),
+			},
+		}, nil)
+		require.NoError(t, err)
+	}
+
+	logger.Info("test query returned series metadata limit - zero limit disabled")
+	requireLabelValuesSuccess(t,
+		coordinator,
+		resources.LabelValuesRequest{
+			MetadataRequest: resources.MetadataRequest{
+				Match: "metadata_test_series",
+			},
+			LabelName: "metadata_test_label",
+		},
+		resources.Headers{
+			headers.LimitMaxReturnedSeriesMetadataHeader: []string{"0"},
+		},
+		func(res model.LabelValues) error {
+			if len(res) != 3 {
+				return fmt.Errorf("expected three results. received %d", len(res))
+			}
+
+			return nil
+		})
+
+	logger.Info("test query returned series metadata limit - above limit disabled")
+	requireLabelValuesSuccess(t,
+		coordinator,
+		resources.LabelValuesRequest{
+			MetadataRequest: resources.MetadataRequest{
+				Match: "metadata_test_series",
+			},
+			LabelName: "metadata_test_label",
+		},
+		resources.Headers{
+			headers.LimitMaxReturnedSeriesMetadataHeader: []string{"4"},
+		},
+		func(res model.LabelValues) error {
+			if len(res) != 3 {
+				return fmt.Errorf("expected three results. received %d", len(res))
+			}
+
+			return nil
+		})
+
+	logger.Info("test query returned series metadata limit - at limit")
+	requireLabelValuesSuccess(t,
+		coordinator,
+		resources.LabelValuesRequest{
+			MetadataRequest: resources.MetadataRequest{
+				Match: "metadata_test_series",
+			},
+			LabelName: "metadata_test_label",
+		},
+		resources.Headers{
+			headers.LimitMaxReturnedSeriesMetadataHeader: []string{"3"},
+		},
+		func(res model.LabelValues) error {
+			if len(res) != 3 {
+				return fmt.Errorf("expected three results. received %d", len(res))
+			}
+
+			return nil
+		})
+
+	logger.Info("test query returned series metadata limit - below limit")
+	requireLabelValuesSuccess(t,
+		coordinator,
+		resources.LabelValuesRequest{
+			MetadataRequest: resources.MetadataRequest{
+				Match: "metadata_test_series",
+			},
+			LabelName: "metadata_test_label",
+		},
+		resources.Headers{
+			headers.LimitMaxReturnedSeriesMetadataHeader: []string{"2"},
+		},
+		func(res model.LabelValues) error {
+			if len(res) != 2 {
+				return fmt.Errorf("expected two results. received %d", len(res))
+			}
+
+			return nil
+		})
+
+	logger.Info("test query time range limit with coordinator defaults")
+	requireRangeQuerySuccess(t,
+		coordinator,
+		resources.RangeQueryRequest{
+			Query: "database_write_tagged_success",
+			Start: time.Time{},
+			End:   time.Now(),
+			Step:  15 * time.Second,
+		},
+		nil,
+		func(res model.Matrix) error {
+			if len(res) == 0 {
+				return errors.New("expected results to be greater than 0")
+			}
+
+			return nil
+		})
+
+	logger.Info("test query time range limit with require-exhaustive headers false")
+	requireRangeQuerySuccess(t,
+		coordinator,
+		resources.RangeQueryRequest{
+			Query: "database_write_tagged_success",
+			Start: time.Unix(0, 0),
+			End:   time.Now(),
+			Step:  15 * time.Second,
+		},
+		resources.Headers{
+			headers.LimitMaxRangeHeader:          []string{"4h"},
+			headers.LimitRequireExhaustiveHeader: []string{"false"},
+		},
+		func(res model.Matrix) error {
+			if len(res) == 0 {
+				return errors.New("expected results to be greater than 0")
+			}
+
+			return nil
+		})
+
+	logger.Info("test query time range limit with require-exhaustive headers true " +
+		"(above limit therefore error)")
+	requireError(t, func() error {
+		_, err := coordinator.RangeQuery(resources.RangeQueryRequest{
+			Query: "database_write_tagged_success",
+			Start: time.Unix(0, 0),
+			End:   time.Now(),
+			Step:  15 * time.Second,
+		}, resources.Headers{
+			headers.LimitMaxRangeHeader:          []string{"4h"},
+			headers.LimitRequireExhaustiveHeader: []string{"true"},
+		})
+		return err
+	}, "query exceeded limit")
+	requireError(t, func() error {
+		_, err := coordinator.RangeQuery(resources.RangeQueryRequest{
+			Query: "database_write_tagged_success",
+			Start: time.Unix(0, 0),
+			End:   time.Now(),
+			Step:  15 * time.Second,
+		}, resources.Headers{
+			headers.LimitMaxRangeHeader:          []string{"4h"},
+			headers.LimitRequireExhaustiveHeader: []string{"true"},
+		})
+		return err
+	}, "400")
+
+	logger.Info("test query time range limit with coordinator defaults")
+	requireLabelValuesSuccess(t,
+		coordinator,
+		resources.LabelValuesRequest{
+			MetadataRequest: resources.MetadataRequest{
+				Match: "metadata_test_series",
+			},
+			LabelName: "metadata_test_label",
+		},
+		resources.Headers{
+			headers.LimitMaxReturnedSeriesMetadataHeader: []string{"2"},
+		},
+		func(res model.LabelValues) error {
+			if len(res) == 0 {
+				return errors.New("expected results to be greater than 0")
+			}
+
+			return nil
+		})
+
+	logger.Info("test query time range limit with require-exhaustive headers false")
+	requireLabelValuesSuccess(t,
+		coordinator,
+		resources.LabelValuesRequest{
+			MetadataRequest: resources.MetadataRequest{
+				Match: "metadata_test_series",
+			},
+			LabelName: "metadata_test_label",
+		},
+		resources.Headers{
+			headers.LimitMaxRangeHeader:          []string{"4h"},
+			headers.LimitRequireExhaustiveHeader: []string{"false"},
+		},
+		func(res model.LabelValues) error {
+			if len(res) == 0 {
+				return errors.New("expected results to be greater than 0")
+			}
+
+			return nil
+		})
+
+	logger.Info("test query time range limit with require-exhaustive headers true " +
+		"(above limit therefore error)")
+	requireError(t, func() error {
+		_, err := coordinator.LabelValues(resources.LabelValuesRequest{
+			MetadataRequest: resources.MetadataRequest{
+				Match: "metadata_test_series",
+			},
+			LabelName: "metadata_test_label",
+		}, resources.Headers{
+			headers.LimitMaxRangeHeader:          []string{"4h"},
+			headers.LimitRequireExhaustiveHeader: []string{"true"},
+		})
+		return err
+	}, "query exceeded limit")
+	requireError(t, func() error {
+		_, err := coordinator.LabelValues(resources.LabelValuesRequest{
+			MetadataRequest: resources.MetadataRequest{
+				Match: "metadata_test_series",
+			},
+			LabelName: "metadata_test_label",
+		}, resources.Headers{
+			headers.LimitMaxRangeHeader:          []string{"4h"},
+			headers.LimitRequireExhaustiveHeader: []string{"true"},
+		})
+		return err
+	}, "400")
+}
+
+// requireError runs query through resources.Retry and fails the test unless
+// query returns an error whose text contains errorMsg. An empty errorMsg
+// accepts any non-nil error.
+func requireError(t *testing.T, query func() error, errorMsg string) {
+	require.NoError(t, resources.Retry(func() error {
+		if err := query(); err != nil {
+			if errorMsg == "" || strings.Contains(err.Error(), errorMsg) {
+				return nil
+			}
+		}
+
+		// Only mention the expected substring when the caller asked for one;
+		// otherwise any error would have satisfied the check.
+		err := errors.New("expected read request to fail with error")
+		if errorMsg != "" {
+			err = fmt.Errorf("expected read request to fail with error containing: %s", errorMsg)
+		}
+
+		return err
+	}))
+}
+
+// requireInstantQuerySuccess runs an instant query through resources.Retry and
+// fails the test unless the query succeeds and successCond accepts its result.
+func requireInstantQuerySuccess(
+	t *testing.T,
+	coordinator resources.Coordinator,
+	request resources.QueryRequest,
+	headers resources.Headers,
+	successCond func(res model.Vector) error,
+) {
+	require.NoError(t, resources.Retry(func() error {
+		res, err := coordinator.InstantQuery(request, headers)
+		if err != nil {
+			return err
+		}
+
+		return successCond(res)
+	}))
+}
+
+// requireRangeQuerySuccess runs a range query through resources.Retry and
+// fails the test unless the query succeeds and successCond accepts its result.
+func requireRangeQuerySuccess(
+	t *testing.T,
+	coordinator resources.Coordinator,
+	request resources.RangeQueryRequest,
+	headers resources.Headers,
+	successCond func(res model.Matrix) error,
+) {
+	require.NoError(t, resources.Retry(func() error {
+		res, err := coordinator.RangeQuery(request, headers)
+		if err != nil {
+			return err
+		}
+
+		return successCond(res)
+	}))
+}
+
+// requireLabelValuesSuccess runs a label values request through
+// resources.Retry and fails the test unless the request succeeds and
+// successCond accepts its result.
+func requireLabelValuesSuccess(
+	t *testing.T,
+	coordinator resources.Coordinator,
+	request resources.LabelValuesRequest,
+	headers resources.Headers,
+	successCond func(res model.LabelValues) error,
+) {
+	require.NoError(t, resources.Retry(func() error {
+		res, err := coordinator.LabelValues(request, headers)
+		if err != nil {
+			return err
+		}
+
+		return successCond(res)
+	}))
+}
+
+// verifyPrometheusQuery runs query against Prometheus through resources.Retry
+// and fails the test unless the first returned sample's value is strictly
+// greater than threshold.
+func verifyPrometheusQuery(t *testing.T, p *docker.Prometheus, query string, threshold float64) {
+	require.NoError(t, resources.Retry(func() error {
+		res, err := p.Query(docker.PrometheusQueryRequest{
+			Query: query,
+		})
+		if err != nil {
+			return err
+		}
+		if len(res) == 0 {
+			return errors.New("no samples returned for query")
+		}
+		if res[0].Value > model.SampleValue(threshold) {
+			return nil
+		}
+
+		return errors.New("value not greater than threshold")
+	}))
+}
diff --git
a/src/integration/prometheus/prometheus_test.go b/src/integration/prometheus/prometheus_test.go new file mode 100644 index 0000000000..4bc87054af --- /dev/null +++ b/src/integration/prometheus/prometheus_test.go @@ -0,0 +1,73 @@ +// +build cluster_integration +// +// Copyright (c) 2021 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+
+package prometheus
+
+import (
+	"path"
+	"runtime"
+	"testing"
+
+	"github.com/m3db/m3/src/integration/resources"
+	"github.com/m3db/m3/src/integration/resources/docker"
+	"github.com/m3db/m3/src/integration/resources/inprocess"
+
+	"github.com/ory/dockertest/v3"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+// TestPrometheus starts an in-process M3 cluster and a docker-backed
+// Prometheus, then runs the shared prometheus test suite against them.
+func TestPrometheus(t *testing.T) {
+	m3, prom, closer := testSetup(t)
+	defer closer()
+
+	RunTest(t, m3, prom)
+}
+
+// testSetup creates the in-process M3 cluster and the docker-backed Prometheus
+// used by TestPrometheus. The returned func closes Prometheus and then cleans
+// up the cluster; the caller is responsible for invoking it.
+func testSetup(t *testing.T) (resources.M3Resources, resources.ExternalResources, func()) {
+	cfgs, err := inprocess.NewClusterConfigsFromYAML(
+		TestPrometheusDBNodeConfig, TestPrometheusCoordinatorConfig, "",
+	)
+	require.NoError(t, err)
+
+	m3, err := inprocess.NewCluster(cfgs,
+		resources.ClusterOptions{
+			DBNode: resources.NewDBNodeClusterOptions(),
+		},
+	)
+	require.NoError(t, err)
+
+	// The require calls below abort the test goroutine on failure, in which
+	// case the caller never receives the closer. Deferred functions still run
+	// on that path, so use one to avoid leaking the already-started cluster.
+	setupDone := false
+	defer func() {
+		if !setupDone {
+			assert.NoError(t, m3.Cleanup())
+		}
+	}()
+
+	pool, err := dockertest.NewPool("")
+	require.NoError(t, err)
+
+	// Resolve the Prometheus config relative to this source file rather than
+	// the working directory the test happens to run in.
+	_, filename, _, _ := runtime.Caller(0)
+	prom := docker.NewPrometheus(docker.PrometheusOptions{
+		Pool:      pool,
+		PathToCfg: path.Join(path.Dir(filename), "../resources/docker/config/prometheus.yml"),
+	})
+	require.NoError(t, prom.Setup())
+
+	// Ownership of the cluster passes to the returned closer from here on.
+	setupDone = true
+
+	return m3, prom, func() {
+		assert.NoError(t, prom.Close())
+		assert.NoError(t, m3.Cleanup())
+	}
+}
diff --git a/src/integration/resources/common.go b/src/integration/resources/common.go
index 637107d548..ac78667465 100644
--- a/src/integration/resources/common.go
+++ b/src/integration/resources/common.go
@@ -29,7 +29,7 @@ import (
 
 const (
 	retryMaxInterval = 5 * time.Second
-	retryMaxTime     = 3 * time.Minute
+	retryMaxTime     = 1 * time.Minute
 )
 
 // Retry is a function for retrying an operation in integration tests.
diff --git a/src/integration/resources/coordinator_client.go b/src/integration/resources/coordinator_client.go index 0f3b8da322..74c2968c5b 100644 --- a/src/integration/resources/coordinator_client.go +++ b/src/integration/resources/coordinator_client.go @@ -582,19 +582,39 @@ func (c *CoordinatorClient) WriteCarbon( return con.Close() } -// WriteProm writes a prometheus metric. -func (c *CoordinatorClient) WriteProm(name string, tags map[string]string, samples []prompb.Sample) error { - var ( - url = c.makeURL("api/v1/prom/remote/write") - reqLabels = []prompb.Label{{Name: []byte(model.MetricNameLabel), Value: []byte(name)}} - ) +// WriteProm writes a prometheus metric. Takes tags/labels as a map for convenience. +func (c *CoordinatorClient) WriteProm( + name string, + tags map[string]string, + samples []prompb.Sample, + headers Headers, +) error { + labels := make([]prompb.Label, 0, len(tags)) for tag, value := range tags { - reqLabels = append(reqLabels, prompb.Label{ + labels = append(labels, prompb.Label{ Name: []byte(tag), Value: []byte(value), }) } + + return c.WritePromWithLabels(name, labels, samples, headers) +} + +// WritePromWithLabels writes a prometheus metric. Allows you to provide the labels for the write +// directly instead of conveniently converting them from a map. +func (c *CoordinatorClient) WritePromWithLabels( + name string, + labels []prompb.Label, + samples []prompb.Sample, + headers Headers, +) error { + var ( + url = c.makeURL("api/v1/prom/remote/write") + reqLabels = []prompb.Label{{Name: []byte(model.MetricNameLabel), Value: []byte(name)}} + ) + reqLabels = append(reqLabels, labels...) 
+ writeRequest := prompb.WriteRequest{ Timeseries: []prompb.TimeSeries{ { @@ -620,6 +640,11 @@ func (c *CoordinatorClient) WriteProm(name string, tags map[string]string, sampl logger.Error("failed constructing request", zap.Error(err)) return err } + for key, vals := range headers { + for _, val := range vals { + req.Header.Add(key, val) + } + } req.Header.Add(xhttp.HeaderContentType, xhttp.ContentTypeProtobuf) resp, err := c.client.Do(req) @@ -763,12 +788,6 @@ type vectorResult struct { // RangeQuery runs a range query with provided headers func (c *CoordinatorClient) RangeQuery(req RangeQueryRequest, headers Headers) (model.Matrix, error) { - if req.Start.IsZero() { - req.Start = time.Now() - } - if req.End.IsZero() { - req.End = time.Now() - } if req.Step == 0 { req.Step = 15 * time.Second // default step is 15 seconds. } @@ -917,7 +936,8 @@ func (c *CoordinatorClient) runQuery( b, err := ioutil.ReadAll(resp.Body) if status := resp.StatusCode; status != http.StatusOK { - return "", fmt.Errorf("query response status not OK, received %v", status) + return "", fmt.Errorf("query response status not OK, received %v. 
error=%v", + status, string(b)) } if contentType, ok := resp.Header["Content-Type"]; !ok { diff --git a/src/integration/resources/docker/config/prometheus.yml b/src/integration/resources/docker/config/prometheus.yml index c11e4d7239..be417a7fb0 100644 --- a/src/integration/resources/docker/config/prometheus.yml +++ b/src/integration/resources/docker/config/prometheus.yml @@ -32,17 +32,17 @@ scrape_configs: - job_name: 'coordinator' static_configs: - - targets: ['coordinator01:7203'] + - targets: ['host.docker.internal:7203'] - job_name: 'dbnode' static_configs: - - targets: ['dbnode01:9004'] + - targets: ['host.docker.internal:9004'] remote_read: - - url: http://coordinator01:7201/api/v1/prom/remote/read + - url: http://host.docker.internal:7201/api/v1/prom/remote/read remote_write: - - url: http://coordinator01:7201/api/v1/prom/remote/write + - url: http://host.docker.internal:7201/api/v1/prom/remote/write write_relabel_configs: - target_label: metrics_storage replacement: m3db_remote diff --git a/src/integration/resources/docker/coordinator.go b/src/integration/resources/docker/coordinator.go index fc50d9575b..2c743d7f78 100644 --- a/src/integration/resources/docker/coordinator.go +++ b/src/integration/resources/docker/coordinator.go @@ -205,12 +205,30 @@ func (c *coordinator) WriteCarbon( return c.client.WriteCarbon(url, metric, v, t) } -func (c *coordinator) WriteProm(name string, tags map[string]string, samples []prompb.Sample) error { +func (c *coordinator) WriteProm( + name string, + tags map[string]string, + samples []prompb.Sample, + headers resources.Headers, +) error { + if c.resource.closed { + return errClosed + } + + return c.client.WriteProm(name, tags, samples, headers) +} + +func (c *coordinator) WritePromWithLabels( + name string, + labels []prompb.Label, + samples []prompb.Sample, + headers resources.Headers, +) error { if c.resource.closed { return errClosed } - return c.client.WriteProm(name, tags, samples) + return 
c.client.WritePromWithLabels(name, labels, samples, headers) } func (c *coordinator) ApplyKVUpdate(update string) error { diff --git a/src/integration/resources/docker/prometheus.go b/src/integration/resources/docker/prometheus.go index 82776907a2..72971f04ea 100644 --- a/src/integration/resources/docker/prometheus.go +++ b/src/integration/resources/docker/prometheus.go @@ -22,17 +22,22 @@ package docker import ( "context" + "encoding/json" "errors" "fmt" + "io/ioutil" "net/http" + "time" "github.com/ory/dockertest/v3" + "github.com/prometheus/common/model" "github.com/m3db/m3/src/integration/resources" "github.com/m3db/m3/src/x/instrument" ) -type prometheus struct { +// Prometheus is a docker-backed instantiation of Prometheus. +type Prometheus struct { pool *dockertest.Pool pathToCfg string iOpts instrument.Options @@ -59,14 +64,15 @@ func NewPrometheus(opts PrometheusOptions) resources.ExternalResources { if opts.InstrumentOptions == nil { opts.InstrumentOptions = instrument.NewOptions() } - return &prometheus{ + return &Prometheus{ pool: opts.Pool, pathToCfg: opts.PathToCfg, iOpts: opts.InstrumentOptions, } } -func (p *prometheus) Setup() error { +// Setup is a method that setups up the prometheus instance. +func (p *Prometheus) Setup() error { if p.resource != nil { return errors.New("prometheus already setup. must close resource " + "before attempting to setup again") @@ -97,7 +103,7 @@ func (p *prometheus) Setup() error { return p.waitForHealthy() } -func (p *prometheus) waitForHealthy() error { +func (p *Prometheus) waitForHealthy() error { return resources.Retry(func() error { req, err := http.NewRequestWithContext( context.Background(), @@ -123,7 +129,79 @@ func (p *prometheus) waitForHealthy() error { }) } -func (p *prometheus) Close() error { +// PrometheusQueryRequest contains the parameters for making a query request. 
+type PrometheusQueryRequest struct {
+	// Query is the prometheus query (PromQL expression) to execute.
+	Query string
+	// Time is the time to execute the query at. The zero value means no
+	// time parameter is sent with the request.
+	Time time.Time
+}
+
+// String renders the request as "query=<expr>" followed by
+// "&time=<unix seconds>" when Time is set.
+//
+// The values are inserted verbatim and are NOT percent-encoded, so the
+// result is only safe to splice into a URL when the expression contains no
+// reserved characters. Query does its own encoding and does not rely on
+// this method.
+func (p *PrometheusQueryRequest) String() string {
+	str := fmt.Sprintf("query=%v", p.Query)
+
+	if !p.Time.IsZero() {
+		str += fmt.Sprintf("&time=%v", p.Time.Unix())
+	}
+
+	return str
+}
+
+// Query executes an instant query against the prometheus resource and
+// returns the resulting vector.
+//
+// It returns errClosed if the resource has been closed, an error for any
+// transport failure or non-200 response (the response body is included in
+// the message), and an error if the response body cannot be decoded.
+func (p *Prometheus) Query(req PrometheusQueryRequest) (model.Vector, error) {
+	if p.resource.Closed() {
+		return nil, errClosed
+	}
+
+	r, err := http.NewRequestWithContext(
+		context.Background(),
+		http.MethodGet,
+		"http://0.0.0.0:9090/api/v1/query",
+		nil,
+	)
+	if err != nil {
+		return nil, fmt.Errorf("building prometheus query request: %w", err)
+	}
+
+	// Percent-encode the parameters instead of splicing the raw expression
+	// into the URL: PromQL routinely contains spaces, '+', '&', '%' and
+	// quotes, which would otherwise corrupt the request line or be
+	// misparsed by the server.
+	params := r.URL.Query()
+	params.Set("query", req.Query)
+	if !req.Time.IsZero() {
+		params.Set("time", fmt.Sprint(req.Time.Unix()))
+	}
+	r.URL.RawQuery = params.Encode()
+
+	client := http.Client{}
+	res, err := client.Do(r)
+	if err != nil {
+		return nil, fmt.Errorf("executing prometheus query: %w", err)
+	}
+	defer res.Body.Close()
+
+	// Read the body before checking the status so that it can be included
+	// in the error message for failed requests.
+	body, err := ioutil.ReadAll(res.Body)
+	if err != nil {
+		return nil, fmt.Errorf("reading prometheus query response: %w", err)
+	}
+
+	if res.StatusCode != http.StatusOK {
+		return nil, fmt.Errorf("non-200 status code received. "+
+			"status=%v responseBody=%v", res.StatusCode, string(body))
+	}
+
+	var parsedResp jsonInstantQueryResponse
+	if err := json.Unmarshal(body, &parsedResp); err != nil {
+		return nil, fmt.Errorf("decoding prometheus query response: %w", err)
+	}
+
+	return parsedResp.Data.Result, nil
+}
+
+// jsonInstantQueryResponse is the JSON envelope decoded from the instant
+// query endpoint's response body.
+type jsonInstantQueryResponse struct {
+	Status string       `json:"status"`
+	Data   vectorResult `json:"data"`
+}
+
+// vectorResult is the "data" member of the response; Result is decoded as a
+// vector regardless of the reported ResultType.
+type vectorResult struct {
+	ResultType model.ValueType `json:"resultType"`
+	Result     model.Vector    `json:"result"`
+}
+
+// Close cleans up the prometheus instance.
+func (p *Prometheus) Close() error { if p.resource.Closed() { return errClosed } diff --git a/src/integration/resources/inprocess/aggregator_test.go b/src/integration/resources/inprocess/aggregator_test.go index 41e412dd8c..3b6823f21b 100644 --- a/src/integration/resources/inprocess/aggregator_test.go +++ b/src/integration/resources/inprocess/aggregator_test.go @@ -288,7 +288,7 @@ func testAggMetrics(t *testing.T, coord resources.Coordinator) { expectedValue = model.SampleValue(6) ) assert.NoError(t, resources.Retry(func() error { - return coord.WriteProm("cpu", map[string]string{"host": "host1"}, samples) + return coord.WriteProm("cpu", map[string]string{"host": "host1"}, samples, nil) })) queryHeaders := resources.Headers{"M3-Metrics-Type": {"aggregated"}, "M3-Storage-Policy": {"10s:6h"}} @@ -315,7 +315,7 @@ func testAggMetrics(t *testing.T, coord resources.Coordinator) { Query: "cpu", Start: time.Now().Add(-30 * time.Second), End: time.Now(), - Step: 1 * time.Second, + Step: 1 * time.Second, }, queryHeaders, ) diff --git a/src/integration/resources/inprocess/cluster.go b/src/integration/resources/inprocess/cluster.go index 7471127e74..4de3511573 100644 --- a/src/integration/resources/inprocess/cluster.go +++ b/src/integration/resources/inprocess/cluster.go @@ -33,11 +33,13 @@ import ( aggcfg "github.com/m3db/m3/src/cmd/services/m3aggregator/config" dbcfg "github.com/m3db/m3/src/cmd/services/m3dbnode/config" coordinatorcfg "github.com/m3db/m3/src/cmd/services/m3query/config" + "github.com/m3db/m3/src/dbnode/client" "github.com/m3db/m3/src/dbnode/discovery" "github.com/m3db/m3/src/dbnode/environment" "github.com/m3db/m3/src/dbnode/persist/fs" "github.com/m3db/m3/src/integration/resources" nettest "github.com/m3db/m3/src/integration/resources/net" + "github.com/m3db/m3/src/query/storage/m3" xconfig "github.com/m3db/m3/src/x/config" "github.com/m3db/m3/src/x/config/hostid" xerrors "github.com/m3db/m3/src/x/errors" @@ -248,7 +250,17 @@ func 
GenerateClusterSpecification( coordConfig := configs.Coordinator // TODO(nate): refactor to support having envconfig if no DB. - coordConfig.Clusters[0].Client.EnvironmentConfig = &envConfig + if len(coordConfig.Clusters) > 0 { + coordConfig.Clusters[0].Client.EnvironmentConfig = &envConfig + } else { + coordConfig.Clusters = m3.ClustersStaticConfiguration{ + { + Client: client.Configuration{ + EnvironmentConfig: &envConfig, + }, + }, + } + } var aggCfgs []aggcfg.Configuration if opts.Aggregator != nil { diff --git a/src/integration/resources/inprocess/coordinator.go b/src/integration/resources/inprocess/coordinator.go index 8ce1c3d941..243f663148 100644 --- a/src/integration/resources/inprocess/coordinator.go +++ b/src/integration/resources/inprocess/coordinator.go @@ -409,9 +409,25 @@ func (c *Coordinator) WriteCarbon(port int, metric string, v float64, t time.Tim return c.client.WriteCarbon(fmt.Sprintf("0.0.0.0:%d", port), metric, v, t) } -// WriteProm writes a prometheus metric. -func (c *Coordinator) WriteProm(name string, tags map[string]string, samples []prompb.Sample) error { - return c.client.WriteProm(name, tags, samples) +// WriteProm writes a prometheus metric. Takes tags/labels as a map for convenience. +func (c *Coordinator) WriteProm( + name string, + tags map[string]string, + samples []prompb.Sample, + headers resources.Headers, +) error { + return c.client.WriteProm(name, tags, samples, headers) +} + +// WritePromWithLabels writes a prometheus metric. Allows you to provide the labels for +// the write directly instead of conveniently converting them from a map. +func (c *Coordinator) WritePromWithLabels( + name string, + labels []prompb.Label, + samples []prompb.Sample, + headers resources.Headers, +) error { + return c.client.WritePromWithLabels(name, labels, samples, headers) } // RunQuery runs the given query with a given verification function. 
diff --git a/src/integration/resources/inprocess/coordinator_test.go b/src/integration/resources/inprocess/coordinator_test.go index e9bc11028d..fbb236bc04 100644 --- a/src/integration/resources/inprocess/coordinator_test.go +++ b/src/integration/resources/inprocess/coordinator_test.go @@ -89,7 +89,7 @@ func TestCoordinatorAPIs(t *testing.T) { func testMetadataAPIs(t *testing.T, coordinator resources.Coordinator) { err := coordinator.WriteProm("cpu", map[string]string{"pod": "foo-1234"}, []prompb.Sample{ {Value: 1, Timestamp: storage.TimeToPromTimestamp(xtime.Now())}, - }) + }, nil) require.NoError(t, err) names, err := coordinator.LabelNames(resources.LabelNamesRequest{}, nil) diff --git a/src/integration/resources/resources.go b/src/integration/resources/resources.go index 216f6d562d..19342f33d0 100644 --- a/src/integration/resources/resources.go +++ b/src/integration/resources/resources.go @@ -89,18 +89,46 @@ func SetupCluster( } var ( - aggDatabase = admin.DatabaseCreateRequest{ + unaggDatabase = admin.DatabaseCreateRequest{ Type: "cluster", - NamespaceName: AggName, + NamespaceName: UnaggName, RetentionTime: retention, NumShards: numShards, ReplicationFactor: replicationFactor, Hosts: hosts, } - unaggDatabase = admin.DatabaseCreateRequest{ - NamespaceName: UnaggName, - RetentionTime: retention, + aggNamespace = admin.NamespaceAddRequest{ + Name: AggName, + Options: &namespace.NamespaceOptions{ + BootstrapEnabled: true, + FlushEnabled: true, + WritesToCommitLog: true, + CleanupEnabled: true, + SnapshotEnabled: true, + IndexOptions: &namespace.IndexOptions{ + Enabled: true, + BlockSizeNanos: int64(30 * time.Minute), + }, + RetentionOptions: &namespace.RetentionOptions{ + RetentionPeriodNanos: int64(6 * time.Hour), + BlockSizeNanos: int64(30 * time.Minute), + BufferFutureNanos: int64(2 * time.Minute), + BufferPastNanos: int64(10 * time.Minute), + BlockDataExpiry: true, + BlockDataExpiryAfterNotAccessPeriodNanos: int64(time.Minute * 5), + }, + AggregationOptions: 
&namespace.AggregationOptions{ + Aggregations: []*namespace.Aggregation{ + { + Aggregated: true, + Attributes: &namespace.AggregatedAttributes{ + ResolutionNanos: int64(15 * time.Second), + }, + }, + }, + }, + }, } coldWriteNamespace = admin.NamespaceAddRequest{ @@ -131,8 +159,8 @@ func SetupCluster( return err } - logger.Info("creating database", zap.Any("request", aggDatabase)) - if _, err := coordinator.CreateDatabase(aggDatabase); err != nil { + logger.Info("creating database", zap.Any("request", unaggDatabase)) + if _, err := coordinator.CreateDatabase(unaggDatabase); err != nil { return err } @@ -141,18 +169,18 @@ func SetupCluster( return err } - logger.Info("waiting for namespace", zap.String("name", AggName)) - if err := coordinator.WaitForNamespace(AggName); err != nil { + logger.Info("waiting for namespace", zap.String("name", UnaggName)) + if err := coordinator.WaitForNamespace(UnaggName); err != nil { return err } - logger.Info("creating namespace", zap.Any("request", unaggDatabase)) - if _, err := coordinator.CreateDatabase(unaggDatabase); err != nil { + logger.Info("creating namespace", zap.Any("request", aggNamespace)) + if _, err := coordinator.AddNamespace(aggNamespace); err != nil { return err } - logger.Info("waiting for namespace", zap.String("name", UnaggName)) - if err := coordinator.WaitForNamespace(UnaggName); err != nil { + logger.Info("waiting for namespace", zap.String("name", AggName)) + if err := coordinator.WaitForNamespace(AggName); err != nil { return err } diff --git a/src/integration/resources/types.go b/src/integration/resources/types.go index b9a0a40aff..544202b994 100644 --- a/src/integration/resources/types.go +++ b/src/integration/resources/types.go @@ -60,8 +60,11 @@ type Coordinator interface { ApplyKVUpdate(update string) error // WriteCarbon writes a carbon metric datapoint at a given time. WriteCarbon(port int, metric string, v float64, t time.Time) error - // WriteProm writes a prometheus metric. 
- WriteProm(name string, tags map[string]string, samples []prompb.Sample) error + // WriteProm writes a prometheus metric. Takes tags/labels as a map for convenience. + WriteProm(name string, tags map[string]string, samples []prompb.Sample, headers Headers) error + // WritePromWithLabels writes a prometheus metric. Allows you to provide the labels for + // the write directly instead of conveniently converting them from a map. + WritePromWithLabels(name string, labels []prompb.Label, samples []prompb.Sample, headers Headers) error // RunQuery runs the given query with a given verification function. RunQuery(verifier ResponseVerifier, query string, headers Headers) error // InstantQuery runs an instant query with provided headers