diff --git a/exporter/prometheusremotewriteexporter/exporter.go b/exporter/prometheusremotewriteexporter/exporter.go index c63e051fd66..c9b57eb3bb2 100644 --- a/exporter/prometheusremotewriteexporter/exporter.go +++ b/exporter/prometheusremotewriteexporter/exporter.go @@ -4,7 +4,7 @@ // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // -// http://www.apache.org/licenses/LICENSE-2.0 +// http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, @@ -16,13 +16,24 @@ package prometheusremotewriteexporter import ( + "bufio" + "bytes" "context" "errors" + "io" "net/http" "net/url" + "strings" "sync" + "github.com/gogo/protobuf/proto" + "github.com/golang/snappy" + "github.com/prometheus/prometheus/prompb" + "go.opentelemetry.io/collector/consumer/pdata" + "go.opentelemetry.io/collector/consumer/pdatautil" + otlp "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/metrics/v1old" + "go.opentelemetry.io/collector/internal/dataold" ) // prwExporter converts OTLP metrics to Prometheus remote write TimeSeries and sends them to a remote endpoint @@ -68,5 +79,160 @@ func (prwe *prwExporter) shutdown(context.Context) error { // TimeSeries, validates and handles each individual metric, adding the converted TimeSeries to the map, and finally // exports the map. func (prwe *prwExporter) pushMetrics(ctx context.Context, md pdata.Metrics) (int, error) { - return 0, nil + prwe.wg.Add(1) + defer prwe.wg.Done() + select { + case <-prwe.closeChan: + return pdatautil.MetricCount(md), errors.New("shutdown has been called") + default: + tsMap := map[string]*prompb.TimeSeries{} + dropped := 0 + errs := []string{} + + resourceMetrics := dataold.MetricDataToOtlp(pdatautil.MetricsToOldInternalMetrics(md)) + for _, resourceMetric := range resourceMetrics { + if resourceMetric == nil { + continue + } + // TODO: add resource attributes as labels, probably in next PR + for _, instrumentationMetrics := range resourceMetric.InstrumentationLibraryMetrics { + if instrumentationMetrics == nil { + continue + } + // TODO: decide if instrumentation library information should be exported as labels + for _, metric := range instrumentationMetrics.Metrics { + if metric == nil { + continue + } + // check for valid type and temporality combination + if ok := validateMetrics(metric.MetricDescriptor); !ok { + dropped++ + errs = append(errs, "invalid temporality and type combination") + continue + } + // handle individual metric based on type + switch metric.GetMetricDescriptor().GetType() { + case otlp.MetricDescriptor_MONOTONIC_INT64, otlp.MetricDescriptor_INT64, + otlp.MetricDescriptor_MONOTONIC_DOUBLE, otlp.MetricDescriptor_DOUBLE: + if err := prwe.handleScalarMetric(tsMap, metric); err != nil { + dropped++ + errs = append(errs, err.Error()) + } + } + } + } + } + + if err := prwe.export(ctx, tsMap); err != nil { + return pdatautil.MetricCount(md), err + } + + if dropped != 0 { + return dropped, errors.New(strings.Join(errs, "\n")) + } + + return 0, nil + } +} + +// handleScalarMetric processes data points in a single OTLP scalar metric by adding the each point as a Sample into +// its corresponding TimeSeries in tsMap. +// tsMap and metric cannot be nil, and metric must have a non-nil descriptor +func (prwe *prwExporter) handleScalarMetric(tsMap map[string]*prompb.TimeSeries, metric *otlp.Metric) error { + + mType := metric.MetricDescriptor.Type + + switch mType { + // int points + case otlp.MetricDescriptor_MONOTONIC_INT64, otlp.MetricDescriptor_INT64: + if metric.Int64DataPoints == nil { + return errors.New("nil data point field in metric" + metric.GetMetricDescriptor().Name) + } + + for _, pt := range metric.Int64DataPoints { + + // create parameters for addSample + name := getPromMetricName(metric.GetMetricDescriptor(), prwe.namespace) + labels := createLabelSet(pt.GetLabels(), nameStr, name) + sample := &prompb.Sample{ + Value: float64(pt.Value), + // convert ns to ms + Timestamp: convertTimeStamp(pt.TimeUnixNano), + } + + addSample(tsMap, sample, labels, metric.GetMetricDescriptor().GetType()) + } + return nil + + // double points + case otlp.MetricDescriptor_MONOTONIC_DOUBLE, otlp.MetricDescriptor_DOUBLE: + if metric.DoubleDataPoints == nil { + return errors.New("nil data point field in metric" + metric.GetMetricDescriptor().Name) + } + for _, pt := range metric.DoubleDataPoints { + + // create parameters for addSample + name := getPromMetricName(metric.GetMetricDescriptor(), prwe.namespace) + labels := createLabelSet(pt.GetLabels(), nameStr, name) + sample := &prompb.Sample{ + Value: pt.Value, + Timestamp: convertTimeStamp(pt.TimeUnixNano), + } + + addSample(tsMap, sample, labels, metric.GetMetricDescriptor().GetType()) + } + return nil + } + + return errors.New("invalid metric type: wants int or double data points") +} + +// export sends a Snappy-compressed WriteRequest containing TimeSeries to a remote write endpoint in order +func (prwe *prwExporter) export(ctx context.Context, tsMap map[string]*prompb.TimeSeries) error { + //Calls the helper function to convert the TsMap to the desired format + req, err := wrapTimeSeries(tsMap) + if err != nil { + return err + } + + //Uses proto.Marshal to convert the WriteRequest into bytes array + data, err := proto.Marshal(req) + if err != nil { + return err + } + buf := make([]byte, len(data), cap(data)) + compressedData := snappy.Encode(buf, data) + + //Create the HTTP POST request to send to the endpoint + httpReq, err := http.NewRequest("POST", prwe.endpointURL.String(), bytes.NewReader(compressedData)) + if err != nil { + return err + } + + // Add necessary headers specified by: + // https://cortexmetrics.io/docs/apis/#remote-api + httpReq.Header.Add("Content-Encoding", "snappy") + httpReq.Header.Set("Content-Type", "application/x-protobuf") + httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0") + + httpReq = httpReq.WithContext(ctx) + + _, cancel := context.WithTimeout(context.Background(), prwe.client.Timeout) + defer cancel() + + httpResp, err := prwe.client.Do(httpReq) + if err != nil { + return err + } + + if httpResp.StatusCode/100 != 2 { + scanner := bufio.NewScanner(io.LimitReader(httpResp.Body, 256)) + line := "" + if scanner.Scan() { + line = scanner.Text() + } + errMsg := "server returned HTTP status " + httpResp.Status + ": " + line + return errors.New(errMsg) + } + return nil } diff --git a/exporter/prometheusremotewriteexporter/exporter_test.go b/exporter/prometheusremotewriteexporter/exporter_test.go index fcdb771b020..6a15745dd89 100644 --- a/exporter/prometheusremotewriteexporter/exporter_test.go +++ b/exporter/prometheusremotewriteexporter/exporter_test.go @@ -12,24 +12,136 @@ // See the License for the specific language governing permissions and // limitations under the License. -// Note: implementation for this class is in a separate PR package prometheusremotewriteexporter import ( "context" + "fmt" + "io/ioutil" "net/http" + "net/http/httptest" + "net/url" "sync" "testing" + proto "github.com/gogo/protobuf/proto" + "github.com/golang/snappy" + "github.com/prometheus/prometheus/prompb" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/consumer/pdata" + "go.opentelemetry.io/collector/consumer/pdatautil" "go.opentelemetry.io/collector/exporter/exporterhelper" + otlp "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/metrics/v1old" + "go.opentelemetry.io/collector/internal/dataold" + "go.opentelemetry.io/collector/internal/dataold/testdataold" ) +// Test_handleScalarMetric checks whether data points within a single scalar metric can be added to a map of +// TimeSeries correctly. +// Test cases are two data point belonging to the same TimeSeries, two data point belonging different TimeSeries, +// and nil data points case. +func Test_handleScalarMetric(t *testing.T) { + sameTs := map[string]*prompb.TimeSeries{ + // string signature of the data point is the key of the map + typeMonotonicInt64 + "-__name__-same_ts_int_points_total" + lb1Sig: getTimeSeries( + getPromLabels(label11, value11, label12, value12, nameStr, "same_ts_int_points_total"), + getSample(float64(intVal1), msTime1), + getSample(float64(intVal2), msTime1)), + } + differentTs := map[string]*prompb.TimeSeries{ + typeMonotonicDouble + "-__name__-different_ts_double_points_total" + lb1Sig: getTimeSeries( + getPromLabels(label11, value11, label12, value12, nameStr, "different_ts_double_points_total"), + getSample(floatVal1, msTime1)), + typeMonotonicDouble + "-__name__-different_ts_double_points_total" + lb2Sig: getTimeSeries( + getPromLabels(label21, value21, label22, value22, nameStr, "different_ts_double_points_total"), + getSample(floatVal2, msTime2)), + } + + tests := []struct { + name string + m *otlp.Metric + returnError bool + want map[string]*prompb.TimeSeries + }{ + { + "invalid_nil_array", + &otlp.Metric{ + MetricDescriptor: getDescriptor("invalid_nil_array", monotonicInt64Comb, validCombinations), + Int64DataPoints: nil, + DoubleDataPoints: nil, + HistogramDataPoints: nil, + SummaryDataPoints: nil, + }, + true, + map[string]*prompb.TimeSeries{}, + }, + { + "invalid_type_array", + &otlp.Metric{ + MetricDescriptor: getDescriptor("invalid_type_array", histogramComb, validCombinations), + Int64DataPoints: nil, + DoubleDataPoints: nil, + HistogramDataPoints: nil, + SummaryDataPoints: nil, + }, + true, + map[string]*prompb.TimeSeries{}, + }, + { + "same_ts_int_points", + &otlp.Metric{ + MetricDescriptor: getDescriptor("same_ts_int_points", monotonicInt64Comb, validCombinations), + Int64DataPoints: []*otlp.Int64DataPoint{ + getIntDataPoint(lbs1, intVal1, time1), + getIntDataPoint(lbs1, intVal2, time1), + }, + DoubleDataPoints: nil, + HistogramDataPoints: nil, + SummaryDataPoints: nil, + }, + false, + sameTs, + }, + { + "different_ts_double_points", + &otlp.Metric{ + MetricDescriptor: getDescriptor("different_ts_double_points", monotonicDoubleComb, validCombinations), + Int64DataPoints: nil, + DoubleDataPoints: []*otlp.DoubleDataPoint{ + getDoubleDataPoint(lbs1, floatVal1, time1), + getDoubleDataPoint(lbs2, floatVal2, time2), + }, + HistogramDataPoints: nil, + SummaryDataPoints: nil, + }, + false, + differentTs, + }, + } + // run tests + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tsMap := map[string]*prompb.TimeSeries{} + prw := &prwExporter{} + ok := prw.handleScalarMetric(tsMap, tt.m) + if tt.returnError { + assert.Error(t, ok) + return + } + assert.Exactly(t, len(tt.want), len(tsMap)) + for k, v := range tsMap { + require.NotNil(t, tt.want[k]) + assert.ElementsMatch(t, tt.want[k].Labels, v.Labels) + assert.ElementsMatch(t, tt.want[k].Samples, v.Samples) + } + }) + } +} + // Test_newPrwExporter checks that a new exporter instance with non-nil fields is initialized func Test_newPrwExporter(t *testing.T) { config := &Config{ @@ -97,18 +209,314 @@ func Test_shutdown(t *testing.T) { wg: new(sync.WaitGroup), closeChan: make(chan struct{}), } + wg := new(sync.WaitGroup) + errChan := make(chan error, 5) err := prwe.shutdown(context.Background()) - assert.NoError(t, err) + require.NoError(t, err) + errChan = make(chan error, 5) + for i := 0; i < 5; i++ { + wg.Add(1) + go func() { + defer wg.Done() + _, ok := prwe.pushMetrics(context.Background(), + pdatautil.MetricsFromOldInternalMetrics(testdataold.GenerateMetricDataEmpty())) + errChan <- ok + }() + } + wg.Wait() + close(errChan) + for ok := range errChan { + assert.Error(t, ok) + } +} + +//Test whether or not the Server receives the correct TimeSeries. +//Currently considering making this test an iterative for loop of multiple TimeSeries +//Much akin to Test_pushMetrics +func Test_export(t *testing.T) { + //First we will instantiate a dummy TimeSeries instance to pass into both the export call and compare the http request + labels := getPromLabels(label11, value11, label12, value12, label21, value21, label22, value22) + sample1 := getSample(floatVal1, msTime1) + sample2 := getSample(floatVal2, msTime2) + ts1 := getTimeSeries(labels, sample1, sample2) + handleFunc := func(w http.ResponseWriter, r *http.Request, code int) { + //The following is a handler function that reads the sent httpRequest, unmarshals, and checks if the WriteRequest + //preserves the TimeSeries data correctly + body, err := ioutil.ReadAll(r.Body) + if err != nil { + t.Fatal(err) + } + require.NotNil(t, body) + //Receives the http requests and unzip, unmarshals, and extracts TimeSeries + assert.Equal(t, "0.1.0", r.Header.Get("X-Prometheus-Remote-Write-Version")) + assert.Equal(t, "snappy", r.Header.Get("Content-Encoding")) + writeReq := &prompb.WriteRequest{} + unzipped := []byte{} + + dest, err := snappy.Decode(unzipped, body) + require.NoError(t, err) + + ok := proto.Unmarshal(dest, writeReq) + require.NoError(t, ok) + assert.EqualValues(t, 1, len(writeReq.Timeseries)) + require.NotNil(t, writeReq.GetTimeseries()) + assert.Equal(t, *ts1, writeReq.GetTimeseries()[0]) + w.WriteHeader(code) + fmt.Fprintf(w, "error message") + } + + // Create in test table format to check if different HTTP response codes or server errors + // are properly identified + tests := []struct { + name string + ts prompb.TimeSeries + serverUp bool + httpResponseCode int + returnError bool + }{ + {"success_case", + *ts1, + true, + http.StatusAccepted, + false, + }, + { + "server_no_response_case", + *ts1, + false, + http.StatusAccepted, + true, + }, { + "error_status_code_case", + *ts1, + true, + http.StatusForbidden, + true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + handleFunc(w, r, tt.httpResponseCode) + })) + defer server.Close() + serverURL, uErr := url.Parse(server.URL) + assert.NoError(t, uErr) + if !tt.serverUp { + server.Close() + } + err := runExportPipeline(t, ts1, serverURL) + if tt.returnError { + assert.Error(t, err) + return + } + assert.NoError(t, err) + }) + } +} + +func runExportPipeline(t *testing.T, ts *prompb.TimeSeries, endpoint *url.URL) error { + //First we will construct a TimeSeries array from the testutils package + testmap := make(map[string]*prompb.TimeSeries) + testmap["test"] = ts + + HTTPClient := http.DefaultClient + //after this, instantiate a CortexExporter with the current HTTP client and endpoint set to passed in endpoint + prwe, err := newPrwExporter("test", endpoint.String(), HTTPClient) + if err != nil { + return err + } + err = prwe.export(context.Background(), testmap) + return err } // Test_pushMetrics checks the number of TimeSeries received by server and the number of metrics dropped is the same as // expected func Test_pushMetrics(t *testing.T) { - prwe := &prwExporter{ - wg: new(sync.WaitGroup), - closeChan: make(chan struct{}), + // fail cases + noTempBatch := pdatautil.MetricsFromOldInternalMetrics(testdataold.GenerateMetricDataManyMetricsSameResource(10)) + invalidTypeBatch := pdatautil.MetricsFromOldInternalMetrics(testdataold.GenerateMetricDataMetricTypeInvalid()) + + invalidTemp := testdataold.GenerateMetricDataManyMetricsSameResource(10) + setTemporality(&invalidTemp, otlp.MetricDescriptor_INVALID_TEMPORALITY) + invalidTempBatch := pdatautil.MetricsFromOldInternalMetrics(invalidTemp) + + nilDescBatch := pdatautil.MetricsFromOldInternalMetrics(testdataold.GenerateMetricDataNilMetricDescriptor()) + nilBatch1 := testdataold.GenerateMetricDataManyMetricsSameResource(10) + nilBatch2 := testdataold.GenerateMetricDataManyMetricsSameResource(10) + + setTemporality(&nilBatch1, otlp.MetricDescriptor_CUMULATIVE) + setTemporality(&nilBatch2, otlp.MetricDescriptor_CUMULATIVE) + setDataPointToNil(&nilBatch1, typeMonotonicInt64) + setType(&nilBatch2, typeMonotonicDouble) + + nilIntDataPointsBatch := pdatautil.MetricsFromOldInternalMetrics(nilBatch1) + nilDoubleDataPointsBatch := pdatautil.MetricsFromOldInternalMetrics(nilBatch2) + + // Success cases: 10 counter metrics, 2 points in each. Two TimeSeries in total + batch1 := testdataold.GenerateMetricDataManyMetricsSameResource(10) + setTemporality(&batch1, otlp.MetricDescriptor_CUMULATIVE) + scalarBatch := pdatautil.MetricsFromOldInternalMetrics(batch1) + + // Partial Success cases + batch2 := testdataold.GenerateMetricDataManyMetricsSameResource(10) + setTemporality(&batch2, otlp.MetricDescriptor_CUMULATIVE) + failDesc := dataold.MetricDataToOtlp(batch2)[0].InstrumentationLibraryMetrics[0].Metrics[0].GetMetricDescriptor() + failDesc.Temporality = otlp.MetricDescriptor_INVALID_TEMPORALITY + partialBatch := pdatautil.MetricsFromOldInternalMetrics(batch2) + + checkFunc := func(t *testing.T, r *http.Request, expected int) { + body, err := ioutil.ReadAll(r.Body) + if err != nil { + t.Fatal(err) + } + + buf := make([]byte, len(body)) + dest, err := snappy.Decode(buf, body) + assert.Equal(t, "0.1.0", r.Header.Get("x-prometheus-remote-write-version")) + assert.Equal(t, "snappy", r.Header.Get("content-encoding")) + assert.NotNil(t, r.Header.Get("tenant-id")) + require.NoError(t, err) + wr := &prompb.WriteRequest{} + ok := proto.Unmarshal(dest, wr) + require.Nil(t, ok) + assert.EqualValues(t, expected, len(wr.Timeseries)) + } + + tests := []struct { + name string + md *pdata.Metrics + reqTestFunc func(t *testing.T, r *http.Request, expected int) + expectedTimeSeries int + httpResponseCode int + numDroppedTimeSeries int + returnErr bool + }{ + { + "invalid_type_case", + &invalidTypeBatch, + nil, + 0, + http.StatusAccepted, + pdatautil.MetricCount(invalidTypeBatch), + true, + }, + { + "invalid_temporality_case", + &invalidTempBatch, + nil, + 0, + http.StatusAccepted, + pdatautil.MetricCount(invalidTempBatch), + true, + }, + { + "nil_desc_case", + &nilDescBatch, + nil, + 0, + http.StatusAccepted, + pdatautil.MetricCount(nilDescBatch), + true, + }, + { + "nil_int_point_case", + &nilIntDataPointsBatch, + nil, + 0, + http.StatusAccepted, + pdatautil.MetricCount(nilIntDataPointsBatch), + true, + }, + { + "nil_double_point_case", + &nilDoubleDataPointsBatch, + nil, + 0, + http.StatusAccepted, + pdatautil.MetricCount(nilDoubleDataPointsBatch), + true, + }, + { + "no_temp_case", + &noTempBatch, + nil, + 0, + http.StatusAccepted, + pdatautil.MetricCount(noTempBatch), + true, + }, + { + "http_error_case", + &noTempBatch, + nil, + 0, + http.StatusForbidden, + pdatautil.MetricCount(noTempBatch), + true, + }, + { + "scalar_case", + &scalarBatch, + checkFunc, + 2, + http.StatusAccepted, + 0, + false, + }, + { + "partial_success_case", + &partialBatch, + checkFunc, + 2, + http.StatusAccepted, + 1, + true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if tt.reqTestFunc != nil { + tt.reqTestFunc(t, r, tt.expectedTimeSeries) + } + w.WriteHeader(tt.httpResponseCode) + })) + + defer server.Close() + + serverURL, uErr := url.Parse(server.URL) + assert.NoError(t, uErr) + + config := &Config{ + ExporterSettings: configmodels.ExporterSettings{ + TypeVal: "prometheusremotewrite", + NameVal: "prometheusremotewrite", + }, + Namespace: "", + HTTPClientSettings: confighttp.HTTPClientSettings{ + Endpoint: "http://some.url:9411/api/prom/push", + // We almost read 0 bytes, so no need to tune ReadBufferSize. + ReadBufferSize: 0, + WriteBufferSize: 512 * 1024, + }, + } + assert.NotNil(t, config) + // c, err := config.HTTPClientSettings.ToClient() + // assert.Nil(t, err) + c := http.DefaultClient + prwe, nErr := newPrwExporter(config.Namespace, serverURL.String(), c) + require.NoError(t, nErr) + numDroppedTimeSeries, err := prwe.pushMetrics(context.Background(), *tt.md) + assert.Equal(t, tt.numDroppedTimeSeries, numDroppedTimeSeries) + if tt.returnErr { + assert.Error(t, err) + return + } + assert.NoError(t, err) + }) } - _, err := prwe.pushMetrics(context.Background(), pdata.Metrics{}) - assert.NoError(t, err) } diff --git a/exporter/prometheusremotewriteexporter/helper.go b/exporter/prometheusremotewriteexporter/helper.go index fafdc3fc2f9..21a8efe9ab1 100644 --- a/exporter/prometheusremotewriteexporter/helper.go +++ b/exporter/prometheusremotewriteexporter/helper.go @@ -18,8 +18,10 @@ import ( "log" "sort" "strings" + "time" "unicode" + "github.com/pkg/errors" "github.com/prometheus/prometheus/prompb" common "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/common/v1" @@ -27,6 +29,7 @@ import ( ) const ( + nameStr = "__name__" totalStr = "total" delimeter = "_" keyStr = "key" @@ -179,6 +182,28 @@ func getPromMetricName(desc *otlp.MetricDescriptor, ns string) string { return sanitize(b.String()) } +// Simple helper function that takes the map +// and creates a WriteRequest from the struct -- can move to the helper.go file +func wrapTimeSeries(tsMap map[string]*prompb.TimeSeries) (*prompb.WriteRequest, error) { + if len(tsMap) == 0 { + return nil, errors.Errorf("invalid tsMap: cannot be empty map") + } + TsArray := []prompb.TimeSeries{} + for _, v := range tsMap { + TsArray = append(TsArray, *v) + } + wrapped := prompb.WriteRequest{ + Timeseries: TsArray, + //Other parameters of the WriteRequest are unnecessary for our Export + } + return &wrapped, nil +} + +// convertTimeStamp converts OTLP timestamp in ns to timestamp in ms +func convertTimeStamp(timestamp uint64) int64 { + return int64(timestamp / uint64(int64(time.Millisecond)/int64(time.Nanosecond))) +} + // copied from prometheus-go-metric-exporter // sanitize replaces non-alphanumeric characters with underscores in s. func sanitize(s string) string { diff --git a/exporter/prometheusremotewriteexporter/testutil_test.go b/exporter/prometheusremotewriteexporter/testutil_test.go index d35afe54c1d..be18b56553c 100644 --- a/exporter/prometheusremotewriteexporter/testutil_test.go +++ b/exporter/prometheusremotewriteexporter/testutil_test.go @@ -21,6 +21,7 @@ import ( commonpb "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/common/v1" otlp "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/metrics/v1old" + "go.opentelemetry.io/collector/internal/dataold" ) type combination struct { @@ -34,9 +35,11 @@ var ( msTime1 = int64(time1 / uint64(int64(time.Millisecond)/int64(time.Nanosecond))) msTime2 = int64(time2 / uint64(int64(time.Millisecond)/int64(time.Nanosecond))) - typeInt64 = "INT64" - - typeHistogram = "HISTOGRAM" + typeInt64 = "INT64" + typeMonotonicInt64 = "MONOTONIC_INT64" + typeMonotonicDouble = "MONOTONIC_DOUBLE" + typeHistogram = "HISTOGRAM" + typeSummary = "SUMMARY" label11 = "test_label11" value11 = "test_value11" @@ -53,10 +56,13 @@ var ( dirty1 = "%" dirty2 = "?" - intVal1 int64 = 1 - intVal2 int64 = 2 + intVal1 int64 = 1 + intVal2 int64 = 2 + floatVal1 = 1.0 + floatVal2 = 2.0 lbs1 = getLabels(label11, value11, label12, value12) + lbs2 = getLabels(label21, value21, label22, value22) lbs1Dirty = getLabels(label11+dirty1, value11, dirty2+label12, value12) promLbs1 = getPromLabels(label11, value11, label12, value12) @@ -67,10 +73,11 @@ var ( ns1 = "test_ns" name1 = "valid_single_int_point" - monotonicInt64Comb = 0 - histogramComb = 2 - summaryComb = 3 - validCombinations = []combination{ + monotonicInt64Comb = 0 + monotonicDoubleComb = 1 + histogramComb = 2 + summaryComb = 3 + validCombinations = []combination{ {otlp.MetricDescriptor_MONOTONIC_INT64, otlp.MetricDescriptor_CUMULATIVE}, {otlp.MetricDescriptor_MONOTONIC_DOUBLE, otlp.MetricDescriptor_CUMULATIVE}, {otlp.MetricDescriptor_HISTOGRAM, otlp.MetricDescriptor_CUMULATIVE}, @@ -131,6 +138,24 @@ func getDescriptor(name string, i int, comb []combination) *otlp.MetricDescripto } } +func getIntDataPoint(labels []*commonpb.StringKeyValue, value int64, ts uint64) *otlp.Int64DataPoint { + return &otlp.Int64DataPoint{ + Labels: labels, + StartTimeUnixNano: 0, + TimeUnixNano: ts, + Value: value, + } +} + +func getDoubleDataPoint(labels []*commonpb.StringKeyValue, value float64, ts uint64) *otlp.DoubleDataPoint { + return &otlp.DoubleDataPoint{ + Labels: labels, + StartTimeUnixNano: 0, + TimeUnixNano: ts, + Value: value, + } +} + // Prometheus TimeSeries func getPromLabels(lbs ...string) []prompb.Label { pbLbs := prompb.Labels{ @@ -162,3 +187,54 @@ func getTimeSeries(labels []prompb.Label, samples ...prompb.Sample) *prompb.Time Samples: samples, } } + +//setCumulative is for creating the dataold.MetricData to test with +func setTemporality(metricsData *dataold.MetricData, temp otlp.MetricDescriptor_Temporality) { + for _, r := range dataold.MetricDataToOtlp(*metricsData) { + for _, instMetrics := range r.InstrumentationLibraryMetrics { + for _, m := range instMetrics.Metrics { + m.MetricDescriptor.Temporality = temp + } + } + } +} + +//setDataPointToNil is for creating the dataold.MetricData to test with +func setDataPointToNil(metricsData *dataold.MetricData, dataField string) { + for _, r := range dataold.MetricDataToOtlp(*metricsData) { + for _, instMetrics := range r.InstrumentationLibraryMetrics { + for _, m := range instMetrics.Metrics { + switch dataField { + case typeMonotonicInt64: + m.Int64DataPoints = nil + case typeMonotonicDouble: + m.DoubleDataPoints = nil + case typeHistogram: + m.HistogramDataPoints = nil + case typeSummary: + m.SummaryDataPoints = nil + } + } + } + } +} + +//setType is for creating the dataold.MetricData to test with +func setType(metricsData *dataold.MetricData, dataField string) { + for _, r := range dataold.MetricDataToOtlp(*metricsData) { + for _, instMetrics := range r.InstrumentationLibraryMetrics { + for _, m := range instMetrics.Metrics { + switch dataField { + case typeMonotonicInt64: + m.GetMetricDescriptor().Type = otlp.MetricDescriptor_MONOTONIC_INT64 + case typeMonotonicDouble: + m.GetMetricDescriptor().Type = otlp.MetricDescriptor_MONOTONIC_DOUBLE + case typeHistogram: + m.GetMetricDescriptor().Type = otlp.MetricDescriptor_HISTOGRAM + case typeSummary: + m.GetMetricDescriptor().Type = otlp.MetricDescriptor_SUMMARY + } + } + } + } +} diff --git a/go.mod b/go.mod index cb94179acdd..36ad210e8e0 100644 --- a/go.mod +++ b/go.mod @@ -17,6 +17,7 @@ require ( github.com/gogo/protobuf v1.3.1 github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e github.com/golang/protobuf v1.4.2 + github.com/golang/snappy v0.0.1 github.com/golangci/golangci-lint v1.30.0 github.com/google/addlicense v0.0.0-20200622132530-df58acafd6d5 github.com/google/go-cmp v0.5.1