Skip to content

Commit

Permalink
Translate time in parity with old code
Browse files Browse the repository at this point in the history
  • Loading branch information
odeke-em committed Aug 17, 2021
1 parent 34e6396 commit 3cc2119
Show file tree
Hide file tree
Showing 3 changed files with 177 additions and 141 deletions.
14 changes: 10 additions & 4 deletions receiver/prometheusreceiver/internal/otlp_metricfamily.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"sort"
"strings"
"time"

"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/textparse"
Expand Down Expand Up @@ -174,14 +175,19 @@ func (mg *metricGroupPdata) toDistributionPoint(orderedLabelKeys []string, dest
point.SetSum(mg.sum)
point.SetBucketCounts(bucketCounts)
// The timestamp MUST be in retrieved from milliseconds and converted to nanoseconds.
tsNanos := pdata.Timestamp(mg.ts * 1e6)
tsNanos := timestampFromMs(mg.ts)
point.SetStartTimestamp(tsNanos)
point.SetTimestamp(tsNanos)
populateLabelValuesPdata(orderedLabelKeys, mg.ls, point.LabelsMap())

return true
}

func timestampFromMs(timeAtMs int64) pdata.Timestamp {
secs, ns := timeAtMs/1e3, (timeAtMs%1e3)*1e6
return pdata.TimestampFromTime(time.Unix(secs, ns))
}

func (mg *metricGroupPdata) toSummaryPoint(orderedLabelKeys []string, dest *pdata.SummaryDataPointSlice) bool {
// expecting count to be provided, however, in the following two cases, they can be missed.
// 1. data is corrupted
Expand All @@ -205,7 +211,7 @@ func (mg *metricGroupPdata) toSummaryPoint(orderedLabelKeys []string, dest *pdat
// observations and the corresponding sum is a sum of all observed values, thus the sum and count used
// at the global level of the metricspb.SummaryValue
// The timestamp MUST be in retrieved from milliseconds and converted to nanoseconds.
tsNanos := pdata.Timestamp(mg.ts * 1e6)
tsNanos := timestampFromMs(mg.ts)
point.SetStartTimestamp(tsNanos)
point.SetTimestamp(tsNanos)
point.SetSum(mg.sum)
Expand All @@ -217,10 +223,10 @@ func (mg *metricGroupPdata) toSummaryPoint(orderedLabelKeys []string, dest *pdat

func (mg *metricGroupPdata) toNumberDataPoint(orderedLabelKeys []string, dest *pdata.NumberDataPointSlice) bool {
var startTsNanos pdata.Timestamp
tsNanos := pdata.Timestamp(mg.ts * 1e6)
tsNanos := timestampFromMs(mg.ts)
// gauge/undefined types have no start time.
if mg.family.isCumulativeTypePdata() {
startTsNanos = pdata.Timestamp(mg.intervalStartTimeMs * 1e6)
startTsNanos = timestampFromMs(mg.intervalStartTimeMs)
}

point := dest.AppendEmpty()
Expand Down
23 changes: 20 additions & 3 deletions receiver/prometheusreceiver/internal/otlp_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"errors"
"math"
"sync/atomic"
"time"

"go.uber.org/zap"

Expand Down Expand Up @@ -159,6 +160,7 @@ func (t *transactionPdata) Commit() error {
for _, sEntry := range staleLabels {
t.metricBuilder.AddDataPoint(sEntry.labels, sEntry.seenAtMs, stalenessSpecialValue)
}

t.startTimeMs = -1

ctx := t.obsrecv.StartMetricsOp(t.ctx)
Expand Down Expand Up @@ -195,28 +197,43 @@ func (t *transactionPdata) Rollback() error {
return nil
}

func timestampFromFloat64(ts float64) pdata.Timestamp {
secs := int64(ts)
nanos := int64((ts - float64(secs)) * 1e9)
return pdata.TimestampFromTime(time.Unix(secs, nanos))
}

func adjustStartTimestampPdata(startTime float64, metricsL *pdata.MetricSlice) {
startTimeTs := timestampFromFloat64(startTime)
for i := 0; i < metricsL.Len(); i++ {
metric := metricsL.At(i)
switch metric.DataType() {
case pdata.MetricDataTypeGauge, pdata.MetricDataTypeHistogram:
case pdata.MetricDataTypeGauge:
continue

case pdata.MetricDataTypeSum:
dataPoints := metric.Sum().DataPoints()
for i := 0; i < dataPoints.Len(); i++ {
dataPoint := dataPoints.At(i)
dataPoint.SetStartTimestamp(pdata.Timestamp(startTime))
dataPoint.SetStartTimestamp(startTimeTs)
}

case pdata.MetricDataTypeSummary:
dataPoints := metric.Summary().DataPoints()
for i := 0; i < dataPoints.Len(); i++ {
dataPoint := dataPoints.At(i)
dataPoint.SetStartTimestamp(pdata.Timestamp(startTime))
dataPoint.SetStartTimestamp(startTimeTs)
}

case pdata.MetricDataTypeHistogram:
dataPoints := metric.Histogram().DataPoints()
for i := 0; i < dataPoints.Len(); i++ {
dataPoint := dataPoints.At(i)
dataPoint.SetStartTimestamp(startTimeTs)
}

default:
panic("Unknown type:: " + metric.DataType().String())
}
}
}
Expand Down
Loading

0 comments on commit 3cc2119

Please sign in to comment.