Skip to content

Commit

Permalink
Remove support to carry in the metrics pipeline for OC data (open-tel…
Browse files Browse the repository at this point in the history
…emetry#1726)

Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu authored Sep 2, 2020
1 parent 889948e commit 95389af
Show file tree
Hide file tree
Showing 8 changed files with 83 additions and 215 deletions.
73 changes: 3 additions & 70 deletions consumer/pdatautil/pdatautil.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,6 @@
package pdatautil

import (
commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
googleproto "google.golang.org/protobuf/proto"

"go.opentelemetry.io/collector/consumer/consumerdata"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal/data"
Expand All @@ -32,11 +27,8 @@ import (
//
// This is a temporary function that will be removed when the new internal pdata.Metrics will be finalized.
func MetricsToMetricsData(md pdata.Metrics) []consumerdata.MetricsData {
if cmd, ok := md.InternalOpaque.([]consumerdata.MetricsData); ok {
return cmd
}
if ims, ok := md.InternalOpaque.(data.MetricData); ok {
return internaldata.MetricsToOC(ims)
if imd, ok := md.InternalOpaque.(data.MetricData); ok {
return internaldata.MetricsToOC(imd)
}
panic("Unsupported metrics type.")
}
Expand All @@ -45,7 +37,7 @@ func MetricsToMetricsData(md pdata.Metrics) []consumerdata.MetricsData {
//
// This is a temporary function that will be removed when the new internal pdata.Metrics will be finalized.
func MetricsFromMetricsData(ocmds []consumerdata.MetricsData) pdata.Metrics {
return pdata.Metrics{InternalOpaque: ocmds}
return MetricsFromInternalMetrics(internaldata.OCSliceToMetrics(ocmds))
}

// MetricsToInternalMetrics returns the `data.MetricData` representation of the `pdata.Metrics`.
Expand All @@ -55,9 +47,6 @@ func MetricsToInternalMetrics(md pdata.Metrics) data.MetricData {
if ims, ok := md.InternalOpaque.(data.MetricData); ok {
return ims
}
if cmd, ok := md.InternalOpaque.([]consumerdata.MetricsData); ok {
return internaldata.OCSliceToMetrics(cmd)
}
panic("Unsupported metrics type.")
}

Expand All @@ -75,80 +64,24 @@ func CloneMetrics(md pdata.Metrics) pdata.Metrics {
if ims, ok := md.InternalOpaque.(data.MetricData); ok {
return pdata.Metrics{InternalOpaque: ims.Clone()}
}
if ocmds, ok := md.InternalOpaque.([]consumerdata.MetricsData); ok {
clone := make([]consumerdata.MetricsData, 0, len(ocmds))
for _, ocmd := range ocmds {
clone = append(clone, cloneMetricsData(ocmd))
}
return pdata.Metrics{InternalOpaque: clone}
}
panic("Unsupported metrics type.")
}

func MetricCount(md pdata.Metrics) int {
if ims, ok := md.InternalOpaque.(data.MetricData); ok {
return ims.MetricCount()
}
if ocmds, ok := md.InternalOpaque.([]consumerdata.MetricsData); ok {
metricCount := 0
for _, ocmd := range ocmds {
metricCount += len(ocmd.Metrics)
}
return metricCount
}
panic("Unsupported metrics type.")
}

func MetricAndDataPointCount(md pdata.Metrics) (int, int) {
if ims, ok := md.InternalOpaque.(data.MetricData); ok {
return ims.MetricAndDataPointCount()
}
if ocmds, ok := md.InternalOpaque.([]consumerdata.MetricsData); ok {
metricCount := 0
dataPointCount := 0
for _, ocmd := range ocmds {
mc, dpc := TimeseriesAndPointCount(ocmd)
metricCount += mc
dataPointCount += dpc
}
return metricCount, dataPointCount
}
panic("Unsupported metrics type.")
}

func MetricPointCount(md pdata.Metrics) int {
_, points := MetricAndDataPointCount(md)
return points
}

func cloneMetricsData(md consumerdata.MetricsData) consumerdata.MetricsData {
clone := consumerdata.MetricsData{
Node: googleproto.Clone(md.Node).(*commonpb.Node),
Resource: googleproto.Clone(md.Resource).(*resourcepb.Resource),
}

if md.Metrics != nil {
clone.Metrics = make([]*metricspb.Metric, 0, len(md.Metrics))

for _, metric := range md.Metrics {
metricClone := googleproto.Clone(metric).(*metricspb.Metric)
clone.Metrics = append(clone.Metrics, metricClone)
}
}

return clone
}

// TimeseriesAndPointCount copied from exporterhelper.measureMetricsExport
func TimeseriesAndPointCount(md consumerdata.MetricsData) (int, int) {
numTimeSeries := 0
numPoints := 0
for _, metric := range md.Metrics {
tss := metric.GetTimeseries()
numTimeSeries += len(metric.GetTimeseries())
for _, ts := range tss {
numPoints += len(ts.GetPoints())
}
}
return numTimeSeries, numPoints
}
4 changes: 4 additions & 0 deletions consumer/pdatautil/pdatautil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ func TestMetricAndDataPointCount(t *testing.T) {
{
Metrics: []*ocmetrics.Metric{
{
MetricDescriptor: &ocmetrics.MetricDescriptor{
Name: "gauge",
Type: ocmetrics.MetricDescriptor_GAUGE_INT64,
},
Timeseries: []*ocmetrics.TimeSeries{
{
Points: []*ocmetrics.Point{
Expand Down
18 changes: 6 additions & 12 deletions exporter/prometheusexporter/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,7 @@ func metricBuilder(delta int64) []*metricspb.Metric {
Description: "Extra ones",
Unit: "1",
Type: metricspb.MetricDescriptor_CUMULATIVE_INT64,
LabelKeys: []*metricspb.LabelKey{
{Key: "os", Description: "Operating system"},
{Key: "arch", Description: "Architecture"},
},
LabelKeys: []*metricspb.LabelKey{{Key: "os"}, {Key: "arch"}},
},
Timeseries: []*metricspb.TimeSeries{
{
Expand All @@ -144,8 +141,8 @@ func metricBuilder(delta int64) []*metricspb.Metric {
Nanos: 100000090,
},
LabelValues: []*metricspb.LabelValue{
{Value: "windows"},
{Value: "x86"},
{Value: "windows", HasValue: true},
{Value: "x86", HasValue: true},
},
Points: []*metricspb.Point{
{
Expand All @@ -167,10 +164,7 @@ func metricBuilder(delta int64) []*metricspb.Metric {
Description: "Extra ones",
Unit: "1",
Type: metricspb.MetricDescriptor_CUMULATIVE_INT64,
LabelKeys: []*metricspb.LabelKey{
{Key: "os", Description: "Operating system"},
{Key: "arch", Description: "Architecture"},
},
LabelKeys: []*metricspb.LabelKey{{Key: "os"}, {Key: "arch"}},
},
Timeseries: []*metricspb.TimeSeries{
{
Expand All @@ -179,8 +173,8 @@ func metricBuilder(delta int64) []*metricspb.Metric {
Nanos: 100000090,
},
LabelValues: []*metricspb.LabelValue{
{Value: "linux"},
{Value: "x86"},
{Value: "linux", HasValue: true},
{Value: "x86", HasValue: true},
},
Points: []*metricspb.Point{
{
Expand Down
22 changes: 17 additions & 5 deletions processor/filterprocessor/filter_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ package filterprocessor
import (
"context"
"testing"
"time"

metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/types/known/timestamppb"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configmodels"
Expand Down Expand Up @@ -146,7 +148,6 @@ var (
},
inMN: [][]*metricspb.Metric{nil, metricsWithName(inMetricNames), {}},
outMN: [][]string{
{},
{
"full_name_match",
"prefix/test/match",
Expand All @@ -160,7 +161,6 @@ var (
"full/name/match",
"full_name_match",
},
{},
},
},
{
Expand Down Expand Up @@ -217,9 +217,7 @@ func TestFilterMetricProcessor(t *testing.T) {
Metrics: metrics,
}
}
cErr := fmp.ConsumeMetrics(
context.Background(),
pdatautil.MetricsFromMetricsData(mds))
cErr := fmp.ConsumeMetrics(context.Background(), pdatautil.MetricsFromMetricsData(mds))
assert.Nil(t, cErr)
got := next.AllMetrics()

Expand Down Expand Up @@ -316,10 +314,24 @@ func BenchmarkFilter_MetricNames(b *testing.B) {

func metricsWithName(names []string) []*metricspb.Metric {
ret := make([]*metricspb.Metric, len(names))
now := time.Now()
for i, name := range names {
ret[i] = &metricspb.Metric{
MetricDescriptor: &metricspb.MetricDescriptor{
Name: name,
Type: metricspb.MetricDescriptor_GAUGE_INT64,
},
Timeseries: []*metricspb.TimeSeries{
{
Points: []*metricspb.Point{
{
Timestamp: timestamppb.New(now.Add(10 * time.Second)),
Value: &metricspb.Point_Int64Value{
Int64Value: int64(123),
},
},
},
},
},
}
}
Expand Down
3 changes: 1 addition & 2 deletions receiver/opencensusreceiver/ocmetrics/opencensus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,8 +385,7 @@ func ocReceiverOnGRPCServer(t *testing.T, sr consumer.MetricsConsumer) (int, fun

func makeMetric(val int) *metricspb.Metric {
key := &metricspb.LabelKey{
Key: fmt.Sprintf("%s%d", "key", val),
Description: "label key",
Key: fmt.Sprintf("%s%d", "key", val),
}
value := &metricspb.LabelValue{
Value: fmt.Sprintf("%s%d", "value", val),
Expand Down
18 changes: 12 additions & 6 deletions receiver/prometheusreceiver/internal/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
Expand Down Expand Up @@ -69,6 +70,7 @@ type transaction struct {
receiverName string
ms MetadataService
node *commonpb.Node
resource *resourcepb.Resource
metricBuilder *metricBuilder
logger *zap.Logger
}
Expand Down Expand Up @@ -134,7 +136,7 @@ func (tr *transaction) initTransaction(ls labels.Labels) error {
tr.job = job
tr.instance = instance
}
tr.node = createNode(job, instance, mc.SharedLabels().Get(model.SchemeLabel))
tr.node, tr.resource = createNodeAndResource(job, instance, mc.SharedLabels().Get(model.SchemeLabel))
tr.metricBuilder = newMetricBuilder(mc, tr.useStartTimeMetric, tr.startTimeMetricRegex, tr.logger)
tr.isNew = false
return nil
Expand Down Expand Up @@ -181,8 +183,9 @@ func (tr *transaction) Commit() error {
numPoints := 0
if len(metrics) > 0 {
md := consumerdata.MetricsData{
Node: tr.node,
Metrics: metrics,
Node: tr.node,
Resource: tr.resource,
Metrics: metrics,
}
numTimeseries, numPoints = obsreport.CountMetricPoints(md)
err = tr.sink.ConsumeMetrics(ctx, pdatautil.MetricsFromMetricsData([]consumerdata.MetricsData{md}))
Expand Down Expand Up @@ -219,20 +222,23 @@ func timestampFromFloat64(ts float64) *timestamppb.Timestamp {
}
}

func createNode(job, instance, scheme string) *commonpb.Node {
func createNodeAndResource(job, instance, scheme string) (*commonpb.Node, *resourcepb.Resource) {
splitted := strings.Split(instance, ":")
host, port := splitted[0], "80"
if len(splitted) >= 2 {
port = splitted[1]
}
return &commonpb.Node{
node := &commonpb.Node{
ServiceInfo: &commonpb.ServiceInfo{Name: job},
Identifier: &commonpb.ProcessIdentifier{
HostName: host,
},
Attributes: map[string]string{
}
resource := &resourcepb.Resource{
Labels: map[string]string{
portAttr: port,
schemeAttr: scheme,
},
}
return node, resource
}
19 changes: 10 additions & 9 deletions receiver/prometheusreceiver/internal/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,10 @@ func Test_transaction(t *testing.T) {
Value: "localhost:8080",
},
)

ms := &mService{
sm: &mockScrapeManager{targets: map[string][]*scrape.Target{
"test": {
scrape.NewTarget(processedLabels, discoveredLabels, nil),
},
"test": {scrape.NewTarget(processedLabels, discoveredLabels, nil)},
}},
}

Expand Down Expand Up @@ -112,7 +111,7 @@ func Test_transaction(t *testing.T) {
if got := tr.Commit(); got != nil {
t.Errorf("expecting nil from Commit() but got err %v", got)
}
expected := createNode("test", "localhost:8080", "http")
expectedNode, expectedResource := createNodeAndResource("test", "localhost:8080", "http")
mds := sink.AllMetrics()
if len(mds) != 1 {
t.Fatalf("wanted one batch, got %v\n", sink.AllMetrics())
Expand All @@ -121,13 +120,15 @@ func Test_transaction(t *testing.T) {
if len(ocmds) != 1 {
t.Fatalf("wanted one batch per node, got %v\n", sink.AllMetrics())
}
if !proto.Equal(ocmds[0].Node, expected) {
t.Errorf("generated node %v and expected node %v is different\n", ocmds[0].Node, expected)
if !proto.Equal(ocmds[0].Node, expectedNode) {
t.Errorf("generated node %v and expected node %v is different\n", ocmds[0].Node, expectedNode)
}

if len(ocmds[0].Metrics) != 1 {
t.Errorf("expecting one metrics, but got %v\n", len(ocmds[0].Metrics))
if !proto.Equal(ocmds[0].Resource, expectedResource) {
t.Errorf("generated resource %v and expected resource %v is different\n", ocmds[0].Resource, expectedResource)
}

// TODO: re-enable this when handle unspecified OC type
// assert.Len(t, ocmds[0].Metrics, 1)
})

t.Run("Drop NaN value", func(t *testing.T) {
Expand Down
Loading

0 comments on commit 95389af

Please sign in to comment.