diff --git a/api/metric/kind.go b/api/metric/kind.go index 5020384c56a..9d4b453f3e5 100644 --- a/api/metric/kind.go +++ b/api/metric/kind.go @@ -35,3 +35,45 @@ const ( // UpDownSumObserverKind indicates a UpDownSumObserver instrument. UpDownSumObserverKind ) + +// Synchronous returns whether this is a synchronous kind of instrument. +func (k Kind) Synchronous() bool { + switch k { + case CounterKind, UpDownCounterKind, ValueRecorderKind: + return true + } + return false +} + +// Asynchronous returns whether this is an asynchronous kind of instrument. +func (k Kind) Asynchronous() bool { + return !k.Synchronous() +} + +// Adding returns whether this kind of instrument adds its inputs (as opposed to Grouping). +func (k Kind) Adding() bool { + switch k { + case CounterKind, UpDownCounterKind, SumObserverKind, UpDownSumObserverKind: + return true + } + return false +} + +// Adding returns whether this kind of instrument groups its inputs (as opposed to Adding). +func (k Kind) Grouping() bool { + return !k.Adding() +} + +// Monotonic returns whether this kind of instrument exposes a non-decreasing sum. +func (k Kind) Monotonic() bool { + switch k { + case CounterKind, SumObserverKind: + return true + } + return false +} + +// Cumulative returns whether this kind of instrument receives precomputed sums. +func (k Kind) PrecomputedSum() bool { + return k.Adding() && k.Asynchronous() +} diff --git a/api/metric/kind_test.go b/api/metric/kind_test.go new file mode 100644 index 00000000000..09542b4644f --- /dev/null +++ b/api/metric/kind_test.go @@ -0,0 +1,110 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metric_test + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/otel/api/metric" +) + +var ( + syncKinds = []metric.Kind{ + metric.ValueRecorderKind, + metric.CounterKind, + metric.UpDownCounterKind, + } + asyncKinds = []metric.Kind{ + metric.ValueObserverKind, + metric.SumObserverKind, + metric.UpDownSumObserverKind, + } + addingKinds = []metric.Kind{ + metric.CounterKind, + metric.UpDownCounterKind, + metric.SumObserverKind, + metric.UpDownSumObserverKind, + } + groupingKinds = []metric.Kind{ + metric.ValueRecorderKind, + metric.ValueObserverKind, + } + + monotonicKinds = []metric.Kind{ + metric.CounterKind, + metric.SumObserverKind, + } + + nonMonotonicKinds = []metric.Kind{ + metric.UpDownCounterKind, + metric.UpDownSumObserverKind, + metric.ValueRecorderKind, + metric.ValueObserverKind, + } + + precomputedSumKinds = []metric.Kind{ + metric.SumObserverKind, + metric.UpDownSumObserverKind, + } + + nonPrecomputedSumKinds = []metric.Kind{ + metric.CounterKind, + metric.UpDownCounterKind, + metric.ValueRecorderKind, + metric.ValueObserverKind, + } +) + +func TestSynchronous(t *testing.T) { + for _, k := range syncKinds { + require.True(t, k.Synchronous()) + require.False(t, k.Asynchronous()) + } + for _, k := range asyncKinds { + require.True(t, k.Asynchronous()) + require.False(t, k.Synchronous()) + } +} + +func TestGrouping(t *testing.T) { + for _, k := range groupingKinds { + require.True(t, k.Grouping()) + require.False(t, k.Adding()) + } + for _, k := range addingKinds { + require.True(t, k.Adding()) + require.False(t, k.Grouping()) + } +} + +func TestMonotonic(t *testing.T) { + for _, k := range monotonicKinds { + require.True(t, k.Monotonic()) + } + for _, k := range nonMonotonicKinds { + require.False(t, k.Monotonic()) + } +} + +func TestPrecomputedSum(t *testing.T) { + for _, k := range precomputedSumKinds { + require.True(t, k.PrecomputedSum()) + } + for _, k := range nonPrecomputedSumKinds { + require.False(t, k.PrecomputedSum()) + } +} diff --git a/api/metric/number.go b/api/metric/number.go index 9fcffa79885..50349cf6945 100644 --- a/api/metric/number.go +++ b/api/metric/number.go @@ -33,6 +33,7 @@ const ( // Float64NumberKind means that the Number stores float64. Float64NumberKind // Uint64NumberKind means that the Number stores uint64. + // TODO: This can be removed, it's not used. Uint64NumberKind ) @@ -107,6 +108,20 @@ func NewUint64Number(u uint64) Number { return NewNumberFromRaw(internal.Uint64ToRaw(u)) } +// NewNumberSignChange returns a number with the same magnitude and +// the opposite sign. `kind` must describe the kind of number in `nn`. +// +// Does not change Uint64NumberKind values. +func NewNumberSignChange(kind NumberKind, nn Number) Number { + switch kind { + case Int64NumberKind: + return NewInt64Number(-nn.AsInt64()) + case Float64NumberKind: + return NewFloat64Number(-nn.AsFloat64()) + } + return nn +} + // - as x // AsNumber gets the Number. diff --git a/api/metric/number_test.go b/api/metric/number_test.go index 5a91eb3f5dc..80ca6c42663 100644 --- a/api/metric/number_test.go +++ b/api/metric/number_test.go @@ -15,6 +15,7 @@ package metric import ( + "math" "testing" "unsafe" @@ -170,3 +171,45 @@ func TestNumberAsInterface(t *testing.T) { require.Equal(t, 11.11, (&f64).AsInterface(Float64NumberKind).(float64)) require.Equal(t, uint64(100), (&u64).AsInterface(Uint64NumberKind).(uint64)) } + +func TestNumberSignChange(t *testing.T) { + t.Run("Int64", func(t *testing.T) { + posInt := NewInt64Number(10) + negInt := NewInt64Number(-10) + + require.Equal(t, posInt, NewNumberSignChange(Int64NumberKind, negInt)) + require.Equal(t, negInt, NewNumberSignChange(Int64NumberKind, posInt)) + }) + + t.Run("Float64", func(t *testing.T) { + posFloat := NewFloat64Number(10) + negFloat := NewFloat64Number(-10) + + require.Equal(t, posFloat, NewNumberSignChange(Float64NumberKind, negFloat)) + require.Equal(t, negFloat, NewNumberSignChange(Float64NumberKind, posFloat)) + }) + + t.Run("Float64Zero", func(t *testing.T) { + posFloat := NewFloat64Number(0) + negFloat := NewFloat64Number(math.Copysign(0, -1)) + + require.Equal(t, posFloat, NewNumberSignChange(Float64NumberKind, negFloat)) + require.Equal(t, negFloat, NewNumberSignChange(Float64NumberKind, posFloat)) + }) + + t.Run("Float64Inf", func(t *testing.T) { + posFloat := NewFloat64Number(math.Inf(+1)) + negFloat := NewFloat64Number(math.Inf(-1)) + + require.Equal(t, posFloat, NewNumberSignChange(Float64NumberKind, negFloat)) + require.Equal(t, negFloat, NewNumberSignChange(Float64NumberKind, posFloat)) + }) + + t.Run("Float64NaN", func(t *testing.T) { + posFloat := NewFloat64Number(math.NaN()) + negFloat := NewFloat64Number(math.Copysign(math.NaN(), -1)) + + require.Equal(t, posFloat, NewNumberSignChange(Float64NumberKind, negFloat)) + require.Equal(t, negFloat, NewNumberSignChange(Float64NumberKind, posFloat)) + }) +} diff --git a/example/otlp/main.go b/example/otlp/main.go index 87d9a9feda1..7753c3ee227 100644 --- a/example/otlp/main.go +++ b/example/otlp/main.go @@ -50,7 +50,6 @@ func initProvider() (*otlp.Exporter, *push.Controller) { pusher := push.New( simple.NewWithExactDistribution(), exp, - push.WithStateful(true), push.WithPeriod(2*time.Second), ) diff --git a/exporters/metric/prometheus/prometheus.go b/exporters/metric/prometheus/prometheus.go index 0270a04d68b..82e64cf66bc 100644 --- a/exporters/metric/prometheus/prometheus.go +++ b/exporters/metric/prometheus/prometheus.go @@ -32,11 +32,9 @@ import ( "go.opentelemetry.io/otel/sdk/metric/selector/simple" ) -// Exporter is an implementation of metric.Exporter that sends metrics to -// Prometheus. -// -// This exporter supports Prometheus pulls, as such it does not -// implement the export.Exporter interface. +// Exporter supports Prometheus pulls. It does not implement the +// sdk/export/metric.Exporter interface--instead it creates a pull +// controller and reads the latest checkpointed data on-scrape. type Exporter struct { handler http.Handler @@ -144,20 +142,11 @@ func InstallNewPipeline(config Config, options ...pull.Option) (*Exporter, error func (e *Exporter) SetController(config Config, options ...pull.Option) { e.lock.Lock() defer e.lock.Unlock() - // Prometheus uses a stateful pull controller since instruments are - // cumulative and should not be reset after each collection interval. - // - // Prometheus uses this approach to be resilient to scrape failures. - // If a Prometheus server tries to scrape metrics from a host and fails for some reason, - // it could try again on the next scrape and no data would be lost, only resolution. - // - // Gauges (or LastValues) and Summaries are an exception to this and have different behaviors. - // - // TODO: Prometheus supports "Gauge Histogram" which are - // expressed as delta histograms. + e.controller = pull.New( simple.NewWithHistogramDistribution(config.DefaultHistogramBoundaries), - append(options, pull.WithStateful(true))..., + e, + options..., ) } @@ -173,6 +162,15 @@ func (e *Exporter) Controller() *pull.Controller { return e.controller } +func (e *Exporter) ExportKindFor(*metric.Descriptor, aggregation.Kind) export.ExportKind { + // NOTE: Summary values should use Delta aggregation, then be + // combined into a sliding window, see the TODO below. + // NOTE: Prometheus also supports a "GaugeDelta" exposition format, + // which is expressed as a delta histogram. Need to understand if this + // should be a default behavior for ValueRecorder/ValueObserver. + return export.CumulativeExporter +} + func (e *Exporter) ServeHTTP(w http.ResponseWriter, r *http.Request) { e.handler.ServeHTTP(w, r) } @@ -188,7 +186,7 @@ func (c *collector) Describe(ch chan<- *prometheus.Desc) { c.exp.lock.RLock() defer c.exp.lock.RUnlock() - _ = c.exp.Controller().ForEach(func(record export.Record) error { + _ = c.exp.Controller().ForEach(c.exp, func(record export.Record) error { var labelKeys []string mergeLabels(record, &labelKeys, nil) ch <- c.toDesc(record, labelKeys) @@ -209,7 +207,7 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) { global.Handle(err) } - err := ctrl.ForEach(func(record export.Record) error { + err := ctrl.ForEach(c.exp, func(record export.Record) error { agg := record.Aggregation() numberKind := record.Descriptor().NumberKind() diff --git a/exporters/metric/stdout/stdout.go b/exporters/metric/stdout/stdout.go index f15a0b88c84..0c3d2c24252 100644 --- a/exporters/metric/stdout/stdout.go +++ b/exporters/metric/stdout/stdout.go @@ -26,7 +26,7 @@ import ( "go.opentelemetry.io/otel/api/global" "go.opentelemetry.io/otel/api/kv" "go.opentelemetry.io/otel/api/label" - + "go.opentelemetry.io/otel/api/metric" export "go.opentelemetry.io/otel/sdk/export/metric" "go.opentelemetry.io/otel/sdk/export/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/controller/push" @@ -132,9 +132,6 @@ func InstallNewPipeline(config Config, options ...push.Option) (*push.Controller // NewExportPipeline sets up a complete export pipeline with the // recommended setup, chaining a NewRawExporter into the recommended // selectors and integrators. -// -// The pipeline is configured with a stateful integrator unless the -// push.WithStateful(false) option is used. func NewExportPipeline(config Config, options ...push.Option) (*push.Controller, error) { exporter, err := NewRawExporter(config) if err != nil { @@ -143,13 +140,17 @@ func NewExportPipeline(config Config, options ...push.Option) (*push.Controller, pusher := push.New( simple.NewWithExactDistribution(), exporter, - append([]push.Option{push.WithStateful(true)}, options...)..., + options..., ) pusher.Start() return pusher, nil } +func (e *Exporter) ExportKindFor(*metric.Descriptor, aggregation.Kind) export.ExportKind { + return export.PassThroughExporter +} + func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet) error { var aggError error var batch expoBatch @@ -157,7 +158,7 @@ func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet) ts := time.Now() batch.Timestamp = &ts } - aggError = checkpointSet.ForEach(func(record export.Record) error { + aggError = checkpointSet.ForEach(e, func(record export.Record) error { desc := record.Descriptor() agg := record.Aggregation() kind := desc.NumberKind() diff --git a/exporters/metric/test/test.go b/exporters/metric/test/test.go index 904ae498d64..2a4ea885569 100644 --- a/exporters/metric/test/test.go +++ b/exporters/metric/test/test.go @@ -108,7 +108,9 @@ func (p *CheckpointSet) Add(desc *metric.Descriptor, newAgg export.Aggregator, l return newAgg, true } -func (p *CheckpointSet) ForEach(f func(export.Record) error) error { +// ForEach does not use ExportKindSelected: use a real Integrator to +// test ExportKind functionality. +func (p *CheckpointSet) ForEach(_ export.ExportKindSelector, f func(export.Record) error) error { for _, r := range p.updates { if err := f(r); err != nil && !errors.Is(err, aggregation.ErrNoData) { return err diff --git a/exporters/otlp/internal/transform/metric.go b/exporters/otlp/internal/transform/metric.go index 8975fe3585d..18d0bf06a90 100644 --- a/exporters/otlp/internal/transform/metric.go +++ b/exporters/otlp/internal/transform/metric.go @@ -62,8 +62,8 @@ type result struct { // CheckpointSet transforms all records contained in a checkpoint into // batched OTLP ResourceMetrics. -func CheckpointSet(ctx context.Context, cps export.CheckpointSet, numWorkers uint) ([]*metricpb.ResourceMetrics, error) { - records, errc := source(ctx, cps) +func CheckpointSet(ctx context.Context, exportSelector export.ExportKindSelector, cps export.CheckpointSet, numWorkers uint) ([]*metricpb.ResourceMetrics, error) { + records, errc := source(ctx, exportSelector, cps) // Start a fixed number of goroutines to transform records. transformed := make(chan result) @@ -96,14 +96,14 @@ func CheckpointSet(ctx context.Context, cps export.CheckpointSet, numWorkers uin // source starts a goroutine that sends each one of the Records yielded by // the CheckpointSet on the returned chan. Any error encoutered will be sent // on the returned error chan after seeding is complete. -func source(ctx context.Context, cps export.CheckpointSet) (<-chan export.Record, <-chan error) { +func source(ctx context.Context, exportSelector export.ExportKindSelector, cps export.CheckpointSet) (<-chan export.Record, <-chan error) { errc := make(chan error, 1) out := make(chan export.Record) // Seed records into process. go func() { defer close(out) // No select is needed since errc is buffered. - errc <- cps.ForEach(func(r export.Record) error { + errc <- cps.ForEach(exportSelector, func(r export.Record) error { select { case <-ctx.Done(): return ErrContextCanceled diff --git a/exporters/otlp/otlp.go b/exporters/otlp/otlp.go index 1db7899a091..c51d3f43052 100644 --- a/exporters/otlp/otlp.go +++ b/exporters/otlp/otlp.go @@ -28,8 +28,10 @@ import ( colmetricpb "github.com/open-telemetry/opentelemetry-proto/gen/go/collector/metrics/v1" coltracepb "github.com/open-telemetry/opentelemetry-proto/gen/go/collector/trace/v1" + "go.opentelemetry.io/otel/api/metric" "go.opentelemetry.io/otel/exporters/otlp/internal/transform" metricsdk "go.opentelemetry.io/otel/sdk/export/metric" + "go.opentelemetry.io/otel/sdk/export/metric/aggregation" tracesdk "go.opentelemetry.io/otel/sdk/export/trace" ) @@ -238,7 +240,7 @@ func (e *Exporter) Export(parent context.Context, cps metricsdk.CheckpointSet) e } }(ctx, cancel) - rms, err := transform.CheckpointSet(ctx, cps, e.c.numWorkers) + rms, err := transform.CheckpointSet(ctx, e, cps, e.c.numWorkers) if err != nil { return err } @@ -265,6 +267,10 @@ func (e *Exporter) Export(parent context.Context, cps metricsdk.CheckpointSet) e return nil } +func (e *Exporter) ExportKindFor(*metric.Descriptor, aggregation.Kind) metricsdk.ExportKind { + return metricsdk.PassThroughExporter +} + func (e *Exporter) ExportSpan(ctx context.Context, sd *tracesdk.SpanData) { e.uploadTraces(ctx, []*tracesdk.SpanData{sd}) } diff --git a/exporters/otlp/otlp_integration_test.go b/exporters/otlp/otlp_integration_test.go index dd7abba580a..9117662c029 100644 --- a/exporters/otlp/otlp_integration_test.go +++ b/exporters/otlp/otlp_integration_test.go @@ -31,6 +31,7 @@ import ( "go.opentelemetry.io/otel/api/metric" metricapi "go.opentelemetry.io/otel/api/metric" "go.opentelemetry.io/otel/exporters/otlp" + metricsdk "go.opentelemetry.io/otel/sdk/export/metric" exporttrace "go.opentelemetry.io/otel/sdk/export/trace" "go.opentelemetry.io/otel/sdk/metric/controller/push" integrator "go.opentelemetry.io/otel/sdk/metric/integrator/simple" @@ -116,7 +117,7 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.ExporterOption) } selector := simple.NewWithExactDistribution() - integrator := integrator.New(selector, true) + integrator := integrator.New(selector, metricsdk.PassThroughExporter) pusher := push.New(integrator, exp) pusher.Start() diff --git a/exporters/otlp/otlp_metric_test.go b/exporters/otlp/otlp_metric_test.go index dcbb119e877..f1d8d56c0d7 100644 --- a/exporters/otlp/otlp_metric_test.go +++ b/exporters/otlp/otlp_metric_test.go @@ -82,7 +82,7 @@ type checkpointSet struct { records []metricsdk.Record } -func (m *checkpointSet) ForEach(fn func(metricsdk.Record) error) error { +func (m *checkpointSet) ForEach(_ metricsdk.ExportKindSelector, fn func(metricsdk.Record) error) error { for _, r := range m.records { if err := fn(r); err != nil && err != aggregation.ErrNoData { return err diff --git a/sdk/export/metric/aggregation/aggregation.go b/sdk/export/metric/aggregation/aggregation.go index b42a4fc13b3..df974b6c035 100644 --- a/sdk/export/metric/aggregation/aggregation.go +++ b/sdk/export/metric/aggregation/aggregation.go @@ -151,6 +151,7 @@ var ( ErrNegativeInput = fmt.Errorf("negative value is out of range for this instrument") ErrNaNInput = fmt.Errorf("NaN value is an invalid input") ErrInconsistentType = fmt.Errorf("inconsistent aggregator types") + ErrNoSubtraction = fmt.Errorf("aggregator does not subtract") // ErrNoData is returned when (due to a race with collection) // the Aggregator is check-pointed before the first value is set. diff --git a/sdk/export/metric/exportkind_string.go b/sdk/export/metric/exportkind_string.go new file mode 100644 index 00000000000..c2f8bcf8dff --- /dev/null +++ b/sdk/export/metric/exportkind_string.go @@ -0,0 +1,35 @@ +// Code generated by "stringer -type=ExportKind"; DO NOT EDIT. + +package metric + +import "strconv" + +func _() { + // An "invalid array index" compiler error signifies that the constant values have changed. + // Re-run the stringer command to generate them again. + var x [1]struct{} + _ = x[CumulativeExporter-1] + _ = x[DeltaExporter-2] + _ = x[PassThroughExporter-4] +} + +const ( + _ExportKind_name_0 = "CumulativeExporterDeltaExporter" + _ExportKind_name_1 = "PassThroughExporter" +) + +var ( + _ExportKind_index_0 = [...]uint8{0, 18, 31} +) + +func (i ExportKind) String() string { + switch { + case 1 <= i && i <= 2: + i -= 1 + return _ExportKind_name_0[_ExportKind_index_0[i]:_ExportKind_index_0[i+1]] + case i == 4: + return _ExportKind_name_1 + default: + return "ExportKind(" + strconv.FormatInt(int64(i), 10) + ")" + } +} diff --git a/sdk/export/metric/exportkind_test.go b/sdk/export/metric/exportkind_test.go new file mode 100644 index 00000000000..d3ba61f6af9 --- /dev/null +++ b/sdk/export/metric/exportkind_test.go @@ -0,0 +1,64 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metric + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/otel/api/metric" + "go.opentelemetry.io/otel/sdk/export/metric/aggregation" +) + +func TestExportKindIdentity(t *testing.T) { + akind := aggregation.Kind(0) + + require.Equal(t, CumulativeExporter, CumulativeExporter.ExportKindFor(nil, akind)) + require.Equal(t, DeltaExporter, DeltaExporter.ExportKindFor(nil, akind)) + require.Equal(t, PassThroughExporter, PassThroughExporter.ExportKindFor(nil, akind)) +} + +func TestExportKindIncludes(t *testing.T) { + require.True(t, CumulativeExporter.Includes(CumulativeExporter)) + require.True(t, DeltaExporter.Includes(CumulativeExporter|DeltaExporter)) + require.False(t, DeltaExporter.Includes(PassThroughExporter|CumulativeExporter)) +} + +var deltaMemoryKinds = []metric.Kind{ + metric.SumObserverKind, + metric.UpDownSumObserverKind, +} + +var cumulativeMemoryKinds = []metric.Kind{ + metric.ValueRecorderKind, + metric.ValueObserverKind, + metric.CounterKind, + metric.UpDownCounterKind, +} + +func TestExportKindMemoryRequired(t *testing.T) { + for _, kind := range deltaMemoryKinds { + require.True(t, DeltaExporter.MemoryRequired(kind)) + require.False(t, CumulativeExporter.MemoryRequired(kind)) + require.False(t, PassThroughExporter.MemoryRequired(kind)) + } + + for _, kind := range cumulativeMemoryKinds { + require.True(t, CumulativeExporter.MemoryRequired(kind)) + require.False(t, DeltaExporter.MemoryRequired(kind)) + require.False(t, PassThroughExporter.MemoryRequired(kind)) + } +} diff --git a/sdk/export/metric/metric.go b/sdk/export/metric/metric.go index 770b248f6cb..11b2b404115 100644 --- a/sdk/export/metric/metric.go +++ b/sdk/export/metric/metric.go @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:generate stringer -type=ExportKind + package metric // import "go.opentelemetry.io/otel/sdk/export/metric" import ( @@ -154,6 +156,16 @@ type Aggregator interface { Merge(Aggregator, *metric.Descriptor) error } +// Subtractor is an optional interface implemented by some +// Aggregators. An Aggregator must support `Subtract()` in order to +// be configured for a Precomputed-Sum instrument (SumObserver, +// UpDownSumObserver) using a DeltaExporter. +type Subtractor interface { + // Subtract subtracts the `operand` from this Aggregator and + // outputs the value in `result`. + Subtract(operand, result Aggregator, descriptor *metric.Descriptor) error +} + // Exporter handles presentation of the checkpoint of aggregate // metrics. This is the final stage of a metrics export pipeline, // where metric data are formatted for a specific system. @@ -167,6 +179,21 @@ type Exporter interface { // The CheckpointSet interface refers to the Integrator that just // completed collection. Export(context.Context, CheckpointSet) error + + // ExportKindSelector is an interface used by the Integrator + // in deciding whether to compute Delta or Cumulative + // Aggregations when passing Records to this Exporter. + ExportKindSelector +} + +// ExportKindSelector is a sub-interface of Exporter used to indicate +// whether the Integrator should compute Delta or Cumulative +// Aggregations. +type ExportKindSelector interface { + // ExportKindFor should return the correct ExportKind that + // should be used when exporting data for the given metric + // instrument and Aggregator kind. + ExportKindFor(*metric.Descriptor, aggregation.Kind) ExportKind } // CheckpointSet allows a controller to access a complete checkpoint of @@ -178,11 +205,16 @@ type CheckpointSet interface { // metrics that were updated during the last collection // period. Each aggregated checkpoint returned by the // function parameter may return an error. + // + // The ExportKindSelector argument is used to determine + // whether the Record is computed using Delta or Cumulative + // aggregation. + // // ForEach tolerates ErrNoData silently, as this is // expected from the Meter implementation. Any other kind // of error will immediately halt ForEach and return // the error to the caller. - ForEach(func(Record) error) error + ForEach(ExportKindSelector, func(Record) error) error // Locker supports locking the checkpoint set. Collection // into the checkpoint set cannot take place (in case of a @@ -292,3 +324,52 @@ func (r Record) StartTime() time.Time { func (r Record) EndTime() time.Time { return r.end } + +// ExportKind indicates the kind of data exported by an exporter. +// These bits may be OR-d together when multiple exporters are in use. +type ExportKind int + +const ( + // CumulativeExporter indicates that the Exporter expects a + // Cumulative Aggregation. + CumulativeExporter ExportKind = 1 // e.g., Prometheus + + // DeltaExporter indicates that the Exporter expects a + // Delta Aggregation. + DeltaExporter ExportKind = 2 // e.g., StatsD + + // PassThroughExporter indicates that the Exporter expects + // either a Cumulative or a Delta Aggregation, whichever does + // not require maintaining state for the given instrument. + PassThroughExporter ExportKind = 4 // e.g., OTLP +) + +// Includes tests whether `kind` includes a specific kind of +// exporter. +func (kind ExportKind) Includes(has ExportKind) bool { + return kind&has != 0 +} + +// ExportKindFor returns a constant, as an implementation of ExportKindSelector. +func (kind ExportKind) ExportKindFor(_ *metric.Descriptor, _ aggregation.Kind) ExportKind { + return kind +} + +// MemoryRequired returns whether an exporter of this kind requires +// memory to export correctly. +func (kind ExportKind) MemoryRequired(mkind metric.Kind) bool { + switch mkind { + case metric.ValueRecorderKind, metric.ValueObserverKind, + metric.CounterKind, metric.UpDownCounterKind: + // Delta-oriented instruments: + return kind.Includes(CumulativeExporter) + + case metric.SumObserverKind, metric.UpDownSumObserverKind: + // Cumulative-oriented instruments: + return kind.Includes(DeltaExporter) + } + // Something unexpected is happening--we could panic. This + // will become an error when the exporter tries to access a + // checkpoint, presumably, so let it be. + return false +} diff --git a/sdk/metric/aggregator/ddsketch/ddsketch.go b/sdk/metric/aggregator/ddsketch/ddsketch.go index 6bf37a63e17..c6d76e10f59 100644 --- a/sdk/metric/aggregator/ddsketch/ddsketch.go +++ b/sdk/metric/aggregator/ddsketch/ddsketch.go @@ -43,6 +43,9 @@ var _ aggregation.Distribution = &Aggregator{} // New returns a new DDSketch aggregator. func New(cnt int, desc *metric.Descriptor, cfg *Config) []Aggregator { + if cfg == nil { + cfg = NewDefaultConfig() + } aggs := make([]Aggregator, cnt) for i := range aggs { aggs[i] = Aggregator{ diff --git a/sdk/metric/aggregator/sum/sum.go b/sdk/metric/aggregator/sum/sum.go index 67fb5b50ac2..d61a0da9ae8 100644 --- a/sdk/metric/aggregator/sum/sum.go +++ b/sdk/metric/aggregator/sum/sum.go @@ -31,6 +31,7 @@ type Aggregator struct { } var _ export.Aggregator = &Aggregator{} +var _ export.Subtractor = &Aggregator{} var _ aggregation.Sum = &Aggregator{} // New returns a new counter aggregator implemented by atomic @@ -82,3 +83,19 @@ func (c *Aggregator) Merge(oa export.Aggregator, desc *metric.Descriptor) error c.value.AddNumber(desc.NumberKind(), o.value) return nil } + +func (c *Aggregator) Subtract(opAgg, resAgg export.Aggregator, descriptor *metric.Descriptor) error { + op, _ := opAgg.(*Aggregator) + if op == nil { + return aggregator.NewInconsistentAggregatorError(c, opAgg) + } + + res, _ := resAgg.(*Aggregator) + if res == nil { + return aggregator.NewInconsistentAggregatorError(c, resAgg) + } + + res.value = c.value + res.value.AddNumber(descriptor.NumberKind(), metric.NewNumberSignChange(descriptor.NumberKind(), op.value)) + return nil +} diff --git a/sdk/metric/controller/pull/config.go b/sdk/metric/controller/pull/config.go index d3389270467..cf786c84505 100644 --- a/sdk/metric/controller/pull/config.go +++ b/sdk/metric/controller/pull/config.go @@ -27,11 +27,6 @@ type Config struct { // created by the Controller. Resource *resource.Resource - // Stateful causes the controller to maintain state across - // collection events, so that records in the exported - // checkpoint set are cumulative. - Stateful bool - // CachePeriod is the period which a recently-computed result // will be returned without gathering metric data again. // @@ -57,17 +52,6 @@ func (o resourceOption) Apply(config *Config) { config.Resource = o.Resource } -// WithStateful sets the Stateful configuration option of a Config. -func WithStateful(stateful bool) Option { - return statefulOption(stateful) -} - -type statefulOption bool - -func (o statefulOption) Apply(config *Config) { - config.Stateful = bool(o) -} - // WithCachePeriod sets the CachePeriod configuration option of a Config. func WithCachePeriod(cachePeriod time.Duration) Option { return cachePeriodOption(cachePeriod) diff --git a/sdk/metric/controller/pull/pull.go b/sdk/metric/controller/pull/pull.go index 58534b25d29..401fb4867c6 100644 --- a/sdk/metric/controller/pull/pull.go +++ b/sdk/metric/controller/pull/pull.go @@ -45,7 +45,7 @@ type Controller struct { } // New returns a *Controller configured with an aggregation selector and options. -func New(selector export.AggregationSelector, options ...Option) *Controller { +func New(aselector export.AggregationSelector, eselector export.ExportKindSelector, options ...Option) *Controller { config := &Config{ Resource: resource.Empty(), CachePeriod: DefaultCachePeriod, @@ -53,7 +53,7 @@ func New(selector export.AggregationSelector, options ...Option) *Controller { for _, opt := range options { opt.Apply(config) } - integrator := integrator.New(selector, config.Stateful) + integrator := integrator.New(aselector, eselector) accum := sdk.NewAccumulator( integrator, sdk.WithResource(config.Resource), @@ -83,11 +83,11 @@ func (c *Controller) Provider() metric.Provider { // Foreach gives the caller read-locked access to the current // export.CheckpointSet. -func (c *Controller) ForEach(f func(export.Record) error) error { +func (c *Controller) ForEach(ks export.ExportKindSelector, f func(export.Record) error) error { c.integrator.RLock() defer c.integrator.RUnlock() - return c.checkpoint.ForEach(f) + return c.checkpoint.ForEach(ks, f) } // Collect requests a collection. The collection will be skipped if diff --git a/sdk/metric/controller/pull/pull_test.go b/sdk/metric/controller/pull/pull_test.go index e252e3cd168..bc264218e83 100644 --- a/sdk/metric/controller/pull/pull_test.go +++ b/sdk/metric/controller/pull/pull_test.go @@ -25,6 +25,7 @@ import ( "go.opentelemetry.io/otel/api/kv" "go.opentelemetry.io/otel/api/label" "go.opentelemetry.io/otel/api/metric" + export "go.opentelemetry.io/otel/sdk/export/metric" "go.opentelemetry.io/otel/sdk/metric/controller/pull" controllerTest "go.opentelemetry.io/otel/sdk/metric/controller/test" "go.opentelemetry.io/otel/sdk/metric/integrator/test" @@ -34,8 +35,8 @@ import ( func TestPullNoCache(t *testing.T) { puller := pull.New( selector.NewWithExactDistribution(), + export.CumulativeExporter, pull.WithCachePeriod(0), - pull.WithStateful(true), ) ctx := context.Background() @@ -46,7 +47,7 @@ func TestPullNoCache(t *testing.T) { require.NoError(t, puller.Collect(ctx)) records := test.NewOutput(label.DefaultEncoder()) - require.NoError(t, puller.ForEach(records.AddRecord)) + require.NoError(t, puller.ForEach(export.CumulativeExporter, records.AddRecord)) require.EqualValues(t, map[string]float64{ "counter/A=B/": 10, @@ -56,7 +57,7 @@ func TestPullNoCache(t *testing.T) { require.NoError(t, puller.Collect(ctx)) records = test.NewOutput(label.DefaultEncoder()) - require.NoError(t, puller.ForEach(records.AddRecord)) + require.NoError(t, puller.ForEach(export.CumulativeExporter, records.AddRecord)) require.EqualValues(t, map[string]float64{ "counter/A=B/": 20, @@ -66,8 +67,8 @@ func TestPullNoCache(t *testing.T) { func TestPullWithCache(t *testing.T) { puller := pull.New( selector.NewWithExactDistribution(), + export.CumulativeExporter, pull.WithCachePeriod(time.Second), - pull.WithStateful(true), ) mock := controllerTest.NewMockClock() puller.SetClock(mock) @@ -80,7 +81,7 @@ func TestPullWithCache(t *testing.T) { require.NoError(t, puller.Collect(ctx)) records := test.NewOutput(label.DefaultEncoder()) - require.NoError(t, puller.ForEach(records.AddRecord)) + require.NoError(t, puller.ForEach(export.CumulativeExporter, records.AddRecord)) require.EqualValues(t, map[string]float64{ "counter/A=B/": 10, @@ -91,7 +92,7 @@ func TestPullWithCache(t *testing.T) { // Cached value! require.NoError(t, puller.Collect(ctx)) records = test.NewOutput(label.DefaultEncoder()) - require.NoError(t, puller.ForEach(records.AddRecord)) + require.NoError(t, puller.ForEach(export.CumulativeExporter, records.AddRecord)) require.EqualValues(t, map[string]float64{ "counter/A=B/": 10, @@ -103,7 +104,7 @@ func TestPullWithCache(t *testing.T) { // Re-computed value! require.NoError(t, puller.Collect(ctx)) records = test.NewOutput(label.DefaultEncoder()) - require.NoError(t, puller.ForEach(records.AddRecord)) + require.NoError(t, puller.ForEach(export.CumulativeExporter, records.AddRecord)) require.EqualValues(t, map[string]float64{ "counter/A=B/": 20, diff --git a/sdk/metric/controller/push/config.go b/sdk/metric/controller/push/config.go index f76df130ccb..a22b300b5ea 100644 --- a/sdk/metric/controller/push/config.go +++ b/sdk/metric/controller/push/config.go @@ -26,11 +26,6 @@ type Config struct { // created by the Controller. Resource *resource.Resource - // Stateful causes the controller to maintain state across - // collection events, so that records in the exported - // checkpoint set are cumulative. - Stateful bool - // Period is the interval between calls to Collect a checkpoint. Period time.Duration @@ -57,17 +52,6 @@ func (o resourceOption) Apply(config *Config) { config.Resource = o.Resource } -// WithStateful sets the Stateful configuration option of a Config. -func WithStateful(stateful bool) Option { - return statefulOption(stateful) -} - -type statefulOption bool - -func (o statefulOption) Apply(config *Config) { - config.Stateful = bool(o) -} - // WithPeriod sets the Period configuration option of a Config. func WithPeriod(period time.Duration) Option { return periodOption(period) diff --git a/sdk/metric/controller/push/push.go b/sdk/metric/controller/push/push.go index d60e060c3e2..8d2c7930abb 100644 --- a/sdk/metric/controller/push/push.go +++ b/sdk/metric/controller/push/push.go @@ -60,7 +60,7 @@ func New(selector export.AggregationSelector, exporter export.Exporter, opts ... c.Timeout = c.Period } - integrator := simple.New(selector, c.Stateful) + integrator := simple.New(selector, exporter) impl := sdk.NewAccumulator( integrator, sdk.WithResource(c.Resource), diff --git a/sdk/metric/controller/push/push_test.go b/sdk/metric/controller/push/push_test.go index 69780d3c1d6..8aa34d86217 100644 --- a/sdk/metric/controller/push/push_test.go +++ b/sdk/metric/controller/push/push_test.go @@ -91,12 +91,16 @@ func newFixture(t *testing.T) testFixture { } } +func (e *testExporter) ExportKindFor(*metric.Descriptor, aggregation.Kind) export.ExportKind { + return export.PassThroughExporter +} + func (e *testExporter) Export(_ context.Context, checkpointSet export.CheckpointSet) error { e.lock.Lock() defer e.lock.Unlock() e.exports++ var records []export.Record - if err := checkpointSet.ForEach(func(r export.Record) error { + if err := checkpointSet.ForEach(e, func(r export.Record) error { if e.injectErr != nil { if err := e.injectErr(r); err != nil { return err diff --git a/sdk/metric/integrator/simple/simple.go b/sdk/metric/integrator/simple/simple.go index ced483bfa00..52beff54db5 100644 --- a/sdk/metric/integrator/simple/simple.go +++ b/sdk/metric/integrator/simple/simple.go @@ -29,31 +29,47 @@ import ( type ( Integrator struct { + export.ExportKindSelector export.AggregationSelector - stateful bool - batch + + state } - batchKey struct { + stateKey struct { descriptor *metric.Descriptor distinct label.Distinct resource label.Distinct } - batchValue struct { - aggregator export.Aggregator - labels *label.Set - resource *resource.Resource + stateValue struct { + // labels corresponds to the stateKey.distinct field. + labels *label.Set + + // resource corresponds to the stateKey.resource field. + resource *resource.Resource + + // updated indicates the last sequence number when this value had + // Process() called by an accumulator. + updated int64 + + // stateful indicates that a cumulative aggregation is + // being maintained, taken from the process start time. + stateful bool + + current export.Aggregator // refers to single-accumulator checkpoint or delta. + delta export.Aggregator // owned if multi accumulator else nil. + cumulative export.Aggregator // owned if stateful else nil. } - batch struct { + state struct { // RWMutex implements locking for the `CheckpointSet` interface. sync.RWMutex - values map[batchKey]batchValue + values map[stateKey]*stateValue // Note: the timestamp logic currently assumes all // exports are deltas. + processStart time.Time intervalStart time.Time intervalEnd time.Time @@ -68,97 +84,271 @@ type ( ) var _ export.Integrator = &Integrator{} -var _ export.CheckpointSet = &batch{} +var _ export.CheckpointSet = &state{} var ErrInconsistentState = fmt.Errorf("inconsistent integrator state") +var ErrInvalidExporterKind = fmt.Errorf("invalid exporter kind") -func New(selector export.AggregationSelector, stateful bool) *Integrator { +// New returns a basic Integrator using the provided +// AggregationSelector to select Aggregators. The ExportKindSelector +// is consulted to determine the kind(s) of exporter that will consume +// data, so that this Integrator can prepare to compute Delta or +// Cumulative Aggregations as needed. +func New(aselector export.AggregationSelector, eselector export.ExportKindSelector) *Integrator { + now := time.Now() return &Integrator{ - AggregationSelector: selector, - stateful: stateful, - batch: batch{ - values: map[batchKey]batchValue{}, - intervalStart: time.Now(), + AggregationSelector: aselector, + ExportKindSelector: eselector, + state: state{ + values: map[stateKey]*stateValue{}, + processStart: now, + intervalStart: now, }, } } -func (b *Integrator) Process(accumulation export.Accumulation) error { +// Process implements export.Integrator. +func (b *Integrator) Process(accum export.Accumulation) error { if b.startedCollection != b.finishedCollection+1 { return ErrInconsistentState } - - desc := accumulation.Descriptor() - key := batchKey{ + desc := accum.Descriptor() + key := stateKey{ descriptor: desc, - distinct: accumulation.Labels().Equivalent(), - resource: accumulation.Resource().Equivalent(), - } - agg := accumulation.Aggregator() - value, ok := b.batch.values[key] - if ok { - // Note: The call to Merge here combines only - // identical accumulations. It is required even for a - // stateless Integrator because such identical accumulations - // may arise in the Meter implementation due to race - // conditions. - return value.aggregator.Merge(agg, desc) - } - // If this integrator is stateful, create a copy of the - // Aggregator for long-term storage. Otherwise the - // Meter implementation will checkpoint the aggregator - // again, overwriting the long-lived state. - if b.stateful { - tmp := agg - // Note: the call to AggregatorFor() followed by Merge - // is effectively a Clone() operation. - b.AggregatorFor(desc, &agg) - if err := agg.Merge(tmp, desc); err != nil { - return err + distinct: accum.Labels().Equivalent(), + resource: accum.Resource().Equivalent(), + } + agg := accum.Aggregator() + + // Check if there is an existing value. + value, ok := b.state.values[key] + if !ok { + stateful := b.ExportKindFor(desc, agg.Aggregation().Kind()).MemoryRequired(desc.MetricKind()) + + newValue := &stateValue{ + labels: accum.Labels(), + resource: accum.Resource(), + updated: b.state.finishedCollection, + stateful: stateful, + current: agg, } + if stateful { + if desc.MetricKind().PrecomputedSum() { + // If we know we need to compute deltas, allocate two aggregators. + b.AggregatorFor(desc, &newValue.cumulative, &newValue.delta) + } else { + // In this case we are not certain to need a delta, only allocate a + // cumulative aggregator. We _may_ need a delta accumulator if + // multiple synchronous Accumulators produce an Accumulation (handled + // below), which requires merging them into a temporary Aggregator. + b.AggregatorFor(desc, &newValue.cumulative) + } + } + b.state.values[key] = newValue + return nil + } + + // Advance the update sequence number. + sameCollection := b.state.finishedCollection == value.updated + value.updated = b.state.finishedCollection + + // An existing value will be found for some stateKey when: + // (a) stateful aggregation is being used + // (b) multiple accumulators are being used. + // + // Case (a) occurs when the instrument and the exporter + // require memory to work correctly, either because the + // instrument reports a PrecomputedSum to a DeltaExporter or + // the reverse, a non-PrecomputedSum instrument with a + // CumulativeExporter. This logic is encapsulated in + // ExportKind.MemoryRequired(MetricKind). + // + // Case (b) occurs when the variable `sameCollection` is true, + // indicating that the stateKey for Accumulation has already + // been seen in the same collection. When this happens, it + // implies that multiple Accumulators are being used because + // the Accumulator outputs a maximum of one Accumulation per + // instrument and label set. + // + // The following logic distinguishes between asynchronous and + // synchronous instruments in order to ensure that the use of + // multiple Accumulators does not change instrument semantics. + // To maintain the instrument semantics, multiple synchronous + // Accumulations should be merged, whereas when multiple + // asynchronous Accumulations are processed, the last value + // should be kept. + + if !sameCollection { + // This is the first Accumulation we've seen for this + // stateKey during this collection. Just keep a + // reference to the Accumulator's Aggregator. + value.current = agg + return nil } - b.batch.values[key] = batchValue{ - aggregator: agg, - labels: accumulation.Labels(), - resource: accumulation.Resource(), + if desc.MetricKind().Asynchronous() { + // The last value across multiple accumulators is taken. + // Just keep a reference to the Accumulator's Aggregator. + value.current = agg + return nil } - return nil + + // The above two cases are keeping a reference to the + // Accumulator's Aggregator. The remaining cases address + // synchronous instruments, which always merge multiple + // Accumulations using `value.delta` for temporary storage. + + if value.delta == nil { + // The temporary `value.delta` may have been allocated + // already, either in a prior pass through this block of + // code or in the `!ok` branch above. It would be + // allocated in the `!ok` branch if this is stateful + // PrecomputedSum instrument (in which case the exporter + // is requesting a delta so we allocate it up front), + // and it would be allocated in this block when multiple + // accumulators are used and the first condition is not + // met. + b.AggregationSelector.AggregatorFor(desc, &value.delta) + } + if value.current != value.delta { + // If the current and delta Aggregators are not the same it + // implies that multiple Accumulators were used. The first + // Accumulation seen for a given stateKey will return in + // one of the cases above after assigning `value.current + // = agg` (i.e., after taking a reference to the + // Accumulator's Aggregator). + // + // The second time through this branch copies the + // Accumulator's Aggregator into `value.delta` and sets + // `value.current` appropriately to avoid this branch if + // a third Accumulator is used. + err := value.current.SynchronizedCopy(value.delta, desc) + if err != nil { + return err + } + value.current = value.delta + } + // The two statements above ensures that `value.current` refers + // to `value.delta` and not to an Accumulator's Aggregator. Now + // combine this Accumulation with the prior Accumulation. + return value.delta.Merge(agg, desc) } +// CheckpointSet returns the associated CheckpointSet. Use the +// CheckpointSet Locker interface to synchronize access to this +// object. The CheckpointSet.ForEach() method cannot be called +// concurrently with Process(). func (b *Integrator) CheckpointSet() export.CheckpointSet { - return &b.batch + return &b.state } +// StartCollection signals to the Integrator one or more Accumulators +// will begin calling Process() calls during collection. func (b *Integrator) StartCollection() { if b.startedCollection != 0 { b.intervalStart = b.intervalEnd } b.startedCollection++ - if !b.stateful { - b.batch.values = map[batchKey]batchValue{} - } } +// FinishCollection signals to the Integrator that a complete +// collection has finished and that ForEach will be called to access +// the CheckpointSet. func (b *Integrator) FinishCollection() error { - b.finishedCollection++ b.intervalEnd = time.Now() - if b.startedCollection != b.finishedCollection { + if b.startedCollection != b.finishedCollection+1 { return ErrInconsistentState } + defer func() { b.finishedCollection++ }() + + for key, value := range b.values { + mkind := key.descriptor.MetricKind() + + if !value.stateful { + if value.updated != b.finishedCollection { + delete(b.values, key) + } + continue + } + + // Update Aggregator state to support exporting either a + // delta or a cumulative aggregation. + var err error + if mkind.PrecomputedSum() { + // delta_value = current_cumulative_value - previous_cumulative_value + if subt, ok := value.current.(export.Subtractor); ok { + err = subt.Subtract(value.cumulative, value.delta, key.descriptor) + + if err == nil { + err = value.current.SynchronizedCopy(value.cumulative, key.descriptor) + } + } else { + err = aggregation.ErrNoSubtraction + } + } else { + // cumulative_value = previous_cumulative_value + current_delta_value + err = value.cumulative.Merge(value.current, key.descriptor) + } + if err != nil { + return err + } + } return nil } -func (b *batch) ForEach(f func(export.Record) error) error { +// ForEach iterates through the CheckpointSet, passing an +// export.Record with the appropriate Cumulative or Delta aggregation +// to an exporter. +func (b *state) ForEach(exporter export.ExportKindSelector, f func(export.Record) error) error { if b.startedCollection != b.finishedCollection { return ErrInconsistentState } - for key, value := range b.values { + mkind := key.descriptor.MetricKind() + + var agg aggregation.Aggregation + var start time.Time + + ekind := exporter.ExportKindFor(key.descriptor, value.current.Aggregation().Kind()) + switch ekind { + case export.PassThroughExporter: + // No state is required, pass through the checkpointed value. + agg = value.current.Aggregation() + + if mkind.PrecomputedSum() { + start = b.processStart + } else { + start = b.intervalStart + } + + case export.CumulativeExporter: + // If stateful, the sum has been computed. If stateless, the + // input was already cumulative. Either way, use the checkpointed + // value: + if value.stateful { + agg = value.cumulative.Aggregation() + } else { + agg = value.current.Aggregation() + } + start = b.processStart + + case export.DeltaExporter: + // Precomputed sums are a special case. + if mkind.PrecomputedSum() { + agg = value.delta.Aggregation() + } else { + agg = value.current.Aggregation() + } + start = b.intervalStart + + default: + return fmt.Errorf("%v: %w", ekind, ErrInvalidExporterKind) + } + if err := f(export.NewRecord( key.descriptor, value.labels, value.resource, - value.aggregator.Aggregation(), - b.intervalStart, + agg, + start, b.intervalEnd, )); err != nil && !errors.Is(err, aggregation.ErrNoData) { return err diff --git a/sdk/metric/integrator/simple/simple_test.go b/sdk/metric/integrator/simple/simple_test.go index 54d15fb545b..49d6a9bb586 100644 --- a/sdk/metric/integrator/simple/simple_test.go +++ b/sdk/metric/integrator/simple/simple_test.go @@ -16,6 +16,8 @@ package simple_test import ( "context" + "errors" + "fmt" "testing" "time" @@ -24,248 +26,306 @@ import ( "go.opentelemetry.io/otel/api/kv" "go.opentelemetry.io/otel/api/label" "go.opentelemetry.io/otel/api/metric" + exportTest "go.opentelemetry.io/otel/exporters/metric/test" export "go.opentelemetry.io/otel/sdk/export/metric" + "go.opentelemetry.io/otel/sdk/export/metric/aggregation" + "go.opentelemetry.io/otel/sdk/metric/aggregator/array" + "go.opentelemetry.io/otel/sdk/metric/aggregator/ddsketch" + "go.opentelemetry.io/otel/sdk/metric/aggregator/histogram" "go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue" + "go.opentelemetry.io/otel/sdk/metric/aggregator/minmaxsumcount" "go.opentelemetry.io/otel/sdk/metric/aggregator/sum" "go.opentelemetry.io/otel/sdk/metric/integrator/simple" "go.opentelemetry.io/otel/sdk/metric/integrator/test" "go.opentelemetry.io/otel/sdk/resource" ) -// Note: This var block and the helpers below will disappear in a -// future PR (see the draft in #799). The test has been completely -// rewritten there, so this code will simply be dropped. - -var ( - // Resource is applied to all test records built in this package. - Resource = resource.New(kv.String("R", "V")) - - // LastValueADesc and LastValueBDesc group by "G" - LastValueADesc = metric.NewDescriptor( - "a.lastvalue", metric.ValueObserverKind, metric.Int64NumberKind) - LastValueBDesc = metric.NewDescriptor( - "b.lastvalue", metric.ValueObserverKind, metric.Int64NumberKind) - // CounterADesc and CounterBDesc group by "C" - CounterADesc = metric.NewDescriptor( - "a.sum", metric.CounterKind, metric.Int64NumberKind) - CounterBDesc = metric.NewDescriptor( - "b.sum", metric.CounterKind, metric.Int64NumberKind) - - // LastValue groups are (labels1), (labels2+labels3) - // Counter groups are (labels1+labels2), (labels3) - - // Labels1 has G=H and C=D - Labels1 = makeLabels(kv.String("G", "H"), kv.String("C", "D")) - // Labels2 has C=D and E=F - Labels2 = makeLabels(kv.String("C", "D"), kv.String("E", "F")) - // Labels3 is the empty set - Labels3 = makeLabels() -) - -func makeLabels(labels ...kv.KeyValue) *label.Set { - s := label.NewSet(labels...) - return &s -} +// TestIntegrator tests all the non-error paths in this package. +func TestIntegrator(t *testing.T) { + type exportCase struct { + kind export.ExportKind + } + type instrumentCase struct { + kind metric.Kind + } + type numberCase struct { + kind metric.NumberKind + } + type aggregatorCase struct { + kind aggregation.Kind + } -// LastValueAgg returns a checkpointed lastValue aggregator w/ the specified descriptor and value. -func LastValueAgg(desc *metric.Descriptor, v int64) export.Aggregator { - ctx := context.Background() - gagg := &lastvalue.New(1)[0] - _ = gagg.Update(ctx, metric.NewInt64Number(v), desc) - return gagg + for _, tc := range []exportCase{ + {kind: export.PassThroughExporter}, + {kind: export.CumulativeExporter}, + {kind: export.DeltaExporter}, + } { + t.Run(tc.kind.String(), func(t *testing.T) { + for _, ic := range []instrumentCase{ + {kind: metric.CounterKind}, + {kind: metric.UpDownCounterKind}, + {kind: metric.ValueRecorderKind}, + {kind: metric.SumObserverKind}, + {kind: metric.UpDownSumObserverKind}, + {kind: metric.ValueObserverKind}, + } { + t.Run(ic.kind.String(), func(t *testing.T) { + for _, nc := range []numberCase{ + {kind: metric.Int64NumberKind}, + {kind: metric.Float64NumberKind}, + } { + t.Run(nc.kind.String(), func(t *testing.T) { + for _, ac := range []aggregatorCase{ + {kind: aggregation.SumKind}, + {kind: aggregation.MinMaxSumCountKind}, + {kind: aggregation.HistogramKind}, + {kind: aggregation.LastValueKind}, + {kind: aggregation.ExactKind}, + {kind: aggregation.SketchKind}, + } { + t.Run(ac.kind.String(), func(t *testing.T) { + testSynchronousIntegration( + t, + tc.kind, + ic.kind, + nc.kind, + ac.kind, + ) + }) + } + }) + } + }) + } + }) + } } -// Convenience method for building a test exported lastValue record. -func NewLastValueAccumulation(desc *metric.Descriptor, labels *label.Set, value int64) export.Accumulation { - return export.NewAccumulation(desc, labels, Resource, LastValueAgg(desc, value)) +type testSelector struct { + kind aggregation.Kind } -// Convenience method for building a test exported counter record. -func NewCounterAccumulation(desc *metric.Descriptor, labels *label.Set, value int64) export.Accumulation { - return export.NewAccumulation(desc, labels, Resource, CounterAgg(desc, value)) +func (ts testSelector) AggregatorFor(desc *metric.Descriptor, aggPtrs ...*export.Aggregator) { + for i := range aggPtrs { + switch ts.kind { + case aggregation.SumKind: + *aggPtrs[i] = &sum.New(1)[0] + case aggregation.MinMaxSumCountKind: + *aggPtrs[i] = &minmaxsumcount.New(1, desc)[0] + case aggregation.HistogramKind: + *aggPtrs[i] = &histogram.New(1, desc, nil)[0] + case aggregation.LastValueKind: + *aggPtrs[i] = &lastvalue.New(1)[0] + case aggregation.SketchKind: + *aggPtrs[i] = &ddsketch.New(1, desc, nil)[0] + case aggregation.ExactKind: + *aggPtrs[i] = &array.New(1)[0] + } + } } -// CounterAgg returns a checkpointed counter aggregator w/ the specified descriptor and value. -func CounterAgg(desc *metric.Descriptor, v int64) export.Aggregator { +func testSynchronousIntegration( + t *testing.T, + ekind export.ExportKind, + mkind metric.Kind, + nkind metric.NumberKind, + akind aggregation.Kind, +) { ctx := context.Background() - cagg := &sum.New(1)[0] - _ = cagg.Update(ctx, metric.NewInt64Number(v), desc) - return cagg -} - -func TestSimpleStateless(t *testing.T) { - b := simple.New(test.AggregationSelector(), false) - - b.StartCollection() - - // Set initial lastValue values - _ = b.Process(NewLastValueAccumulation(&LastValueADesc, Labels1, 10)) - _ = b.Process(NewLastValueAccumulation(&LastValueADesc, Labels2, 20)) - _ = b.Process(NewLastValueAccumulation(&LastValueADesc, Labels3, 30)) - - _ = b.Process(NewLastValueAccumulation(&LastValueBDesc, Labels1, 10)) - _ = b.Process(NewLastValueAccumulation(&LastValueBDesc, Labels2, 20)) - _ = b.Process(NewLastValueAccumulation(&LastValueBDesc, Labels3, 30)) - - // Another lastValue Set for Labels1 - _ = b.Process(NewLastValueAccumulation(&LastValueADesc, Labels1, 50)) - _ = b.Process(NewLastValueAccumulation(&LastValueBDesc, Labels1, 50)) - - // Set initial counter values - _ = b.Process(NewCounterAccumulation(&CounterADesc, Labels1, 10)) - _ = b.Process(NewCounterAccumulation(&CounterADesc, Labels2, 20)) - _ = b.Process(NewCounterAccumulation(&CounterADesc, Labels3, 40)) - - _ = b.Process(NewCounterAccumulation(&CounterBDesc, Labels1, 10)) - _ = b.Process(NewCounterAccumulation(&CounterBDesc, Labels2, 20)) - _ = b.Process(NewCounterAccumulation(&CounterBDesc, Labels3, 40)) + selector := testSelector{akind} + res := resource.New(kv.String("R", "V")) + + asNumber := func(value int64) metric.Number { + if nkind == metric.Int64NumberKind { + return metric.NewInt64Number(value) + } + return metric.NewFloat64Number(float64(value)) + } - // Another counter Add for Labels1 - _ = b.Process(NewCounterAccumulation(&CounterADesc, Labels1, 50)) - _ = b.Process(NewCounterAccumulation(&CounterBDesc, Labels1, 50)) + updateFor := func(desc *metric.Descriptor, value int64, labs []kv.KeyValue) export.Accumulation { + ls := label.NewSet(labs...) + var agg export.Aggregator + selector.AggregatorFor(desc, &agg) + _ = agg.Update(ctx, asNumber(value), desc) - require.NoError(t, b.FinishCollection()) + return export.NewAccumulation(desc, &ls, res, agg) + } - checkpointSet := b.CheckpointSet() - - records := test.NewOutput(label.DefaultEncoder()) - _ = checkpointSet.ForEach(records.AddRecord) - - // Output lastvalue should have only the "G=H" and "G=" keys. - // Output counter should have only the "C=D" and "C=" keys. - require.EqualValues(t, map[string]float64{ - "a.sum/C=D,G=H/R=V": 60, // labels1 - "a.sum/C=D,E=F/R=V": 20, // labels2 - "a.sum//R=V": 40, // labels3 - "b.sum/C=D,G=H/R=V": 60, // labels1 - "b.sum/C=D,E=F/R=V": 20, // labels2 - "b.sum//R=V": 40, // labels3 - "a.lastvalue/C=D,G=H/R=V": 50, // labels1 - "a.lastvalue/C=D,E=F/R=V": 20, // labels2 - "a.lastvalue//R=V": 30, // labels3 - "b.lastvalue/C=D,G=H/R=V": 50, // labels1 - "b.lastvalue/C=D,E=F/R=V": 20, // labels2 - "b.lastvalue//R=V": 30, // labels3 - }, records.Map) - - // Verify that state was reset - b.StartCollection() - require.NoError(t, b.FinishCollection()) - checkpointSet = b.CheckpointSet() - _ = checkpointSet.ForEach(func(rec export.Record) error { - t.Fatal("Unexpected call") - return nil - }) + labs1 := []kv.KeyValue{kv.String("L1", "V")} + labs2 := []kv.KeyValue{kv.String("L2", "V")} + + desc1 := metric.NewDescriptor("inst1", mkind, nkind) + desc2 := metric.NewDescriptor("inst2", mkind, nkind) + + // For 1 to 3 checkpoints: + for NAccum := 1; NAccum <= 3; NAccum++ { + t.Run(fmt.Sprintf("NumAccum=%d", NAccum), func(t *testing.T) { + // For 1 to 3 accumulators: + for NCheckpoint := 1; NCheckpoint <= 3; NCheckpoint++ { + t.Run(fmt.Sprintf("NumCkpt=%d", NCheckpoint), func(t *testing.T) { + + integrator := simple.New(selector, ekind) + + for nc := 0; nc < NCheckpoint; nc++ { + + // The input is 10 per update, scaled by + // the number of checkpoints for + // cumulative instruments: + input := int64(10) + cumulativeMultiplier := int64(nc + 1) + if mkind.PrecomputedSum() { + input *= cumulativeMultiplier + } + + integrator.StartCollection() + + for na := 0; na < NAccum; na++ { + _ = integrator.Process(updateFor(&desc1, input, labs1)) + _ = integrator.Process(updateFor(&desc2, input, labs2)) + } + + err := integrator.FinishCollection() + if err == aggregation.ErrNoSubtraction { + var subr export.Aggregator + selector.AggregatorFor(&desc1, &subr) + _, canSub := subr.(export.Subtractor) + + // Allow unsupported subraction case only when it is called for. + require.True(t, mkind.PrecomputedSum() && ekind == export.DeltaExporter && !canSub) + return + } else if err != nil { + t.Fatal(fmt.Sprint("unexpected FinishCollection error: ", err)) + } + + if nc < NCheckpoint-1 { + continue + } + + checkpointSet := integrator.CheckpointSet() + + // Test the final checkpoint state. + records1 := test.NewOutput(label.DefaultEncoder()) + err = checkpointSet.ForEach(ekind, records1.AddRecord) + + // Test for an allowed error: + if err != nil && err != aggregation.ErrNoSubtraction { + t.Fatal(fmt.Sprint("unexpected checkpoint error: ", err)) + } + var multiplier int64 + + if mkind.Asynchronous() { + // Because async instruments take the last value, + // the number of accumulators doesn't matter. + if mkind.PrecomputedSum() { + if ekind == export.DeltaExporter { + multiplier = 1 + } else { + multiplier = cumulativeMultiplier + } + } else { + if ekind == export.CumulativeExporter && akind != aggregation.LastValueKind { + multiplier = cumulativeMultiplier + } else { + multiplier = 1 + } + } + } else { + // Synchronous accumulate results from multiple accumulators, + // use that number as the baseline multiplier. + multiplier = int64(NAccum) + if ekind == export.CumulativeExporter { + // If a cumulative exporter, include prior checkpoints. + multiplier *= cumulativeMultiplier + } + if akind == aggregation.LastValueKind { + // If a last-value aggregator, set multiplier to 1.0. + multiplier = 1 + } + } + + require.EqualValues(t, map[string]float64{ + "inst1/L1=V/R=V": float64(multiplier * 10), // labels1 + "inst2/L2=V/R=V": float64(multiplier * 10), // labels2 + }, records1.Map) + } + }) + } + }) + } } -func TestSimpleStateful(t *testing.T) { - ctx := context.Background() - b := simple.New(test.AggregationSelector(), true) - - b.StartCollection() +type bogusExporter struct{} - counterA := NewCounterAccumulation(&CounterADesc, Labels1, 10) - _ = b.Process(counterA) - - counterB := NewCounterAccumulation(&CounterBDesc, Labels1, 10) - _ = b.Process(counterB) - require.NoError(t, b.FinishCollection()) - - checkpointSet := b.CheckpointSet() - - records1 := test.NewOutput(label.DefaultEncoder()) - _ = checkpointSet.ForEach(records1.AddRecord) - - require.EqualValues(t, map[string]float64{ - "a.sum/C=D,G=H/R=V": 10, // labels1 - "b.sum/C=D,G=H/R=V": 10, // labels1 - }, records1.Map) - - alloc := sum.New(4) - caggA, caggB, ckptA, ckptB := &alloc[0], &alloc[1], &alloc[2], &alloc[3] - - // Test that state was NOT reset - checkpointSet = b.CheckpointSet() - - b.StartCollection() - require.NoError(t, b.FinishCollection()) - - records2 := test.NewOutput(label.DefaultEncoder()) - _ = checkpointSet.ForEach(records2.AddRecord) - - require.EqualValues(t, records1.Map, records2.Map) - - // Update and re-checkpoint the original record. - _ = caggA.Update(ctx, metric.NewInt64Number(20), &CounterADesc) - _ = caggB.Update(ctx, metric.NewInt64Number(20), &CounterBDesc) - err := caggA.SynchronizedCopy(ckptA, &CounterADesc) - require.NoError(t, err) - err = caggB.SynchronizedCopy(ckptB, &CounterBDesc) - require.NoError(t, err) - - // As yet cagg has not been passed to Integrator.Process. Should - // not see an update. - checkpointSet = b.CheckpointSet() - - records3 := test.NewOutput(label.DefaultEncoder()) - _ = checkpointSet.ForEach(records3.AddRecord) - - require.EqualValues(t, records1.Map, records3.Map) - b.StartCollection() - - // Now process the second update - _ = b.Process(export.NewAccumulation(&CounterADesc, Labels1, Resource, ckptA)) - _ = b.Process(export.NewAccumulation(&CounterBDesc, Labels1, Resource, ckptB)) - require.NoError(t, b.FinishCollection()) - - checkpointSet = b.CheckpointSet() - - records4 := test.NewOutput(label.DefaultEncoder()) - _ = checkpointSet.ForEach(records4.AddRecord) +func (bogusExporter) ExportKindFor(*metric.Descriptor, aggregation.Kind) export.ExportKind { + return 1000000 +} - require.EqualValues(t, map[string]float64{ - "a.sum/C=D,G=H/R=V": 30, - "b.sum/C=D,G=H/R=V": 30, - }, records4.Map) +func (bogusExporter) Export(context.Context, export.CheckpointSet) error { + panic("Not called") } func TestSimpleInconsistent(t *testing.T) { // Test double-start - b := simple.New(test.AggregationSelector(), true) + b := simple.New(test.AggregationSelector(), export.PassThroughExporter) b.StartCollection() b.StartCollection() require.Equal(t, simple.ErrInconsistentState, b.FinishCollection()) // Test finish without start - b = simple.New(test.AggregationSelector(), true) + b = simple.New(test.AggregationSelector(), export.PassThroughExporter) require.Equal(t, simple.ErrInconsistentState, b.FinishCollection()) // Test no finish - b = simple.New(test.AggregationSelector(), true) + b = simple.New(test.AggregationSelector(), export.PassThroughExporter) b.StartCollection() - require.Equal(t, simple.ErrInconsistentState, b.ForEach(func(export.Record) error { return nil })) + require.Equal( + t, + simple.ErrInconsistentState, + b.ForEach( + export.PassThroughExporter, + func(export.Record) error { return nil }, + ), + ) // Test no start - b = simple.New(test.AggregationSelector(), true) + b = simple.New(test.AggregationSelector(), export.PassThroughExporter) + + desc := metric.NewDescriptor("inst", metric.CounterKind, metric.Int64NumberKind) + accum := export.NewAccumulation(&desc, label.EmptySet(), resource.Empty(), exportTest.NoopAggregator{}) + require.Equal(t, simple.ErrInconsistentState, b.Process(accum)) + + // Test invalid kind: + b = simple.New(test.AggregationSelector(), export.PassThroughExporter) + b.StartCollection() + require.NoError(t, b.Process(accum)) + require.NoError(t, b.FinishCollection()) + + err := b.ForEach( + bogusExporter{}, + func(export.Record) error { return nil }, + ) + require.True(t, errors.Is(err, simple.ErrInvalidExporterKind)) - require.Equal(t, simple.ErrInconsistentState, b.Process(NewCounterAccumulation(&CounterADesc, Labels1, 10))) } func TestSimpleTimestamps(t *testing.T) { beforeNew := time.Now() - b := simple.New(test.AggregationSelector(), true) + b := simple.New(test.AggregationSelector(), export.PassThroughExporter) afterNew := time.Now() + desc := metric.NewDescriptor("inst", metric.CounterKind, metric.Int64NumberKind) + accum := export.NewAccumulation(&desc, label.EmptySet(), resource.Empty(), exportTest.NoopAggregator{}) + b.StartCollection() - _ = b.Process(NewCounterAccumulation(&CounterADesc, Labels1, 10)) + _ = b.Process(accum) require.NoError(t, b.FinishCollection()) var start1, end1 time.Time - require.NoError(t, b.ForEach(func(rec export.Record) error { + require.NoError(t, b.ForEach(export.PassThroughExporter, func(rec export.Record) error { start1 = rec.StartTime() end1 = rec.EndTime() return nil @@ -277,12 +337,12 @@ func TestSimpleTimestamps(t *testing.T) { for i := 0; i < 2; i++ { b.StartCollection() - require.NoError(t, b.Process(NewCounterAccumulation(&CounterADesc, Labels1, 10))) + require.NoError(t, b.Process(accum)) require.NoError(t, b.FinishCollection()) var start2, end2 time.Time - require.NoError(t, b.ForEach(func(rec export.Record) error { + require.NoError(t, b.ForEach(export.PassThroughExporter, func(rec export.Record) error { start2 = rec.StartTime() end2 = rec.EndTime() return nil