Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use LastValue by default for ValueObserver instruments #1165

Merged
merged 21 commits into from
Sep 24, 2020
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Don't consider unset environment variable for resource detection to be an error. (#1170)
- Rename `go.opentelemetry.io/otel/api/metric.ConfigureInstrument` to `NewInstrumentConfig` and
`go.opentelemetry.io/otel/api/metric.ConfigureMeter` to `NewMeterConfig`.
- ValueObserver instruments use LastValue aggregator by default. (#1165)
- OTLP Metric exporter supports LastValue aggregation. (#1165)

### Fixed

Expand Down
2 changes: 1 addition & 1 deletion exporters/otlp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func main() {
// ),
// )
tracerProvider := sdktrace.NewProvider(sdktrace.WithBatcher(exporter))
pusher := push.New(simple.NewWithExactDistribution(), exporter)
pusher := push.New(simple.NewWithInexpensiveDistribution(), exporter)
pusher.Start()
metricProvider := pusher.Provider()

Expand Down
74 changes: 54 additions & 20 deletions exporters/otlp/internal/transform/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"strings"
"sync"
"time"

commonpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/common/v1"
metricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/metrics/v1"
Expand All @@ -40,6 +41,11 @@ var (
// aggregator is attempted.
ErrUnimplementedAgg = errors.New("unimplemented aggregator")

// ErrIncompatibleAggregation is returned when
// aggregation.Kind implies an interface conversion that has
// failed
ErrIncompatibleAggregation = errors.New("incompatible aggregation type")

// ErrUnknownValueType is returned when a transformation of an unknown value
// is attempted.
ErrUnknownValueType = errors.New("invalid value type")
Expand All @@ -60,6 +66,14 @@ type result struct {
Err error
}

// toNanos returns the number of nanoseconds since the UNIX epoch.
func toNanos(t time.Time) uint64 {
if t.IsZero() {
return 0
}
return uint64(t.UnixNano())
}

// CheckpointSet transforms all records contained in a checkpoint into
// batched OTLP ResourceMetrics.
func CheckpointSet(ctx context.Context, exportSelector export.ExportKindSelector, cps export.CheckpointSet, numWorkers uint) ([]*metricpb.ResourceMetrics, error) {
Expand Down Expand Up @@ -234,24 +248,44 @@ func sink(ctx context.Context, in <-chan result) ([]*metricpb.ResourceMetrics, e
// Record transforms a Record into an OTLP Metric. An ErrUnimplementedAgg
// error is returned if the Record Aggregator is not supported.
func Record(r export.Record) (*metricpb.Metric, error) {
switch a := r.Aggregation().(type) {
case aggregation.MinMaxSumCount:
return minMaxSumCount(r, a)
case aggregation.Sum:
return sum(r, a)
agg := r.Aggregation()
switch agg.Kind() {
case aggregation.MinMaxSumCountKind:
return minMaxSumCount(r, agg.(aggregation.MinMaxSumCount))

case aggregation.SumKind:
s, ok := agg.(aggregation.Sum)

if !ok {
jmacd marked this conversation as resolved.
Show resolved Hide resolved
return nil, fmt.Errorf("%w: %T", ErrIncompatibleAggregation, agg)
}
sum, err := s.Sum()
if err != nil {
return nil, err
}
return scalar(r, sum, r.StartTime(), r.EndTime())

case aggregation.LastValueKind:
lv, ok := agg.(aggregation.LastValue)
if !ok {
jmacd marked this conversation as resolved.
Show resolved Hide resolved
return nil, fmt.Errorf("%w: %T", ErrIncompatibleAggregation, agg)
}
value, tm, err := lv.LastValue()
if err != nil {
return nil, err
}
return scalar(r, value, time.Time{}, tm)

default:
return nil, fmt.Errorf("%w: %v", ErrUnimplementedAgg, a)
return nil, fmt.Errorf("%w: %T", ErrUnimplementedAgg, agg)
}
}

// sum transforms a Sum Aggregator into an OTLP Metric.
func sum(record export.Record, a aggregation.Sum) (*metricpb.Metric, error) {
// scalar transforms a Sum or LastValue Aggregator into an OTLP Metric.
// For LastValue (Gauge), use start==time.Time{}.
func scalar(record export.Record, num metric.Number, start, end time.Time) (*metricpb.Metric, error) {
desc := record.Descriptor()
labels := record.Labels()
sum, err := a.Sum()
if err != nil {
return nil, err
}

m := &metricpb.Metric{
MetricDescriptor: &metricpb.MetricDescriptor{
Expand All @@ -266,20 +300,20 @@ func sum(record export.Record, a aggregation.Sum) (*metricpb.Metric, error) {
m.MetricDescriptor.Type = metricpb.MetricDescriptor_INT64
m.Int64DataPoints = []*metricpb.Int64DataPoint{
{
Value: sum.CoerceToInt64(n),
Value: num.CoerceToInt64(n),
Labels: stringKeyValues(labels.Iter()),
StartTimeUnixNano: uint64(record.StartTime().UnixNano()),
TimeUnixNano: uint64(record.EndTime().UnixNano()),
StartTimeUnixNano: toNanos(start),
TimeUnixNano: toNanos(end),
},
}
case metric.Float64NumberKind:
m.MetricDescriptor.Type = metricpb.MetricDescriptor_DOUBLE
m.DoubleDataPoints = []*metricpb.DoubleDataPoint{
{
Value: sum.CoerceToFloat64(n),
Value: num.CoerceToFloat64(n),
Labels: stringKeyValues(labels.Iter()),
StartTimeUnixNano: uint64(record.StartTime().UnixNano()),
TimeUnixNano: uint64(record.EndTime().UnixNano()),
StartTimeUnixNano: toNanos(start),
TimeUnixNano: toNanos(end),
},
}
default:
Expand Down Expand Up @@ -339,8 +373,8 @@ func minMaxSumCount(record export.Record, a aggregation.MinMaxSumCount) (*metric
Value: max.CoerceToFloat64(numKind),
},
},
StartTimeUnixNano: uint64(record.StartTime().UnixNano()),
TimeUnixNano: uint64(record.EndTime().UnixNano()),
StartTimeUnixNano: toNanos(record.StartTime()),
TimeUnixNano: toNanos(record.EndTime()),
},
},
}, nil
Expand Down
54 changes: 49 additions & 5 deletions exporters/otlp/internal/transform/metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
"go.opentelemetry.io/otel/sdk/export/metric/metrictest"
lvAgg "go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue"
"go.opentelemetry.io/otel/sdk/metric/aggregator/minmaxsumcount"
sumAgg "go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
)
Expand Down Expand Up @@ -242,7 +243,7 @@ func TestSumMetricDescriptor(t *testing.T) {
},
{
"sum-test-b",
metric.ValueRecorderKind, // This shouldn't change anything.
metric.ValueObserverKind, // This shouldn't change anything.
"test-b-description",
unit.Milliseconds,
metric.Float64NumberKind,
Expand All @@ -264,7 +265,7 @@ func TestSumMetricDescriptor(t *testing.T) {
labels := label.NewSet(test.labels...)
emptyAgg := &sumAgg.New(1)[0]
record := export.NewRecord(&desc, &labels, nil, emptyAgg, intervalStart, intervalEnd)
got, err := sum(record, emptyAgg)
got, err := scalar(record, 0, time.Time{}, time.Time{})
if assert.NoError(t, err) {
assert.Equal(t, test.expected, got.MetricDescriptor)
}
Expand All @@ -278,7 +279,14 @@ func TestSumInt64DataPoints(t *testing.T) {
assert.NoError(t, s.Update(context.Background(), metric.Number(1), &desc))
require.NoError(t, s.SynchronizedMove(ckpt, &desc))
record := export.NewRecord(&desc, &labels, nil, ckpt.Aggregation(), intervalStart, intervalEnd)
if m, err := sum(record, ckpt.(aggregation.Sum)); assert.NoError(t, err) {
sum, ok := ckpt.(aggregation.Sum)
if !ok {
t.Errorf("ckpt is not an aggregation.Sum: %T", ckpt)
}
jmacd marked this conversation as resolved.
Show resolved Hide resolved
value, err := sum.Sum()
require.NoError(t, err)

if m, err := scalar(record, value, record.StartTime(), record.EndTime()); assert.NoError(t, err) {
assert.Equal(t, []*metricpb.Int64DataPoint{{
Value: 1,
StartTimeUnixNano: uint64(intervalStart.UnixNano()),
Expand All @@ -297,7 +305,14 @@ func TestSumFloat64DataPoints(t *testing.T) {
assert.NoError(t, s.Update(context.Background(), metric.NewFloat64Number(1), &desc))
require.NoError(t, s.SynchronizedMove(ckpt, &desc))
record := export.NewRecord(&desc, &labels, nil, ckpt.Aggregation(), intervalStart, intervalEnd)
if m, err := sum(record, ckpt.(aggregation.Sum)); assert.NoError(t, err) {
sum, ok := ckpt.(aggregation.Sum)
if !ok {
t.Errorf("ckpt is not an aggregation.Sum: %T", ckpt)
}
jmacd marked this conversation as resolved.
Show resolved Hide resolved
value, err := sum.Sum()
require.NoError(t, err)

if m, err := scalar(record, value, record.StartTime(), record.EndTime()); assert.NoError(t, err) {
assert.Equal(t, []*metricpb.Int64DataPoint(nil), m.Int64DataPoints)
assert.Equal(t, []*metricpb.DoubleDataPoint{{
Value: 1,
Expand All @@ -309,12 +324,41 @@ func TestSumFloat64DataPoints(t *testing.T) {
}
}

func TestLastValueInt64DataPoints(t *testing.T) {
desc := metric.NewDescriptor("", metric.ValueRecorderKind, metric.Int64NumberKind)
labels := label.NewSet()
s, ckpt := metrictest.Unslice2(lvAgg.New(2))
assert.NoError(t, s.Update(context.Background(), metric.Number(100), &desc))
require.NoError(t, s.SynchronizedMove(ckpt, &desc))
record := export.NewRecord(&desc, &labels, nil, ckpt.Aggregation(), intervalStart, intervalEnd)
sum, ok := ckpt.(aggregation.LastValue)
if !ok {
jmacd marked this conversation as resolved.
Show resolved Hide resolved
t.Errorf("ckpt is not an aggregation.LastValue: %T", ckpt)
}
value, timestamp, err := sum.LastValue()
require.NoError(t, err)

if m, err := scalar(record, value, time.Time{}, timestamp); assert.NoError(t, err) {
assert.Equal(t, []*metricpb.Int64DataPoint{{
Value: 100,
StartTimeUnixNano: 0,
TimeUnixNano: uint64(timestamp.UnixNano()),
}}, m.Int64DataPoints)
assert.Equal(t, []*metricpb.DoubleDataPoint(nil), m.DoubleDataPoints)
assert.Equal(t, []*metricpb.HistogramDataPoint(nil), m.HistogramDataPoints)
assert.Equal(t, []*metricpb.SummaryDataPoint(nil), m.SummaryDataPoints)
}
}

func TestSumErrUnknownValueType(t *testing.T) {
desc := metric.NewDescriptor("", metric.ValueRecorderKind, metric.NumberKind(-1))
labels := label.NewSet()
s := &sumAgg.New(1)[0]
record := export.NewRecord(&desc, &labels, nil, s, intervalStart, intervalEnd)
_, err := sum(record, s)
value, err := s.Sum()
require.NoError(t, err)

_, err = scalar(record, value, record.StartTime(), record.EndTime())
assert.Error(t, err)
if !errors.Is(err, ErrUnknownValueType) {
t.Errorf("expected ErrUnknownValueType, got %v", err)
Expand Down
18 changes: 10 additions & 8 deletions exporters/otlp/otlp_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.ExporterOption)
span.End()
}

selector := simple.NewWithExactDistribution()
selector := simple.NewWithInexpensiveDistribution()
processor := processor.New(selector, metricsdk.PassThroughExporter)
pusher := push.New(processor, exp)
pusher.Start()
Expand All @@ -144,6 +144,7 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.ExporterOption)
"test-float64-valueobserver": {metric.ValueObserverKind, metricapi.Float64NumberKind, 3},
}
for name, data := range instruments {
data := data
switch data.iKind {
case metric.CounterKind:
switch data.nKind {
Expand All @@ -166,10 +167,11 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.ExporterOption)
case metric.ValueObserverKind:
switch data.nKind {
case metricapi.Int64NumberKind:
callback := func(v int64) metricapi.Int64ObserverFunc {
return metricapi.Int64ObserverFunc(func(_ context.Context, result metricapi.Int64ObserverResult) { result.Observe(v, labels...) })
}(data.val)
metricapi.Must(meter).NewInt64ValueObserver(name, callback)
metricapi.Must(meter).NewInt64ValueObserver(name,
func(_ context.Context, result metricapi.Int64ObserverResult) {
result.Observe(data.val, labels...)
},
)
case metricapi.Float64NumberKind:
callback := func(v float64) metricapi.Float64ObserverFunc {
return metricapi.Float64ObserverFunc(func(_ context.Context, result metricapi.Float64ObserverResult) { result.Observe(v, labels...) })
Expand Down Expand Up @@ -245,7 +247,7 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.ExporterOption)
seen[desc.Name] = struct{}{}

switch data.iKind {
case metric.CounterKind:
case metric.CounterKind, metric.ValueObserverKind:
switch data.nKind {
case metricapi.Int64NumberKind:
assert.Equal(t, metricpb.MetricDescriptor_INT64.String(), desc.GetType().String())
Expand All @@ -260,7 +262,7 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.ExporterOption)
default:
assert.Failf(t, "invalid number kind", data.nKind.String())
}
case metric.ValueRecorderKind, metric.ValueObserverKind:
case metric.ValueRecorderKind:
assert.Equal(t, metricpb.MetricDescriptor_SUMMARY.String(), desc.GetType().String())
m.GetSummaryDataPoints()
if dp := m.GetSummaryDataPoints(); assert.Len(t, dp, 1) {
Expand Down Expand Up @@ -486,7 +488,7 @@ func TestNewExporter_withMultipleAttributeTypes(t *testing.T) {
span.SetAttributes(testKvs...)
span.End()

selector := simple.NewWithExactDistribution()
selector := simple.NewWithInexpensiveDistribution()
processor := processor.New(selector, metricsdk.PassThroughExporter)
pusher := push.New(processor, exp)
pusher.Start()
Expand Down
24 changes: 20 additions & 4 deletions sdk/metric/selector/simple/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"go.opentelemetry.io/otel/sdk/metric/aggregator/array"
"go.opentelemetry.io/otel/sdk/metric/aggregator/ddsketch"
"go.opentelemetry.io/otel/sdk/metric/aggregator/histogram"
"go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue"
"go.opentelemetry.io/otel/sdk/metric/aggregator/minmaxsumcount"
"go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
)
Expand Down Expand Up @@ -86,9 +87,18 @@ func sumAggs(aggPtrs []*export.Aggregator) {
}
}

// lastValueAggs allocates one LastValue aggregator per entry in
// aggPtrs and stores a pointer to each into the corresponding slot.
// A single batched lastvalue.New call is used so the aggregators
// share one backing allocation.
func lastValueAggs(aggPtrs []*export.Aggregator) {
	lvs := lastvalue.New(len(aggPtrs))
	for i := range lvs {
		*aggPtrs[i] = &lvs[i]
	}
}

func (selectorInexpensive) AggregatorFor(descriptor *metric.Descriptor, aggPtrs ...*export.Aggregator) {
switch descriptor.MetricKind() {
case metric.ValueObserverKind, metric.ValueRecorderKind:
case metric.ValueObserverKind:
lastValueAggs(aggPtrs)
case metric.ValueRecorderKind:
aggs := minmaxsumcount.New(len(aggPtrs), descriptor)
for i := range aggPtrs {
*aggPtrs[i] = &aggs[i]
Expand All @@ -100,7 +110,9 @@ func (selectorInexpensive) AggregatorFor(descriptor *metric.Descriptor, aggPtrs

func (s selectorSketch) AggregatorFor(descriptor *metric.Descriptor, aggPtrs ...*export.Aggregator) {
switch descriptor.MetricKind() {
case metric.ValueObserverKind, metric.ValueRecorderKind:
case metric.ValueObserverKind:
lastValueAggs(aggPtrs)
case metric.ValueRecorderKind:
aggs := ddsketch.New(len(aggPtrs), descriptor, s.config)
for i := range aggPtrs {
*aggPtrs[i] = &aggs[i]
Expand All @@ -112,7 +124,9 @@ func (s selectorSketch) AggregatorFor(descriptor *metric.Descriptor, aggPtrs ...

func (selectorExact) AggregatorFor(descriptor *metric.Descriptor, aggPtrs ...*export.Aggregator) {
switch descriptor.MetricKind() {
case metric.ValueObserverKind, metric.ValueRecorderKind:
case metric.ValueObserverKind:
lastValueAggs(aggPtrs)
case metric.ValueRecorderKind:
aggs := array.New(len(aggPtrs))
for i := range aggPtrs {
*aggPtrs[i] = &aggs[i]
Expand All @@ -124,7 +138,9 @@ func (selectorExact) AggregatorFor(descriptor *metric.Descriptor, aggPtrs ...*ex

func (s selectorHistogram) AggregatorFor(descriptor *metric.Descriptor, aggPtrs ...*export.Aggregator) {
switch descriptor.MetricKind() {
case metric.ValueObserverKind, metric.ValueRecorderKind:
case metric.ValueObserverKind:
lastValueAggs(aggPtrs)
case metric.ValueRecorderKind:
aggs := histogram.New(len(aggPtrs), descriptor, s.boundaries)
for i := range aggPtrs {
*aggPtrs[i] = &aggs[i]
Expand Down
Loading