From d1db70770fe2a43a3bbd9349881e7de43f79efe4 Mon Sep 17 00:00:00 2001 From: Yang Hu Date: Thu, 30 Jul 2020 08:50:11 -0500 Subject: [PATCH] add conversion from Int64 and Double OTLP metrics for Promehteus Remote Write Exporter add prometheus remote write exporter address lint issues improve test coverage fix lint error add prometheus remote write exporter to default components change metric name label add conversion from ns to ms rename tests add attribute to label functionality in exporter.go format code resolve conflicts with master switch to fmt.Errorf add conversion from Int64 and Double OTLP metrics add prometheus remote write exporter address lint issues improve test coverage fix lint error add prometheus remote write exporter to default components change metric name label add conversion from ns to ms rename tests add attribute to label functionality in exporter.go format code resolve conflicts with master add check for nil change data format to dataold --- .../prometheusremotewriteexporter/exporter.go | 170 ++++++- .../exporter_test.go | 422 +++++++++++++++++- .../prometheusremotewriteexporter/helper.go | 25 ++ .../testutil_test.go | 94 +++- go.mod | 1 + 5 files changed, 694 insertions(+), 18 deletions(-) diff --git a/exporter/prometheusremotewriteexporter/exporter.go b/exporter/prometheusremotewriteexporter/exporter.go index c63e051fd66..c9b57eb3bb2 100644 --- a/exporter/prometheusremotewriteexporter/exporter.go +++ b/exporter/prometheusremotewriteexporter/exporter.go @@ -4,7 +4,7 @@ // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // -// http://www.apache.org/licenses/LICENSE-2.0 +// http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, @@ -16,13 +16,24 @@ package prometheusremotewriteexporter import ( + "bufio" + "bytes" "context" "errors" + "io" "net/http" "net/url" + "strings" "sync" + "github.com/gogo/protobuf/proto" + "github.com/golang/snappy" + "github.com/prometheus/prometheus/prompb" + "go.opentelemetry.io/collector/consumer/pdata" + "go.opentelemetry.io/collector/consumer/pdatautil" + otlp "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/metrics/v1old" + "go.opentelemetry.io/collector/internal/dataold" ) // prwExporter converts OTLP metrics to Prometheus remote write TimeSeries and sends them to a remote endpoint @@ -68,5 +79,160 @@ func (prwe *prwExporter) shutdown(context.Context) error { // TimeSeries, validates and handles each individual metric, adding the converted TimeSeries to the map, and finally // exports the map. func (prwe *prwExporter) pushMetrics(ctx context.Context, md pdata.Metrics) (int, error) { - return 0, nil + prwe.wg.Add(1) + defer prwe.wg.Done() + select { + case <-prwe.closeChan: + return pdatautil.MetricCount(md), errors.New("shutdown has been called") + default: + tsMap := map[string]*prompb.TimeSeries{} + dropped := 0 + errs := []string{} + + resourceMetrics := dataold.MetricDataToOtlp(pdatautil.MetricsToOldInternalMetrics(md)) + for _, resourceMetric := range resourceMetrics { + if resourceMetric == nil { + continue + } + // TODO: add resource attributes as labels, probably in next PR + for _, instrumentationMetrics := range resourceMetric.InstrumentationLibraryMetrics { + if instrumentationMetrics == nil { + continue + } + // TODO: decide if instrumentation library information should be exported as labels + for _, metric := range instrumentationMetrics.Metrics { + if metric == nil { + continue + } + // check for valid type and temporality combination + if ok := validateMetrics(metric.MetricDescriptor); !ok { + dropped++ + errs = append(errs, "invalid temporality and type combination") + continue + } + // handle individual metric based on type + switch metric.GetMetricDescriptor().GetType() { + case otlp.MetricDescriptor_MONOTONIC_INT64, otlp.MetricDescriptor_INT64, + otlp.MetricDescriptor_MONOTONIC_DOUBLE, otlp.MetricDescriptor_DOUBLE: + if err := prwe.handleScalarMetric(tsMap, metric); err != nil { + dropped++ + errs = append(errs, err.Error()) + } + } + } + } + } + + if err := prwe.export(ctx, tsMap); err != nil { + return pdatautil.MetricCount(md), err + } + + if dropped != 0 { + return dropped, errors.New(strings.Join(errs, "\n")) + } + + return 0, nil + } +} + +// handleScalarMetric processes data points in a single OTLP scalar metric by adding the each point as a Sample into +// its corresponding TimeSeries in tsMap. +// tsMap and metric cannot be nil, and metric must have a non-nil descriptor +func (prwe *prwExporter) handleScalarMetric(tsMap map[string]*prompb.TimeSeries, metric *otlp.Metric) error { + + mType := metric.MetricDescriptor.Type + + switch mType { + // int points + case otlp.MetricDescriptor_MONOTONIC_INT64, otlp.MetricDescriptor_INT64: + if metric.Int64DataPoints == nil { + return errors.New("nil data point field in metric" + metric.GetMetricDescriptor().Name) + } + + for _, pt := range metric.Int64DataPoints { + + // create parameters for addSample + name := getPromMetricName(metric.GetMetricDescriptor(), prwe.namespace) + labels := createLabelSet(pt.GetLabels(), nameStr, name) + sample := &prompb.Sample{ + Value: float64(pt.Value), + // convert ns to ms + Timestamp: convertTimeStamp(pt.TimeUnixNano), + } + + addSample(tsMap, sample, labels, metric.GetMetricDescriptor().GetType()) + } + return nil + + // double points + case otlp.MetricDescriptor_MONOTONIC_DOUBLE, otlp.MetricDescriptor_DOUBLE: + if metric.DoubleDataPoints == nil { + return errors.New("nil data point field in metric" + metric.GetMetricDescriptor().Name) + } + for _, pt := range metric.DoubleDataPoints { + + // create parameters for addSample + name := getPromMetricName(metric.GetMetricDescriptor(), prwe.namespace) + labels := createLabelSet(pt.GetLabels(), nameStr, name) + sample := &prompb.Sample{ + Value: pt.Value, + Timestamp: convertTimeStamp(pt.TimeUnixNano), + } + + addSample(tsMap, sample, labels, metric.GetMetricDescriptor().GetType()) + } + return nil + } + + return errors.New("invalid metric type: wants int or double data points") +} + +// export sends a Snappy-compressed WriteRequest containing TimeSeries to a remote write endpoint in order +func (prwe *prwExporter) export(ctx context.Context, tsMap map[string]*prompb.TimeSeries) error { + //Calls the helper function to convert the TsMap to the desired format + req, err := wrapTimeSeries(tsMap) + if err != nil { + return err + } + + //Uses proto.Marshal to convert the WriteRequest into bytes array + data, err := proto.Marshal(req) + if err != nil { + return err + } + buf := make([]byte, len(data), cap(data)) + compressedData := snappy.Encode(buf, data) + + //Create the HTTP POST request to send to the endpoint + httpReq, err := http.NewRequest("POST", prwe.endpointURL.String(), bytes.NewReader(compressedData)) + if err != nil { + return err + } + + // Add necessary headers specified by: + // https://cortexmetrics.io/docs/apis/#remote-api + httpReq.Header.Add("Content-Encoding", "snappy") + httpReq.Header.Set("Content-Type", "application/x-protobuf") + httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0") + + httpReq = httpReq.WithContext(ctx) + + _, cancel := context.WithTimeout(context.Background(), prwe.client.Timeout) + defer cancel() + + httpResp, err := prwe.client.Do(httpReq) + if err != nil { + return err + } + + if httpResp.StatusCode/100 != 2 { + scanner := bufio.NewScanner(io.LimitReader(httpResp.Body, 256)) + line := "" + if scanner.Scan() { + line = scanner.Text() + } + errMsg := "server returned HTTP status " + httpResp.Status + ": " + line + return errors.New(errMsg) + } + return nil } diff --git a/exporter/prometheusremotewriteexporter/exporter_test.go b/exporter/prometheusremotewriteexporter/exporter_test.go index fcdb771b020..6a15745dd89 100644 --- a/exporter/prometheusremotewriteexporter/exporter_test.go +++ b/exporter/prometheusremotewriteexporter/exporter_test.go @@ -12,24 +12,136 @@ // See the License for the specific language governing permissions and // limitations under the License. -// Note: implementation for this class is in a separate PR package prometheusremotewriteexporter import ( "context" + "fmt" + "io/ioutil" "net/http" + "net/http/httptest" + "net/url" "sync" "testing" + proto "github.com/gogo/protobuf/proto" + "github.com/golang/snappy" + "github.com/prometheus/prometheus/prompb" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/consumer/pdata" + "go.opentelemetry.io/collector/consumer/pdatautil" "go.opentelemetry.io/collector/exporter/exporterhelper" + otlp "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/metrics/v1old" + "go.opentelemetry.io/collector/internal/dataold" + "go.opentelemetry.io/collector/internal/dataold/testdataold" ) +// Test_handleScalarMetric checks whether data points within a single scalar metric can be added to a map of +// TimeSeries correctly. +// Test cases are two data point belonging to the same TimeSeries, two data point belonging different TimeSeries, +// and nil data points case. +func Test_handleScalarMetric(t *testing.T) { + sameTs := map[string]*prompb.TimeSeries{ + // string signature of the data point is the key of the map + typeMonotonicInt64 + "-__name__-same_ts_int_points_total" + lb1Sig: getTimeSeries( + getPromLabels(label11, value11, label12, value12, nameStr, "same_ts_int_points_total"), + getSample(float64(intVal1), msTime1), + getSample(float64(intVal2), msTime1)), + } + differentTs := map[string]*prompb.TimeSeries{ + typeMonotonicDouble + "-__name__-different_ts_double_points_total" + lb1Sig: getTimeSeries( + getPromLabels(label11, value11, label12, value12, nameStr, "different_ts_double_points_total"), + getSample(floatVal1, msTime1)), + typeMonotonicDouble + "-__name__-different_ts_double_points_total" + lb2Sig: getTimeSeries( + getPromLabels(label21, value21, label22, value22, nameStr, "different_ts_double_points_total"), + getSample(floatVal2, msTime2)), + } + + tests := []struct { + name string + m *otlp.Metric + returnError bool + want map[string]*prompb.TimeSeries + }{ + { + "invalid_nil_array", + &otlp.Metric{ + MetricDescriptor: getDescriptor("invalid_nil_array", monotonicInt64Comb, validCombinations), + Int64DataPoints: nil, + DoubleDataPoints: nil, + HistogramDataPoints: nil, + SummaryDataPoints: nil, + }, + true, + map[string]*prompb.TimeSeries{}, + }, + { + "invalid_type_array", + &otlp.Metric{ + MetricDescriptor: getDescriptor("invalid_type_array", histogramComb, validCombinations), + Int64DataPoints: nil, + DoubleDataPoints: nil, + HistogramDataPoints: nil, + SummaryDataPoints: nil, + }, + true, + map[string]*prompb.TimeSeries{}, + }, + { + "same_ts_int_points", + &otlp.Metric{ + MetricDescriptor: getDescriptor("same_ts_int_points", monotonicInt64Comb, validCombinations), + Int64DataPoints: []*otlp.Int64DataPoint{ + getIntDataPoint(lbs1, intVal1, time1), + getIntDataPoint(lbs1, intVal2, time1), + }, + DoubleDataPoints: nil, + HistogramDataPoints: nil, + SummaryDataPoints: nil, + }, + false, + sameTs, + }, + { + "different_ts_double_points", + &otlp.Metric{ + MetricDescriptor: getDescriptor("different_ts_double_points", monotonicDoubleComb, validCombinations), + Int64DataPoints: nil, + DoubleDataPoints: []*otlp.DoubleDataPoint{ + getDoubleDataPoint(lbs1, floatVal1, time1), + getDoubleDataPoint(lbs2, floatVal2, time2), + }, + HistogramDataPoints: nil, + SummaryDataPoints: nil, + }, + false, + differentTs, + }, + } + // run tests + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tsMap := map[string]*prompb.TimeSeries{} + prw := &prwExporter{} + ok := prw.handleScalarMetric(tsMap, tt.m) + if tt.returnError { + assert.Error(t, ok) + return + } + assert.Exactly(t, len(tt.want), len(tsMap)) + for k, v := range tsMap { + require.NotNil(t, tt.want[k]) + assert.ElementsMatch(t, tt.want[k].Labels, v.Labels) + assert.ElementsMatch(t, tt.want[k].Samples, v.Samples) + } + }) + } +} + // Test_newPrwExporter checks that a new exporter instance with non-nil fields is initialized func Test_newPrwExporter(t *testing.T) { config := &Config{ @@ -97,18 +209,314 @@ func Test_shutdown(t *testing.T) { wg: new(sync.WaitGroup), closeChan: make(chan struct{}), } + wg := new(sync.WaitGroup) + errChan := make(chan error, 5) err := prwe.shutdown(context.Background()) - assert.NoError(t, err) + require.NoError(t, err) + errChan = make(chan error, 5) + for i := 0; i < 5; i++ { + wg.Add(1) + go func() { + defer wg.Done() + _, ok := prwe.pushMetrics(context.Background(), + pdatautil.MetricsFromOldInternalMetrics(testdataold.GenerateMetricDataEmpty())) + errChan <- ok + }() + } + wg.Wait() + close(errChan) + for ok := range errChan { + assert.Error(t, ok) + } +} + +//Test whether or not the Server receives the correct TimeSeries. +//Currently considering making this test an iterative for loop of multiple TimeSeries +//Much akin to Test_pushMetrics +func Test_export(t *testing.T) { + //First we will instantiate a dummy TimeSeries instance to pass into both the export call and compare the http request + labels := getPromLabels(label11, value11, label12, value12, label21, value21, label22, value22) + sample1 := getSample(floatVal1, msTime1) + sample2 := getSample(floatVal2, msTime2) + ts1 := getTimeSeries(labels, sample1, sample2) + handleFunc := func(w http.ResponseWriter, r *http.Request, code int) { + //The following is a handler function that reads the sent httpRequest, unmarshals, and checks if the WriteRequest + //preserves the TimeSeries data correctly + body, err := ioutil.ReadAll(r.Body) + if err != nil { + t.Fatal(err) + } + require.NotNil(t, body) + //Receives the http requests and unzip, unmarshals, and extracts TimeSeries + assert.Equal(t, "0.1.0", r.Header.Get("X-Prometheus-Remote-Write-Version")) + assert.Equal(t, "snappy", r.Header.Get("Content-Encoding")) + writeReq := &prompb.WriteRequest{} + unzipped := []byte{} + + dest, err := snappy.Decode(unzipped, body) + require.NoError(t, err) + + ok := proto.Unmarshal(dest, writeReq) + require.NoError(t, ok) + assert.EqualValues(t, 1, len(writeReq.Timeseries)) + require.NotNil(t, writeReq.GetTimeseries()) + assert.Equal(t, *ts1, writeReq.GetTimeseries()[0]) + w.WriteHeader(code) + fmt.Fprintf(w, "error message") + } + + // Create in test table format to check if different HTTP response codes or server errors + // are properly identified + tests := []struct { + name string + ts prompb.TimeSeries + serverUp bool + httpResponseCode int + returnError bool + }{ + {"success_case", + *ts1, + true, + http.StatusAccepted, + false, + }, + { + "server_no_response_case", + *ts1, + false, + http.StatusAccepted, + true, + }, { + "error_status_code_case", + *ts1, + true, + http.StatusForbidden, + true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + handleFunc(w, r, tt.httpResponseCode) + })) + defer server.Close() + serverURL, uErr := url.Parse(server.URL) + assert.NoError(t, uErr) + if !tt.serverUp { + server.Close() + } + err := runExportPipeline(t, ts1, serverURL) + if tt.returnError { + assert.Error(t, err) + return + } + assert.NoError(t, err) + }) + } +} + +func runExportPipeline(t *testing.T, ts *prompb.TimeSeries, endpoint *url.URL) error { + //First we will construct a TimeSeries array from the testutils package + testmap := make(map[string]*prompb.TimeSeries) + testmap["test"] = ts + + HTTPClient := http.DefaultClient + //after this, instantiate a CortexExporter with the current HTTP client and endpoint set to passed in endpoint + prwe, err := newPrwExporter("test", endpoint.String(), HTTPClient) + if err != nil { + return err + } + err = prwe.export(context.Background(), testmap) + return err } // Test_pushMetrics checks the number of TimeSeries received by server and the number of metrics dropped is the same as // expected func Test_pushMetrics(t *testing.T) { - prwe := &prwExporter{ - wg: new(sync.WaitGroup), - closeChan: make(chan struct{}), + // fail cases + noTempBatch := pdatautil.MetricsFromOldInternalMetrics(testdataold.GenerateMetricDataManyMetricsSameResource(10)) + invalidTypeBatch := pdatautil.MetricsFromOldInternalMetrics(testdataold.GenerateMetricDataMetricTypeInvalid()) + + invalidTemp := testdataold.GenerateMetricDataManyMetricsSameResource(10) + setTemporality(&invalidTemp, otlp.MetricDescriptor_INVALID_TEMPORALITY) + invalidTempBatch := pdatautil.MetricsFromOldInternalMetrics(invalidTemp) + + nilDescBatch := pdatautil.MetricsFromOldInternalMetrics(testdataold.GenerateMetricDataNilMetricDescriptor()) + nilBatch1 := testdataold.GenerateMetricDataManyMetricsSameResource(10) + nilBatch2 := testdataold.GenerateMetricDataManyMetricsSameResource(10) + + setTemporality(&nilBatch1, otlp.MetricDescriptor_CUMULATIVE) + setTemporality(&nilBatch2, otlp.MetricDescriptor_CUMULATIVE) + setDataPointToNil(&nilBatch1, typeMonotonicInt64) + setType(&nilBatch2, typeMonotonicDouble) + + nilIntDataPointsBatch := pdatautil.MetricsFromOldInternalMetrics(nilBatch1) + nilDoubleDataPointsBatch := pdatautil.MetricsFromOldInternalMetrics(nilBatch2) + + // Success cases: 10 counter metrics, 2 points in each. Two TimeSeries in total + batch1 := testdataold.GenerateMetricDataManyMetricsSameResource(10) + setTemporality(&batch1, otlp.MetricDescriptor_CUMULATIVE) + scalarBatch := pdatautil.MetricsFromOldInternalMetrics(batch1) + + // Partial Success cases + batch2 := testdataold.GenerateMetricDataManyMetricsSameResource(10) + setTemporality(&batch2, otlp.MetricDescriptor_CUMULATIVE) + failDesc := dataold.MetricDataToOtlp(batch2)[0].InstrumentationLibraryMetrics[0].Metrics[0].GetMetricDescriptor() + failDesc.Temporality = otlp.MetricDescriptor_INVALID_TEMPORALITY + partialBatch := pdatautil.MetricsFromOldInternalMetrics(batch2) + + checkFunc := func(t *testing.T, r *http.Request, expected int) { + body, err := ioutil.ReadAll(r.Body) + if err != nil { + t.Fatal(err) + } + + buf := make([]byte, len(body)) + dest, err := snappy.Decode(buf, body) + assert.Equal(t, "0.1.0", r.Header.Get("x-prometheus-remote-write-version")) + assert.Equal(t, "snappy", r.Header.Get("content-encoding")) + assert.NotNil(t, r.Header.Get("tenant-id")) + require.NoError(t, err) + wr := &prompb.WriteRequest{} + ok := proto.Unmarshal(dest, wr) + require.Nil(t, ok) + assert.EqualValues(t, expected, len(wr.Timeseries)) + } + + tests := []struct { + name string + md *pdata.Metrics + reqTestFunc func(t *testing.T, r *http.Request, expected int) + expectedTimeSeries int + httpResponseCode int + numDroppedTimeSeries int + returnErr bool + }{ + { + "invalid_type_case", + &invalidTypeBatch, + nil, + 0, + http.StatusAccepted, + pdatautil.MetricCount(invalidTypeBatch), + true, + }, + { + "invalid_temporality_case", + &invalidTempBatch, + nil, + 0, + http.StatusAccepted, + pdatautil.MetricCount(invalidTempBatch), + true, + }, + { + "nil_desc_case", + &nilDescBatch, + nil, + 0, + http.StatusAccepted, + pdatautil.MetricCount(nilDescBatch), + true, + }, + { + "nil_int_point_case", + &nilIntDataPointsBatch, + nil, + 0, + http.StatusAccepted, + pdatautil.MetricCount(nilIntDataPointsBatch), + true, + }, + { + "nil_double_point_case", + &nilDoubleDataPointsBatch, + nil, + 0, + http.StatusAccepted, + pdatautil.MetricCount(nilDoubleDataPointsBatch), + true, + }, + { + "no_temp_case", + &noTempBatch, + nil, + 0, + http.StatusAccepted, + pdatautil.MetricCount(noTempBatch), + true, + }, + { + "http_error_case", + &noTempBatch, + nil, + 0, + http.StatusForbidden, + pdatautil.MetricCount(noTempBatch), + true, + }, + { + "scalar_case", + &scalarBatch, + checkFunc, + 2, + http.StatusAccepted, + 0, + false, + }, + { + "partial_success_case", + &partialBatch, + checkFunc, + 2, + http.StatusAccepted, + 1, + true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if tt.reqTestFunc != nil { + tt.reqTestFunc(t, r, tt.expectedTimeSeries) + } + w.WriteHeader(tt.httpResponseCode) + })) + + defer server.Close() + + serverURL, uErr := url.Parse(server.URL) + assert.NoError(t, uErr) + + config := &Config{ + ExporterSettings: configmodels.ExporterSettings{ + TypeVal: "prometheusremotewrite", + NameVal: "prometheusremotewrite", + }, + Namespace: "", + HTTPClientSettings: confighttp.HTTPClientSettings{ + Endpoint: "http://some.url:9411/api/prom/push", + // We almost read 0 bytes, so no need to tune ReadBufferSize. + ReadBufferSize: 0, + WriteBufferSize: 512 * 1024, + }, + } + assert.NotNil(t, config) + // c, err := config.HTTPClientSettings.ToClient() + // assert.Nil(t, err) + c := http.DefaultClient + prwe, nErr := newPrwExporter(config.Namespace, serverURL.String(), c) + require.NoError(t, nErr) + numDroppedTimeSeries, err := prwe.pushMetrics(context.Background(), *tt.md) + assert.Equal(t, tt.numDroppedTimeSeries, numDroppedTimeSeries) + if tt.returnErr { + assert.Error(t, err) + return + } + assert.NoError(t, err) + }) } - _, err := prwe.pushMetrics(context.Background(), pdata.Metrics{}) - assert.NoError(t, err) } diff --git a/exporter/prometheusremotewriteexporter/helper.go b/exporter/prometheusremotewriteexporter/helper.go index fafdc3fc2f9..21a8efe9ab1 100644 --- a/exporter/prometheusremotewriteexporter/helper.go +++ b/exporter/prometheusremotewriteexporter/helper.go @@ -18,8 +18,10 @@ import ( "log" "sort" "strings" + "time" "unicode" + "github.com/pkg/errors" "github.com/prometheus/prometheus/prompb" common "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/common/v1" @@ -27,6 +29,7 @@ import ( ) const ( + nameStr = "__name__" totalStr = "total" delimeter = "_" keyStr = "key" @@ -179,6 +182,28 @@ func getPromMetricName(desc *otlp.MetricDescriptor, ns string) string { return sanitize(b.String()) } +// Simple helper function that takes the map +// and creates a WriteRequest from the struct -- can move to the helper.go file +func wrapTimeSeries(tsMap map[string]*prompb.TimeSeries) (*prompb.WriteRequest, error) { + if len(tsMap) == 0 { + return nil, errors.Errorf("invalid tsMap: cannot be empty map") + } + TsArray := []prompb.TimeSeries{} + for _, v := range tsMap { + TsArray = append(TsArray, *v) + } + wrapped := prompb.WriteRequest{ + Timeseries: TsArray, + //Other parameters of the WriteRequest are unnecessary for our Export + } + return &wrapped, nil +} + +// convertTimeStamp converts OTLP timestamp in ns to timestamp in ms +func convertTimeStamp(timestamp uint64) int64 { + return int64(timestamp / uint64(int64(time.Millisecond)/int64(time.Nanosecond))) +} + // copied from prometheus-go-metric-exporter // sanitize replaces non-alphanumeric characters with underscores in s. func sanitize(s string) string { diff --git a/exporter/prometheusremotewriteexporter/testutil_test.go b/exporter/prometheusremotewriteexporter/testutil_test.go index d35afe54c1d..be18b56553c 100644 --- a/exporter/prometheusremotewriteexporter/testutil_test.go +++ b/exporter/prometheusremotewriteexporter/testutil_test.go @@ -21,6 +21,7 @@ import ( commonpb "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/common/v1" otlp "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/metrics/v1old" + "go.opentelemetry.io/collector/internal/dataold" ) type combination struct { @@ -34,9 +35,11 @@ var ( msTime1 = int64(time1 / uint64(int64(time.Millisecond)/int64(time.Nanosecond))) msTime2 = int64(time2 / uint64(int64(time.Millisecond)/int64(time.Nanosecond))) - typeInt64 = "INT64" - - typeHistogram = "HISTOGRAM" + typeInt64 = "INT64" + typeMonotonicInt64 = "MONOTONIC_INT64" + typeMonotonicDouble = "MONOTONIC_DOUBLE" + typeHistogram = "HISTOGRAM" + typeSummary = "SUMMARY" label11 = "test_label11" value11 = "test_value11" @@ -53,10 +56,13 @@ var ( dirty1 = "%" dirty2 = "?" - intVal1 int64 = 1 - intVal2 int64 = 2 + intVal1 int64 = 1 + intVal2 int64 = 2 + floatVal1 = 1.0 + floatVal2 = 2.0 lbs1 = getLabels(label11, value11, label12, value12) + lbs2 = getLabels(label21, value21, label22, value22) lbs1Dirty = getLabels(label11+dirty1, value11, dirty2+label12, value12) promLbs1 = getPromLabels(label11, value11, label12, value12) @@ -67,10 +73,11 @@ var ( ns1 = "test_ns" name1 = "valid_single_int_point" - monotonicInt64Comb = 0 - histogramComb = 2 - summaryComb = 3 - validCombinations = []combination{ + monotonicInt64Comb = 0 + monotonicDoubleComb = 1 + histogramComb = 2 + summaryComb = 3 + validCombinations = []combination{ {otlp.MetricDescriptor_MONOTONIC_INT64, otlp.MetricDescriptor_CUMULATIVE}, {otlp.MetricDescriptor_MONOTONIC_DOUBLE, otlp.MetricDescriptor_CUMULATIVE}, {otlp.MetricDescriptor_HISTOGRAM, otlp.MetricDescriptor_CUMULATIVE}, @@ -131,6 +138,24 @@ func getDescriptor(name string, i int, comb []combination) *otlp.MetricDescripto } } +func getIntDataPoint(labels []*commonpb.StringKeyValue, value int64, ts uint64) *otlp.Int64DataPoint { + return &otlp.Int64DataPoint{ + Labels: labels, + StartTimeUnixNano: 0, + TimeUnixNano: ts, + Value: value, + } +} + +func getDoubleDataPoint(labels []*commonpb.StringKeyValue, value float64, ts uint64) *otlp.DoubleDataPoint { + return &otlp.DoubleDataPoint{ + Labels: labels, + StartTimeUnixNano: 0, + TimeUnixNano: ts, + Value: value, + } +} + // Prometheus TimeSeries func getPromLabels(lbs ...string) []prompb.Label { pbLbs := prompb.Labels{ @@ -162,3 +187,54 @@ func getTimeSeries(labels []prompb.Label, samples ...prompb.Sample) *prompb.Time Samples: samples, } } + +//setCumulative is for creating the dataold.MetricData to test with +func setTemporality(metricsData *dataold.MetricData, temp otlp.MetricDescriptor_Temporality) { + for _, r := range dataold.MetricDataToOtlp(*metricsData) { + for _, instMetrics := range r.InstrumentationLibraryMetrics { + for _, m := range instMetrics.Metrics { + m.MetricDescriptor.Temporality = temp + } + } + } +} + +//setDataPointToNil is for creating the dataold.MetricData to test with +func setDataPointToNil(metricsData *dataold.MetricData, dataField string) { + for _, r := range dataold.MetricDataToOtlp(*metricsData) { + for _, instMetrics := range r.InstrumentationLibraryMetrics { + for _, m := range instMetrics.Metrics { + switch dataField { + case typeMonotonicInt64: + m.Int64DataPoints = nil + case typeMonotonicDouble: + m.DoubleDataPoints = nil + case typeHistogram: + m.HistogramDataPoints = nil + case typeSummary: + m.SummaryDataPoints = nil + } + } + } + } +} + +//setType is for creating the dataold.MetricData to test with +func setType(metricsData *dataold.MetricData, dataField string) { + for _, r := range dataold.MetricDataToOtlp(*metricsData) { + for _, instMetrics := range r.InstrumentationLibraryMetrics { + for _, m := range instMetrics.Metrics { + switch dataField { + case typeMonotonicInt64: + m.GetMetricDescriptor().Type = otlp.MetricDescriptor_MONOTONIC_INT64 + case typeMonotonicDouble: + m.GetMetricDescriptor().Type = otlp.MetricDescriptor_MONOTONIC_DOUBLE + case typeHistogram: + m.GetMetricDescriptor().Type = otlp.MetricDescriptor_HISTOGRAM + case typeSummary: + m.GetMetricDescriptor().Type = otlp.MetricDescriptor_SUMMARY + } + } + } + } +} diff --git a/go.mod b/go.mod index cb94179acdd..36ad210e8e0 100644 --- a/go.mod +++ b/go.mod @@ -17,6 +17,7 @@ require ( github.com/gogo/protobuf v1.3.1 github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e github.com/golang/protobuf v1.4.2 + github.com/golang/snappy v0.0.1 github.com/golangci/golangci-lint v1.30.0 github.com/google/addlicense v0.0.0-20200622132530-df58acafd6d5 github.com/google/go-cmp v0.5.1