Support cumulative, delta, and pass-through exporters (#840)
* Update Process()

* Checkpoint

* Add subtractor; fix test

* Fix all simple integrator tests

* Build the rest (checkpoint)

* Pass all but Prometheus tests

* Precommit pass

* Add aggregation.Kind argument to ExportKindFor

* Remove Subtractor support

* Remove dead test code

* Restore the Subtractor code

* Fix the tests

* Comments

* Add tests for MetricKind

* Add ChangeSign test

* Test ExportKind

* New file

* Rename ChangeSign

* Remove a TODO, add a TODO

* Remove Stateful remnants

* Typo

* Typo

* Test an invalid export kind

* Comments

* Lint

* Apply suggestions from code review

Co-authored-by: Tyler Yahn <[email protected]>

jmacd and MrAlias authored Jun 23, 2020
1 parent e5267a3 commit 0e2fdfc
Showing 26 changed files with 971 additions and 330 deletions.
42 changes: 42 additions & 0 deletions api/metric/kind.go
@@ -35,3 +35,45 @@ const (
// UpDownSumObserverKind indicates a UpDownSumObserver instrument.
UpDownSumObserverKind
)

// Synchronous returns whether this is a synchronous kind of instrument.
func (k Kind) Synchronous() bool {
switch k {
case CounterKind, UpDownCounterKind, ValueRecorderKind:
return true
}
return false
}

// Asynchronous returns whether this is an asynchronous kind of instrument.
func (k Kind) Asynchronous() bool {
return !k.Synchronous()
}

// Adding returns whether this kind of instrument adds its inputs (as opposed to Grouping).
func (k Kind) Adding() bool {
switch k {
case CounterKind, UpDownCounterKind, SumObserverKind, UpDownSumObserverKind:
return true
}
return false
}

// Grouping returns whether this kind of instrument groups its inputs (as opposed to Adding).
func (k Kind) Grouping() bool {
return !k.Adding()
}

// Monotonic returns whether this kind of instrument exposes a non-decreasing sum.
func (k Kind) Monotonic() bool {
switch k {
case CounterKind, SumObserverKind:
return true
}
return false
}

// PrecomputedSum returns whether this kind of instrument receives precomputed sums.
func (k Kind) PrecomputedSum() bool {
return k.Adding() && k.Asynchronous()
}
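Taken together, these predicates classify every instrument kind along the synchronous/asynchronous, adding/grouping, and monotonic axes, and PrecomputedSum falls out of the first two. A minimal sketch of exercising them (a hypothetical standalone program, assuming only the api/metric package shown above):

package main

import (
	"fmt"

	"go.opentelemetry.io/otel/api/metric"
)

func main() {
	kinds := []metric.Kind{
		metric.CounterKind,
		metric.ValueRecorderKind,
		metric.SumObserverKind,
		metric.ValueObserverKind,
	}
	for _, k := range kinds {
		// Synchronous/Asynchronous and Adding/Grouping each partition the
		// kinds; PrecomputedSum is true only for asynchronous adding kinds.
		fmt.Printf("%v: sync=%v adding=%v monotonic=%v precomputedSum=%v\n",
			k, k.Synchronous(), k.Adding(), k.Monotonic(), k.PrecomputedSum())
	}
}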
110 changes: 110 additions & 0 deletions api/metric/kind_test.go
@@ -0,0 +1,110 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metric_test

import (
"testing"

"github.com/stretchr/testify/require"

"go.opentelemetry.io/otel/api/metric"
)

var (
syncKinds = []metric.Kind{
metric.ValueRecorderKind,
metric.CounterKind,
metric.UpDownCounterKind,
}
asyncKinds = []metric.Kind{
metric.ValueObserverKind,
metric.SumObserverKind,
metric.UpDownSumObserverKind,
}
addingKinds = []metric.Kind{
metric.CounterKind,
metric.UpDownCounterKind,
metric.SumObserverKind,
metric.UpDownSumObserverKind,
}
groupingKinds = []metric.Kind{
metric.ValueRecorderKind,
metric.ValueObserverKind,
}

monotonicKinds = []metric.Kind{
metric.CounterKind,
metric.SumObserverKind,
}

nonMonotonicKinds = []metric.Kind{
metric.UpDownCounterKind,
metric.UpDownSumObserverKind,
metric.ValueRecorderKind,
metric.ValueObserverKind,
}

precomputedSumKinds = []metric.Kind{
metric.SumObserverKind,
metric.UpDownSumObserverKind,
}

nonPrecomputedSumKinds = []metric.Kind{
metric.CounterKind,
metric.UpDownCounterKind,
metric.ValueRecorderKind,
metric.ValueObserverKind,
}
)

func TestSynchronous(t *testing.T) {
for _, k := range syncKinds {
require.True(t, k.Synchronous())
require.False(t, k.Asynchronous())
}
for _, k := range asyncKinds {
require.True(t, k.Asynchronous())
require.False(t, k.Synchronous())
}
}

func TestGrouping(t *testing.T) {
for _, k := range groupingKinds {
require.True(t, k.Grouping())
require.False(t, k.Adding())
}
for _, k := range addingKinds {
require.True(t, k.Adding())
require.False(t, k.Grouping())
}
}

func TestMonotonic(t *testing.T) {
for _, k := range monotonicKinds {
require.True(t, k.Monotonic())
}
for _, k := range nonMonotonicKinds {
require.False(t, k.Monotonic())
}
}

func TestPrecomputedSum(t *testing.T) {
for _, k := range precomputedSumKinds {
require.True(t, k.PrecomputedSum())
}
for _, k := range nonPrecomputedSumKinds {
require.False(t, k.PrecomputedSum())
}
}
15 changes: 15 additions & 0 deletions api/metric/number.go
@@ -33,6 +33,7 @@ const (
// Float64NumberKind means that the Number stores float64.
Float64NumberKind
// Uint64NumberKind means that the Number stores uint64.
// TODO: This can be removed, it's not used.
Uint64NumberKind
)

@@ -107,6 +108,20 @@ func NewUint64Number(u uint64) Number {
return NewNumberFromRaw(internal.Uint64ToRaw(u))
}

// NewNumberSignChange returns a number with the same magnitude and
// the opposite sign. `kind` must describe the kind of number in `nn`.
//
// Does not change Uint64NumberKind values.
func NewNumberSignChange(kind NumberKind, nn Number) Number {
switch kind {
case Int64NumberKind:
return NewInt64Number(-nn.AsInt64())
case Float64NumberKind:
return NewFloat64Number(-nn.AsFloat64())
}
return nn
}

// - as x

// AsNumber gets the Number.
43 changes: 43 additions & 0 deletions api/metric/number_test.go
@@ -15,6 +15,7 @@
package metric

import (
"math"
"testing"
"unsafe"

@@ -170,3 +171,45 @@ func TestNumberAsInterface(t *testing.T) {
require.Equal(t, 11.11, (&f64).AsInterface(Float64NumberKind).(float64))
require.Equal(t, uint64(100), (&u64).AsInterface(Uint64NumberKind).(uint64))
}

func TestNumberSignChange(t *testing.T) {
t.Run("Int64", func(t *testing.T) {
posInt := NewInt64Number(10)
negInt := NewInt64Number(-10)

require.Equal(t, posInt, NewNumberSignChange(Int64NumberKind, negInt))
require.Equal(t, negInt, NewNumberSignChange(Int64NumberKind, posInt))
})

t.Run("Float64", func(t *testing.T) {
posFloat := NewFloat64Number(10)
negFloat := NewFloat64Number(-10)

require.Equal(t, posFloat, NewNumberSignChange(Float64NumberKind, negFloat))
require.Equal(t, negFloat, NewNumberSignChange(Float64NumberKind, posFloat))
})

t.Run("Float64Zero", func(t *testing.T) {
posFloat := NewFloat64Number(0)
negFloat := NewFloat64Number(math.Copysign(0, -1))

require.Equal(t, posFloat, NewNumberSignChange(Float64NumberKind, negFloat))
require.Equal(t, negFloat, NewNumberSignChange(Float64NumberKind, posFloat))
})

t.Run("Float64Inf", func(t *testing.T) {
posFloat := NewFloat64Number(math.Inf(+1))
negFloat := NewFloat64Number(math.Inf(-1))

require.Equal(t, posFloat, NewNumberSignChange(Float64NumberKind, negFloat))
require.Equal(t, negFloat, NewNumberSignChange(Float64NumberKind, posFloat))
})

t.Run("Float64NaN", func(t *testing.T) {
posFloat := NewFloat64Number(math.NaN())
negFloat := NewFloat64Number(math.Copysign(math.NaN(), -1))

require.Equal(t, posFloat, NewNumberSignChange(Float64NumberKind, negFloat))
require.Equal(t, negFloat, NewNumberSignChange(Float64NumberKind, posFloat))
})
}
1 change: 0 additions & 1 deletion example/otlp/main.go
@@ -50,7 +50,6 @@ func initProvider() (*otlp.Exporter, *push.Controller) {
pusher := push.New(
simple.NewWithExactDistribution(),
exp,
push.WithStateful(true),
push.WithPeriod(2*time.Second),
)

36 changes: 17 additions & 19 deletions exporters/metric/prometheus/prometheus.go
@@ -32,11 +32,9 @@ import (
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
)

// Exporter is an implementation of metric.Exporter that sends metrics to
// Prometheus.
//
// This exporter supports Prometheus pulls, as such it does not
// implement the export.Exporter interface.
// Exporter supports Prometheus pulls. It does not implement the
// sdk/export/metric.Exporter interface--instead it creates a pull
// controller and reads the latest checkpointed data on-scrape.
type Exporter struct {
handler http.Handler

@@ -144,20 +142,11 @@ func InstallNewPipeline(config Config, options ...pull.Option) (*Exporter, error
func (e *Exporter) SetController(config Config, options ...pull.Option) {
e.lock.Lock()
defer e.lock.Unlock()
// Prometheus uses a stateful pull controller since instruments are
// cumulative and should not be reset after each collection interval.
//
// Prometheus uses this approach to be resilient to scrape failures.
// If a Prometheus server tries to scrape metrics from a host and fails for some reason,
// it could try again on the next scrape and no data would be lost, only resolution.
//
// Gauges (or LastValues) and Summaries are an exception to this and have different behaviors.
//
// TODO: Prometheus supports "Gauge Histogram" which are
// expressed as delta histograms.

e.controller = pull.New(
simple.NewWithHistogramDistribution(config.DefaultHistogramBoundaries),
append(options, pull.WithStateful(true))...,
e,
options...,
)
}

@@ -173,6 +162,15 @@ func (e *Exporter) Controller() *pull.Controller {
return e.controller
}

func (e *Exporter) ExportKindFor(*metric.Descriptor, aggregation.Kind) export.ExportKind {
// NOTE: Summary values should use Delta aggregation, then be
// combined into a sliding window, see the TODO below.
// NOTE: Prometheus also supports a "GaugeDelta" exposition format,
// which is expressed as a delta histogram. Need to understand if this
// should be a default behavior for ValueRecorder/ValueObserver.
return export.CumulativeExporter
}

func (e *Exporter) ServeHTTP(w http.ResponseWriter, r *http.Request) {
e.handler.ServeHTTP(w, r)
}
@@ -188,7 +186,7 @@ func (c *collector) Describe(ch chan<- *prometheus.Desc) {
c.exp.lock.RLock()
defer c.exp.lock.RUnlock()

_ = c.exp.Controller().ForEach(func(record export.Record) error {
_ = c.exp.Controller().ForEach(c.exp, func(record export.Record) error {
var labelKeys []string
mergeLabels(record, &labelKeys, nil)
ch <- c.toDesc(record, labelKeys)
@@ -209,7 +207,7 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) {
global.Handle(err)
}

err := ctrl.ForEach(func(record export.Record) error {
err := ctrl.ForEach(c.exp, func(record export.Record) error {
agg := record.Aggregation()
numberKind := record.Descriptor().NumberKind()

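The Prometheus exporter always requests cumulative data, but ExportKindFor lets an exporter choose per descriptor. A hypothetical selector, sketched here for illustration only (the mixedKindSelector type is invented, and it assumes the export.DeltaExporter constant this change introduces alongside CumulativeExporter and PassThroughExporter):

package example

import (
	"go.opentelemetry.io/otel/api/metric"
	export "go.opentelemetry.io/otel/sdk/export/metric"
	"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
)

// mixedKindSelector is a hypothetical ExportKindSelector, not part of
// this commit, shown only to illustrate the new hook.
type mixedKindSelector struct{}

func (mixedKindSelector) ExportKindFor(desc *metric.Descriptor, _ aggregation.Kind) export.ExportKind {
	switch {
	case desc.MetricKind().PrecomputedSum():
		// Asynchronous sums already arrive as cumulative totals.
		return export.CumulativeExporter
	case desc.MetricKind().Synchronous():
		// Ask the integrator for per-interval deltas.
		return export.DeltaExporter
	default:
		return export.PassThroughExporter
	}
}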
13 changes: 7 additions & 6 deletions exporters/metric/stdout/stdout.go
@@ -26,7 +26,7 @@ import (
"go.opentelemetry.io/otel/api/global"
"go.opentelemetry.io/otel/api/kv"
"go.opentelemetry.io/otel/api/label"

"go.opentelemetry.io/otel/api/metric"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/controller/push"
@@ -132,9 +132,6 @@ func InstallNewPipeline(config Config, options ...push.Option) (*push.Controller
// NewExportPipeline sets up a complete export pipeline with the
// recommended setup, chaining a NewRawExporter into the recommended
// selectors and integrators.
//
// The pipeline is configured with a stateful integrator unless the
// push.WithStateful(false) option is used.
func NewExportPipeline(config Config, options ...push.Option) (*push.Controller, error) {
exporter, err := NewRawExporter(config)
if err != nil {
@@ -143,21 +140,25 @@ func NewExportPipeline(config Config, options ...push.Option) (*push.Controller,
pusher := push.New(
simple.NewWithExactDistribution(),
exporter,
append([]push.Option{push.WithStateful(true)}, options...)...,
options...,
)
pusher.Start()

return pusher, nil
}

func (e *Exporter) ExportKindFor(*metric.Descriptor, aggregation.Kind) export.ExportKind {
return export.PassThroughExporter
}

func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet) error {
var aggError error
var batch expoBatch
if !e.config.DoNotPrintTime {
ts := time.Now()
batch.Timestamp = &ts
}
aggError = checkpointSet.ForEach(func(record export.Record) error {
aggError = checkpointSet.ForEach(e, func(record export.Record) error {
desc := record.Descriptor()
agg := record.Aggregation()
kind := desc.NumberKind()
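Because the exporter itself now acts as the ExportKindSelector, an Export implementation passes itself to CheckpointSet.ForEach, as the stdout exporter does above. A minimal hypothetical exporter showing that shape (printExporter and its print format are invented for illustration):

package example

import (
	"context"
	"fmt"

	"go.opentelemetry.io/otel/api/metric"
	export "go.opentelemetry.io/otel/sdk/export/metric"
	"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
)

// printExporter is a hypothetical exporter used only to show the shape
// of the updated interfaces.
type printExporter struct{}

func (printExporter) ExportKindFor(*metric.Descriptor, aggregation.Kind) export.ExportKind {
	// Accept whatever the integrator produces, like the stdout exporter above.
	return export.PassThroughExporter
}

func (e printExporter) Export(_ context.Context, checkpointSet export.CheckpointSet) error {
	// ForEach now takes the ExportKindSelector first, then the callback.
	return checkpointSet.ForEach(e, func(record export.Record) error {
		fmt.Println(record.Descriptor().Name(), record.Aggregation().Kind())
		return nil
	})
}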
4 changes: 3 additions & 1 deletion exporters/metric/test/test.go
@@ -108,7 +108,9 @@ func (p *CheckpointSet) Add(desc *metric.Descriptor, newAgg export.Aggregator, l
return newAgg, true
}

func (p *CheckpointSet) ForEach(f func(export.Record) error) error {
// ForEach does not use its ExportKindSelector argument: use a real Integrator to
// test ExportKind functionality.
func (p *CheckpointSet) ForEach(_ export.ExportKindSelector, f func(export.Record) error) error {
for _, r := range p.updates {
if err := f(r); err != nil && !errors.Is(err, aggregation.ErrNoData) {
return err