Skip to content

Commit

Permalink
receiver/prometheus: roundtrip Prometheus->Pdata direct conversion
Browse files Browse the repository at this point in the history
Wire up and use the direct Prometheus->Pdata conversion end to end.
With this change the receiver will no longer need OpenCensus.

This change will involve more follow-ups that just migrate over the tests,
because we don't want a super bloated/massive PR.

Fixes open-telemetry#3691
Depends on PR open-telemetry#3694
Depends on PR open-telemetry#3695
  • Loading branch information
odeke-em committed Jul 30, 2021
1 parent 3be6185 commit 9f099ab
Show file tree
Hide file tree
Showing 12 changed files with 967 additions and 184 deletions.
155 changes: 0 additions & 155 deletions receiver/prometheusreceiver/internal/metricsbuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,161 +194,6 @@ func Test_startTimeMetricMatch(t *testing.T) {
runBuilderStartTimeTests(t, nomatchTests, "^(.+_)*process_start_time_seconds$", defaultBuilderStartTime)
}

func Test_metricBuilder_counters(t *testing.T) {
tests := []buildTestData{
{
name: "single-item",
inputs: []*testScrapedPage{
{
pts: []*testDataPoint{
createDataPoint("counter_test", 100, "foo", "bar"),
},
},
},
wants: [][]*metricspb.Metric{
{
{
MetricDescriptor: &metricspb.MetricDescriptor{
Name: "counter_test",
Type: metricspb.MetricDescriptor_CUMULATIVE_DOUBLE,
LabelKeys: []*metricspb.LabelKey{{Key: "foo"}}},
Timeseries: []*metricspb.TimeSeries{
{
StartTimestamp: timestampFromMs(startTs),
LabelValues: []*metricspb.LabelValue{{Value: "bar", HasValue: true}},
Points: []*metricspb.Point{
{Timestamp: timestampFromMs(startTs), Value: &metricspb.Point_DoubleValue{DoubleValue: 100.0}},
},
},
},
},
},
},
},
{
name: "two-items",
inputs: []*testScrapedPage{
{
pts: []*testDataPoint{
createDataPoint("counter_test", 150, "foo", "bar"),
createDataPoint("counter_test", 25, "foo", "other"),
},
},
},
wants: [][]*metricspb.Metric{
{
{
MetricDescriptor: &metricspb.MetricDescriptor{
Name: "counter_test",
Type: metricspb.MetricDescriptor_CUMULATIVE_DOUBLE,
LabelKeys: []*metricspb.LabelKey{{Key: "foo"}}},
Timeseries: []*metricspb.TimeSeries{
{
StartTimestamp: timestampFromMs(startTs),
LabelValues: []*metricspb.LabelValue{{Value: "bar", HasValue: true}},
Points: []*metricspb.Point{
{Timestamp: timestampFromMs(startTs), Value: &metricspb.Point_DoubleValue{DoubleValue: 150.0}},
},
},
{
StartTimestamp: timestampFromMs(startTs),
LabelValues: []*metricspb.LabelValue{{Value: "other", HasValue: true}},
Points: []*metricspb.Point{
{Timestamp: timestampFromMs(startTs), Value: &metricspb.Point_DoubleValue{DoubleValue: 25.0}},
},
},
},
},
},
},
},
{
name: "two-metrics",
inputs: []*testScrapedPage{
{
pts: []*testDataPoint{
createDataPoint("counter_test", 150, "foo", "bar"),
createDataPoint("counter_test", 25, "foo", "other"),
createDataPoint("counter_test2", 100, "foo", "bar"),
},
},
},
wants: [][]*metricspb.Metric{
{
{
MetricDescriptor: &metricspb.MetricDescriptor{
Name: "counter_test",
Type: metricspb.MetricDescriptor_CUMULATIVE_DOUBLE,
LabelKeys: []*metricspb.LabelKey{{Key: "foo"}}},
Timeseries: []*metricspb.TimeSeries{
{
StartTimestamp: timestampFromMs(startTs),
LabelValues: []*metricspb.LabelValue{{Value: "bar", HasValue: true}},
Points: []*metricspb.Point{
{Timestamp: timestampFromMs(startTs), Value: &metricspb.Point_DoubleValue{DoubleValue: 150.0}},
},
},
{
StartTimestamp: timestampFromMs(startTs),
LabelValues: []*metricspb.LabelValue{{Value: "other", HasValue: true}},
Points: []*metricspb.Point{
{Timestamp: timestampFromMs(startTs), Value: &metricspb.Point_DoubleValue{DoubleValue: 25.0}},
},
},
},
},
{
MetricDescriptor: &metricspb.MetricDescriptor{
Name: "counter_test2",
Type: metricspb.MetricDescriptor_CUMULATIVE_DOUBLE,
LabelKeys: []*metricspb.LabelKey{{Key: "foo"}}},
Timeseries: []*metricspb.TimeSeries{
{
StartTimestamp: timestampFromMs(startTs),
LabelValues: []*metricspb.LabelValue{{Value: "bar", HasValue: true}},
Points: []*metricspb.Point{
{Timestamp: timestampFromMs(startTs), Value: &metricspb.Point_DoubleValue{DoubleValue: 100.0}},
},
},
},
},
},
},
},
{
name: "metrics-with-poor-names",
inputs: []*testScrapedPage{
{
pts: []*testDataPoint{
createDataPoint("poor_name_count", 100, "foo", "bar"),
},
},
},
wants: [][]*metricspb.Metric{
{
{
MetricDescriptor: &metricspb.MetricDescriptor{
Name: "poor_name_count",
Type: metricspb.MetricDescriptor_CUMULATIVE_DOUBLE,
LabelKeys: []*metricspb.LabelKey{{Key: "foo"}}},
Timeseries: []*metricspb.TimeSeries{
{
StartTimestamp: timestampFromMs(startTs),
LabelValues: []*metricspb.LabelValue{{Value: "bar", HasValue: true}},
Points: []*metricspb.Point{
{Timestamp: timestampFromMs(startTs), Value: &metricspb.Point_DoubleValue{DoubleValue: 100.0}},
},
},
},
},
},
},
},
}

runBuilderTests(t, tests)
}

