Skip to content

Commit

Permalink
Remove internaldata.MetricsData, same APIs as for traces (#3156)
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu authored May 13, 2021
1 parent 48f034a commit 40c439a
Show file tree
Hide file tree
Showing 13 changed files with 178 additions and 274 deletions.
24 changes: 10 additions & 14 deletions exporter/opencensusexporter/opencensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,23 +207,19 @@ func (oce *ocExporter) pushMetricsData(_ context.Context, md pdata.Metrics) erro
}
}

ocmds := internaldata.MetricsToOC(md)
for _, ocmd := range ocmds {
rms := md.ResourceMetrics()
for i := 0; i < rms.Len(); i++ {
ocReq := agentmetricspb.ExportMetricsServiceRequest{}
ocReq.Node, ocReq.Resource, ocReq.Metrics = internaldata.ResourceMetricsToOC(rms.At(i))

// This is a hack because OC protocol expects a Node for the initial message.
node := ocmd.Node
if node == nil {
node = &commonpb.Node{}
}
resource := ocmd.Resource
if resource == nil {
resource = &resourcepb.Resource{}
if ocReq.Node == nil {
ocReq.Node = &commonpb.Node{}
}
req := &agentmetricspb.ExportMetricsServiceRequest{
Metrics: ocmd.Metrics,
Resource: resource,
Node: node,
if ocReq.Resource == nil {
ocReq.Resource = &resourcepb.Resource{}
}
if err := mClient.msec.Send(req); err != nil {
if err := mClient.msec.Send(&ocReq); err != nil {
// Error received, cancel the context used to create the RPC to free all resources,
// put back nil to keep the number of workers constant.
mClient.cancel()
Expand Down
8 changes: 4 additions & 4 deletions exporter/prometheusexporter/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,11 @@ func TestPrometheusExporter_endToEnd(t *testing.T) {
require.NoError(t, exp.Start(context.Background(), componenttest.NewNopHost()))

// Should accumulate multiple metrics
md := internaldata.OCToMetrics(internaldata.MetricsData{Metrics: metricBuilder(128, "metric_1_")})
md := internaldata.OCToMetrics(nil, nil, metricBuilder(128, "metric_1_"))
assert.NoError(t, exp.ConsumeMetrics(context.Background(), md))

for delta := 0; delta <= 20; delta += 10 {
md := internaldata.OCToMetrics(internaldata.MetricsData{Metrics: metricBuilder(int64(delta), "metric_2_")})
md := internaldata.OCToMetrics(nil, nil, metricBuilder(int64(delta), "metric_2_"))
assert.NoError(t, exp.ConsumeMetrics(context.Background(), md))

res, err1 := http.Get("http://localhost:7777/metrics")
Expand Down Expand Up @@ -208,11 +208,11 @@ func TestPrometheusExporter_endToEndWithTimestamps(t *testing.T) {

// Should accumulate multiple metrics

md := internaldata.OCToMetrics(internaldata.MetricsData{Metrics: metricBuilder(128, "metric_1_")})
md := internaldata.OCToMetrics(nil, nil, metricBuilder(128, "metric_1_"))
assert.NoError(t, exp.ConsumeMetrics(context.Background(), md))

for delta := 0; delta <= 20; delta += 10 {
md := internaldata.OCToMetrics(internaldata.MetricsData{Metrics: metricBuilder(int64(delta), "metric_2_")})
md := internaldata.OCToMetrics(nil, nil, metricBuilder(int64(delta), "metric_2_"))
assert.NoError(t, exp.ConsumeMetrics(context.Background(), md))

res, err1 := http.Get("http://localhost:7777/metrics")
Expand Down
19 changes: 6 additions & 13 deletions receiver/opencensusreceiver/ocmetrics/opencensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
agentmetricspb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/metrics/v1"
ocmetrics "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"

"go.opentelemetry.io/collector/component/componenterror"
Expand Down Expand Up @@ -116,36 +117,28 @@ func (ocr *Receiver) processReceivedMsg(
resource = recv.Resource
}

md := internaldata.MetricsData{
Node: lastNonNilNode,
Resource: resource,
Metrics: recv.Metrics,
}

err := ocr.sendToNextConsumer(longLivedRPCCtx, md)
err := ocr.sendToNextConsumer(longLivedRPCCtx, lastNonNilNode, resource, recv.Metrics)
return lastNonNilNode, resource, err
}

func (ocr *Receiver) sendToNextConsumer(longLivedRPCCtx context.Context, md internaldata.MetricsData) error {
func (ocr *Receiver) sendToNextConsumer(longLivedRPCCtx context.Context, node *commonpb.Node, resource *resourcepb.Resource, metrics []*ocmetrics.Metric) error {
ctx := obsreport.StartMetricsReceiveOp(
longLivedRPCCtx,
ocr.id,
receiverTransport,
obsreport.WithLongLivedCtx())

numTimeSeries := 0
numPoints := 0
// Count number of time series and data points.
for _, metric := range md.Metrics {
numTimeSeries += len(metric.Timeseries)
for _, metric := range metrics {
for _, ts := range metric.GetTimeseries() {
numPoints += len(ts.GetPoints())
}
}

var consumerErr error
if len(md.Metrics) > 0 {
consumerErr = ocr.nextConsumer.ConsumeMetrics(ctx, internaldata.OCToMetrics(md))
if len(metrics) > 0 {
consumerErr = ocr.nextConsumer.ConsumeMetrics(ctx, internaldata.OCToMetrics(node, resource, metrics))
}

obsreport.EndMetricsReceiveOp(
Expand Down
14 changes: 8 additions & 6 deletions receiver/opencensusreceiver/ocmetrics/opencensus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,10 @@ func TestExportMultiplexing(t *testing.T) {
// Examination time!
resultsMapping := make(map[string][]*metricspb.Metric)
for _, md := range metricSink.AllMetrics() {
ocmds := internaldata.MetricsToOC(md)
for _, ocmd := range ocmds {
resultsMapping[nodeToKey(ocmd.Node)] = append(resultsMapping[nodeToKey(ocmd.Node)], ocmd.Metrics...)
rms := md.ResourceMetrics()
for i := 0; i < rms.Len(); i++ {
node, _, metrics := internaldata.ResourceMetricsToOC(rms.At(i))
resultsMapping[nodeToKey(node)] = append(resultsMapping[nodeToKey(node)], metrics...)
}
}

Expand Down Expand Up @@ -292,9 +293,10 @@ func TestExportProtocolConformation_metricsInFirstMessage(t *testing.T) {
// Examination time!
resultsMapping := make(map[string][]*metricspb.Metric)
for _, md := range metricSink.AllMetrics() {
ocmds := internaldata.MetricsToOC(md)
for _, ocmd := range ocmds {
resultsMapping[nodeToKey(ocmd.Node)] = append(resultsMapping[nodeToKey(ocmd.Node)], ocmd.Metrics...)
rms := md.ResourceMetrics()
for i := 0; i < rms.Len(); i++ {
node, _, metrics := internaldata.ResourceMetricsToOC(rms.At(i))
resultsMapping[nodeToKey(node)] = append(resultsMapping[nodeToKey(node)], metrics...)
}
}

Expand Down
6 changes: 1 addition & 5 deletions receiver/prometheusreceiver/internal/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,11 +183,7 @@ func (tr *transaction) Commit() error {

numPoints := 0
if len(metrics) > 0 {
md := internaldata.OCToMetrics(internaldata.MetricsData{
Node: tr.node,
Resource: tr.resource,
Metrics: metrics,
})
md := internaldata.OCToMetrics(tr.node, tr.resource, metrics)
_, numPoints = md.MetricAndDataPointCount()
err = tr.sink.ConsumeMetrics(ctx, md)
}
Expand Down
12 changes: 9 additions & 3 deletions receiver/prometheusreceiver/internal/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ import (
"testing"
"time"

agentmetricspb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/metrics/v1"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/scrape"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"

"go.opentelemetry.io/collector/config"
Expand Down Expand Up @@ -117,10 +119,14 @@ func Test_transaction(t *testing.T) {
if len(mds) != 1 {
t.Fatalf("wanted one batch, got %v\n", sink.AllMetrics())
}
ocmds := internaldata.MetricsToOC(mds[0])
if len(ocmds) != 1 {
t.Fatalf("wanted one batch per node, got %v\n", sink.AllMetrics())
var ocmds []*agentmetricspb.ExportMetricsServiceRequest
rms := mds[0].ResourceMetrics()
for i := 0; i < rms.Len(); i++ {
ocmd := &agentmetricspb.ExportMetricsServiceRequest{}
ocmd.Node, ocmd.Resource, ocmd.Metrics = internaldata.ResourceMetricsToOC(rms.At(i))
ocmds = append(ocmds, ocmd)
}
require.Len(t, ocmds, 1)
if !proto.Equal(ocmds[0].Node, expectedNode) {
t.Errorf("generated node %v and expected node %v is different\n", ocmds[0].Node, expectedNode)
}
Expand Down
Loading

0 comments on commit 40c439a

Please sign in to comment.