func Test_metricBuilder_gauges(t *testing.T) {
tests := []buildTestData{
{
Expand Down
26 changes: 14 additions & 12 deletions receiver/prometheusreceiver/internal/ocastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type OcaStore struct {
running int32 // access atomically
sink consumer.Metrics
mc *metadataService
jobsMap *JobsMap
jobsMap *JobsMapPdata
useStartTimeMetric bool
startTimeMetricRegex string
receiverID config.ComponentID
Expand All @@ -60,7 +60,7 @@ func NewOcaStore(
ctx context.Context,
sink consumer.Metrics,
logger *zap.Logger,
jobsMap *JobsMap,
jobsMap *JobsMapPdata,
useStartTimeMetric bool,
startTimeMetricRegex string,
receiverID config.ComponentID,
Expand Down Expand Up @@ -93,17 +93,19 @@ func (o *OcaStore) Appender(context.Context) storage.Appender {
// Firstly prepare the stalenessStore for a new scrape cyle.
o.stalenessStore.refresh()

return newTransaction(
return newTransactionPdata(
o.ctx,
o.jobsMap,
o.useStartTimeMetric,
o.startTimeMetricRegex,
o.receiverID,
o.mc,
o.sink,
o.externalLabels,
o.logger,
o.stalenessStore,
&txConfig{
jobsMap: o.jobsMap,
useStartTimeMetric: o.useStartTimeMetric,
startTimeMetricRegex: o.startTimeMetricRegex,
receiverID: o.receiverID,
ms: o.mc,
sink: o.sink,
externalLabels: o.externalLabels,
logger: o.logger,
stalenessStore: o.stalenessStore,
},
)
} else if state == runningStateInit {
panic("ScrapeManager is not set")
Expand Down
16 changes: 15 additions & 1 deletion receiver/prometheusreceiver/internal/otlp_metricfamily.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,9 @@ func (mf *metricFamilyPdata) getGroups() []*metricGroupPdata {

func (mf *metricFamilyPdata) ToMetricPdata(metrics *pdata.MetricSlice) (int, int) {
metric := pdata.NewMetric()
metric.SetDataType(mf.mtype)
metric.SetName(mf.name)

pointCount := 0

switch mf.mtype {
Expand All @@ -297,7 +300,18 @@ func (mf *metricFamilyPdata) ToMetricPdata(metrics *pdata.MetricSlice) (int, int
}
pointCount = sdpL.Len()

default:
case pdata.MetricDataTypeSum:
sum := metric.Sum()
sdpL := sum.DataPoints()
for _, mg := range mf.getGroups() {
if !mg.toNumberDataPoint(mf.labelKeysOrdered, &sdpL) {
mf.droppedTimeseries++
}
}
pointCount = sdpL.Len()

default: // Everything else should be set to a Gauge.
metric.SetDataType(pdata.MetricDataTypeGauge)
gauge := metric.Gauge()
gdpL := gauge.DataPoints()
for _, mg := range mf.getGroups() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,10 @@ func TestIsCumulativeEquivalence(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
mf := newMetricFamily(tt.name, mc, zap.NewNop(), 1).(*metricFamily)
mfp := newMetricFamilyPdata(tt.name, mc, 1).(*metricFamilyPdata)
assert.Equal(t, mf.isCumulativeType(), mfp.isCumulativeTypePdata(), "mismatch in isCumulative")
msg := fmt.Sprintf("\n%q::mf.isCumulativeType()=%t\n%q::mp.isCumulativeType()=%t\n",
mf.mtype, mf.isCumulativeType(),
mfp.mtype, mfp.isCumulativeTypePdata())
assert.Equal(t, mf.isCumulativeType(), mfp.isCumulativeTypePdata(), "mismatch in isCumulative "+msg)
assert.Equal(t, mf.isCumulativeType(), tt.want, "isCumulative does not match for regular metricFamily")
assert.Equal(t, mfp.isCumulativeTypePdata(), tt.want, "isCumulative does not match for pdata metricFamily")
})
Expand Down
Loading

0 comments on commit 9f099ab

Please sign in to comment.