diff --git a/.chloggen/interval-implement.yaml b/.chloggen/interval-implement.yaml new file mode 100644 index 000000000000..911e53ea436d --- /dev/null +++ b/.chloggen/interval-implement.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: "new_component" + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: intervalprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Implements the new interval processor. See the README for more info about how to use it + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [29461] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 2c02709217de..e22845a7af66 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -163,7 +163,7 @@ processor/deltatorateprocessor/ @open-telemetry/collect processor/filterprocessor/ @open-telemetry/collector-contrib-approvers @TylerHelmuth @boostchicken processor/groupbyattrsprocessor/ @open-telemetry/collector-contrib-approvers @rnishtala-sumo processor/groupbytraceprocessor/ @open-telemetry/collector-contrib-approvers @jpkrohling -processor/intervalprocessor/ @open-telemetry/collector-contrib-approvers @RichieSams +processor/intervalprocessor/ @open-telemetry/collector-contrib-approvers @RichieSams @sh0rez processor/k8sattributesprocessor/ @open-telemetry/collector-contrib-approvers @dmitryax @rmfitzpatrick @fatsheep9146 @TylerHelmuth processor/logstransformprocessor/ @open-telemetry/collector-contrib-approvers @djaglowski @dehaansa processor/metricsgenerationprocessor/ @open-telemetry/collector-contrib-approvers @Aneurysm9 diff --git a/cmd/otelcontribcol/builder-config.yaml b/cmd/otelcontribcol/builder-config.yaml index c6b25e6aa3c5..829556e3e9ed 100644 --- a/cmd/otelcontribcol/builder-config.yaml +++ b/cmd/otelcontribcol/builder-config.yaml @@ -226,6 +226,7 @@ replaces: - github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal => ../../pkg/batchpersignal - github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs => ../../internal/aws/cwlogs - github.com/open-telemetry/opentelemetry-collector-contrib/internal/common => ../../internal/common + - github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics => ../../internal/exp/metrics - github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsxrayreceiver => ../../receiver/awsxrayreceiver - github.com/open-telemetry/opentelemetry-collector-contrib/receiver/azureblobreceiver => ../../receiver/azureblobreceiver - github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sobjectsreceiver => ../../receiver/k8sobjectsreceiver diff --git a/cmd/otelcontribcol/go.mod b/cmd/otelcontribcol/go.mod index e2d00b78c581..27953cabcf3f 100644 --- a/cmd/otelcontribcol/go.mod +++ b/cmd/otelcontribcol/go.mod @@ -555,6 +555,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.99.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/datadog v0.99.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/docker v0.99.0 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics v0.99.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter v0.99.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig v0.99.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka v0.99.0 // indirect @@ -753,6 +754,8 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/c replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/common => ../../internal/common +replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics => ../../internal/exp/metrics + replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsxrayreceiver => ../../receiver/awsxrayreceiver replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/azureblobreceiver => ../../receiver/azureblobreceiver diff --git a/internal/exp/metrics/staleness/staleness.go b/internal/exp/metrics/staleness/staleness.go index 4bf25c198920..ce69321cadd9 100644 --- a/internal/exp/metrics/staleness/staleness.go +++ b/internal/exp/metrics/staleness/staleness.go @@ -93,3 +93,7 @@ func (s *Staleness[T]) Evict() identity.Stream { s.items.Delete(id) return id } + +func (s *Staleness[T]) Clear() { + s.items.Clear() +} diff --git a/internal/exp/metrics/streams/streams.go b/internal/exp/metrics/streams/streams.go index 90bebb63c091..03fba7812a43 100644 --- a/internal/exp/metrics/streams/streams.go +++ b/internal/exp/metrics/streams/streams.go @@ -16,6 +16,7 @@ type Map[T any] interface { Delete(identity.Stream) Items() func(yield func(identity.Stream, T) bool) bool Len() int + Clear() } var _ Map[any] = HashMap[any](nil) @@ -51,6 +52,10 @@ func (m HashMap[T]) Len() int { return len((map[identity.Stream]T)(m)) } +func (m HashMap[T]) Clear() { + clear(m) +} + // Evictors remove the "least important" stream based on some strategy such as // the oldest, least active, etc. type Evictor interface { diff --git a/processor/intervalprocessor/README.md b/processor/intervalprocessor/README.md index 90cf86d05e3f..484f1723c9c0 100644 --- a/processor/intervalprocessor/README.md +++ b/processor/intervalprocessor/README.md @@ -7,7 +7,7 @@ | Distributions | [] | | Warnings | [Statefulness](#warnings) | | Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aprocessor%2Finterval%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aprocessor%2Finterval) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aprocessor%2Finterval%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aprocessor%2Finterval) | -| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@RichieSams](https://www.github.com/RichieSams) | +| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@RichieSams](https://www.github.com/RichieSams), [@sh0rez](https://www.github.com/sh0rez) | [development]: https://github.com/open-telemetry/opentelemetry-collector#development @@ -31,4 +31,35 @@ The following metric types will *not* be aggregated, and will instead be passed, The following settings can be optionally configured: -- `max_staleness`: The total time a state entry will live past the time it was last seen. Set to 0 to retain state indefinitely. Default: 0 +* `interval`: The interval in which the processor should export the aggregated metrics. Default: 60s + +## Example of metric flows + +The following sum metrics come into the processor to be handled + +| Timestamp | Metric Name | Aggregation Temporarility | Attributes | Value | +| --------- | ------------ | ------------------------- | ----------------- | ----: | +| 0 | test_metric | Cumulative | labelA: foo | 4.0 | +| 2 | test_metric | Cumulative | labelA: bar | 3.1 | +| 4 | other_metric | Delta | fruitType: orange | 77.4 | +| 6 | test_metric | Cumulative | labelA: foo | 8.2 | +| 8 | test_metric | Cumulative | labelA: foo | 12.8 | +| 10 | test_metric | Cumulative | labelA: bar | 6.4 | + +The processor would immediately pass the following metrics to the next processor in the chain + +| Timestamp | Metric Name | Aggregation Temporarility | Attributes | Value | +| --------- | ------------ | ------------------------- | ----------------- | ----: | +| 4 | other_metric | Delta | fruitType: orange | 77.4 | + +Because it's a Delta metric. + +At the next `interval` (15s by default), the processor would pass the following metrics to the next processor in the chain + +| Timestamp | Metric Name | Aggregation Temporarility | Attributes | Value | +| --------- | ----------- | ------------------------- | ----------- | ----: | +| 8 | test_metric | Cumulative | labelA: foo | 12.8 | +| 10 | test_metric | Cumulative | labelA: bar | 6.4 | + +> [!IMPORTANT] +> After exporting, any internal state is cleared. So if no new metrics come in, the next interval will export nothing. diff --git a/processor/intervalprocessor/config.go b/processor/intervalprocessor/config.go index 8a43eca75c00..1967afc972bb 100644 --- a/processor/intervalprocessor/config.go +++ b/processor/intervalprocessor/config.go @@ -4,21 +4,30 @@ package intervalprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/intervalprocessor" import ( + "errors" "time" "go.opentelemetry.io/collector/component" ) +var ( + ErrInvalidIntervalValue = errors.New("invalid interval value") +) + var _ component.Config = (*Config)(nil) // Config defines the configuration for the processor. type Config struct { - // MaxStaleness is the total time a state entry will live past the time it was last seen. Set to 0 to retain state indefinitely. - MaxStaleness time.Duration `mapstructure:"max_staleness"` + // Interval is the time + Interval time.Duration `mapstructure:"interval"` } // Validate checks whether the input configuration has all of the required fields for the processor. // An error is returned if there are any invalid inputs. func (config *Config) Validate() error { + if config.Interval <= 0 { + return ErrInvalidIntervalValue + } + return nil } diff --git a/processor/intervalprocessor/factory.go b/processor/intervalprocessor/factory.go index 5bef7881dfdf..91127dcaa119 100644 --- a/processor/intervalprocessor/factory.go +++ b/processor/intervalprocessor/factory.go @@ -6,6 +6,7 @@ package intervalprocessor // import "github.com/open-telemetry/opentelemetry-col import ( "context" "fmt" + "time" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" @@ -23,7 +24,9 @@ func NewFactory() processor.Factory { } func createDefaultConfig() component.Config { - return &Config{} + return &Config{ + Interval: 60 * time.Second, + } } func createMetricsProcessor(_ context.Context, set processor.CreateSettings, cfg component.Config, nextConsumer consumer.Metrics) (processor.Metrics, error) { diff --git a/processor/intervalprocessor/go.mod b/processor/intervalprocessor/go.mod index fc20ff9eac81..1c559023b6f9 100644 --- a/processor/intervalprocessor/go.mod +++ b/processor/intervalprocessor/go.mod @@ -3,6 +3,9 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/processor/inter go 1.21.0 require ( + github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics v0.99.0 + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.99.0 + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.99.0 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector/component v0.99.0 go.opentelemetry.io/collector/confmap v0.99.0 @@ -17,7 +20,7 @@ require ( require ( github.com/beorn7/perks v1.0.1 // indirect - github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/go-logr/logr v1.4.1 // indirect github.com/go-logr/stdr v1.2.2 // indirect @@ -32,6 +35,7 @@ require ( github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.99.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_golang v1.19.0 // indirect github.com/prometheus/client_model v0.6.1 // indirect @@ -52,3 +56,11 @@ require ( google.golang.org/protobuf v1.33.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) + +replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics => ../../internal/exp/metrics + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden diff --git a/processor/intervalprocessor/go.sum b/processor/intervalprocessor/go.sum index 6470d0da5ba9..682b8383315d 100644 --- a/processor/intervalprocessor/go.sum +++ b/processor/intervalprocessor/go.sum @@ -1,7 +1,7 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= -github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= -github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= diff --git a/processor/intervalprocessor/internal/metrics/metrics.go b/processor/intervalprocessor/internal/metrics/metrics.go new file mode 100644 index 000000000000..c3febf1a173a --- /dev/null +++ b/processor/intervalprocessor/internal/metrics/metrics.go @@ -0,0 +1,23 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package metrics // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/intervalprocessor/internal/metrics" + +import ( + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" +) + +type DataPointSlice[DP DataPoint[DP]] interface { + Len() int + At(i int) DP + AppendEmpty() DP +} + +type DataPoint[Self any] interface { + pmetric.NumberDataPoint | pmetric.HistogramDataPoint | pmetric.ExponentialHistogramDataPoint + + Timestamp() pcommon.Timestamp + Attributes() pcommon.Map + CopyTo(dest Self) +} diff --git a/processor/intervalprocessor/metadata.yaml b/processor/intervalprocessor/metadata.yaml index 5bfb1ec220be..4800f3d7316d 100644 --- a/processor/intervalprocessor/metadata.yaml +++ b/processor/intervalprocessor/metadata.yaml @@ -8,6 +8,6 @@ status: distributions: [] warnings: [Statefulness] codeowners: - active: [RichieSams] + active: [RichieSams, sh0rez] tests: config: diff --git a/processor/intervalprocessor/processor.go b/processor/intervalprocessor/processor.go index 1ba888a1f7b4..6960472e5395 100644 --- a/processor/intervalprocessor/processor.go +++ b/processor/intervalprocessor/processor.go @@ -5,6 +5,9 @@ package intervalprocessor // import "github.com/open-telemetry/opentelemetry-col import ( "context" + "errors" + "fmt" + "sync" "time" "go.opentelemetry.io/collector/component" @@ -12,6 +15,9 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/processor" "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/intervalprocessor/internal/metrics" ) var _ processor.Metrics = (*Processor)(nil) @@ -19,9 +25,19 @@ var _ processor.Metrics = (*Processor)(nil) type Processor struct { ctx context.Context cancel context.CancelFunc - log *zap.Logger + logger *zap.Logger + + stateLock sync.Mutex - maxStaleness time.Duration + md pmetric.Metrics + rmLookup map[identity.Resource]pmetric.ResourceMetrics + smLookup map[identity.Scope]pmetric.ScopeMetrics + mLookup map[identity.Metric]pmetric.Metric + numberLookup map[identity.Stream]pmetric.NumberDataPoint + histogramLookup map[identity.Stream]pmetric.HistogramDataPoint + expHistogramLookup map[identity.Stream]pmetric.ExponentialHistogramDataPoint + + exportInterval time.Duration nextConsumer consumer.Metrics } @@ -30,15 +46,40 @@ func newProcessor(config *Config, log *zap.Logger, nextConsumer consumer.Metrics ctx, cancel := context.WithCancel(context.Background()) return &Processor{ - ctx: ctx, - cancel: cancel, - log: log, - maxStaleness: config.MaxStaleness, + ctx: ctx, + cancel: cancel, + logger: log, + + stateLock: sync.Mutex{}, + + md: pmetric.NewMetrics(), + rmLookup: map[identity.Resource]pmetric.ResourceMetrics{}, + smLookup: map[identity.Scope]pmetric.ScopeMetrics{}, + mLookup: map[identity.Metric]pmetric.Metric{}, + numberLookup: map[identity.Stream]pmetric.NumberDataPoint{}, + histogramLookup: map[identity.Stream]pmetric.HistogramDataPoint{}, + expHistogramLookup: map[identity.Stream]pmetric.ExponentialHistogramDataPoint{}, + + exportInterval: config.Interval, + nextConsumer: nextConsumer, } } func (p *Processor) Start(_ context.Context, _ component.Host) error { + exportTicker := time.NewTicker(p.exportInterval) + go func() { + for { + select { + case <-p.ctx.Done(): + exportTicker.Stop() + return + case <-exportTicker.C: + p.exportMetrics() + } + } + }() + return nil } @@ -52,5 +93,182 @@ func (p *Processor) Capabilities() consumer.Capabilities { } func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { - return p.nextConsumer.ConsumeMetrics(ctx, md) + var errs error + + p.stateLock.Lock() + defer p.stateLock.Unlock() + + md.ResourceMetrics().RemoveIf(func(rm pmetric.ResourceMetrics) bool { + rm.ScopeMetrics().RemoveIf(func(sm pmetric.ScopeMetrics) bool { + sm.Metrics().RemoveIf(func(m pmetric.Metric) bool { + switch m.Type() { + case pmetric.MetricTypeGauge, pmetric.MetricTypeSummary: + return false + case pmetric.MetricTypeSum: + // Check if we care about this value + sum := m.Sum() + + if !sum.IsMonotonic() { + return false + } + + if sum.AggregationTemporality() != pmetric.AggregationTemporalityCumulative { + return false + } + + mClone, metricID := p.getOrCloneMetric(rm, sm, m) + cloneSum := mClone.Sum() + + aggregateDataPoints(sum.DataPoints(), cloneSum.DataPoints(), metricID, p.numberLookup) + return true + case pmetric.MetricTypeHistogram: + histogram := m.Histogram() + + if histogram.AggregationTemporality() != pmetric.AggregationTemporalityCumulative { + return false + } + + mClone, metricID := p.getOrCloneMetric(rm, sm, m) + cloneHistogram := mClone.Histogram() + + aggregateDataPoints(histogram.DataPoints(), cloneHistogram.DataPoints(), metricID, p.histogramLookup) + return true + case pmetric.MetricTypeExponentialHistogram: + expHistogram := m.ExponentialHistogram() + + if expHistogram.AggregationTemporality() != pmetric.AggregationTemporalityCumulative { + return false + } + + mClone, metricID := p.getOrCloneMetric(rm, sm, m) + cloneExpHistogram := mClone.ExponentialHistogram() + + aggregateDataPoints(expHistogram.DataPoints(), cloneExpHistogram.DataPoints(), metricID, p.expHistogramLookup) + return true + default: + errs = errors.Join(fmt.Errorf("invalid MetricType %d", m.Type())) + return false + } + }) + return sm.Metrics().Len() == 0 + }) + return rm.ScopeMetrics().Len() == 0 + }) + + if err := p.nextConsumer.ConsumeMetrics(ctx, md); err != nil { + errs = errors.Join(errs, err) + } + + return errs +} + +func aggregateDataPoints[DPS metrics.DataPointSlice[DP], DP metrics.DataPoint[DP]](dataPoints DPS, mCloneDataPoints DPS, metricID identity.Metric, dpLookup map[identity.Stream]DP) { + for i := 0; i < dataPoints.Len(); i++ { + dp := dataPoints.At(i) + + streamID := identity.OfStream(metricID, dp) + existingDP, ok := dpLookup[streamID] + if !ok { + dpClone := mCloneDataPoints.AppendEmpty() + dp.CopyTo(dpClone) + dpLookup[streamID] = dpClone + continue + } + + // Check if the datapoint is newer + if dp.Timestamp() > existingDP.Timestamp() { + dp.CopyTo(existingDP) + continue + } + + // Otherwise, we leave existing as-is + } +} + +func (p *Processor) exportMetrics() { + md := func() pmetric.Metrics { + p.stateLock.Lock() + defer p.stateLock.Unlock() + + // ConsumeMetrics() has prepared our own pmetric.Metrics instance ready for us to use + // Take it and clear replace it with a new empty one + out := p.md + p.md = pmetric.NewMetrics() + + // Clear all the lookup references + clear(p.rmLookup) + clear(p.smLookup) + clear(p.mLookup) + clear(p.numberLookup) + clear(p.histogramLookup) + clear(p.expHistogramLookup) + + return out + }() + + if err := p.nextConsumer.ConsumeMetrics(p.ctx, md); err != nil { + p.logger.Error("Metrics export failed", zap.Error(err)) + } +} + +func (p *Processor) getOrCloneMetric(rm pmetric.ResourceMetrics, sm pmetric.ScopeMetrics, m pmetric.Metric) (pmetric.Metric, identity.Metric) { + // Find the ResourceMetrics + resID := identity.OfResource(rm.Resource()) + rmClone, ok := p.rmLookup[resID] + if !ok { + // We need to clone it *without* the ScopeMetricsSlice data + rmClone = p.md.ResourceMetrics().AppendEmpty() + rm.Resource().CopyTo(rmClone.Resource()) + rmClone.SetSchemaUrl(rm.SchemaUrl()) + p.rmLookup[resID] = rmClone + } + + // Find the ScopeMetrics + scopeID := identity.OfScope(resID, sm.Scope()) + smClone, ok := p.smLookup[scopeID] + if !ok { + // We need to clone it *without* the MetricSlice data + smClone = rmClone.ScopeMetrics().AppendEmpty() + sm.Scope().CopyTo(smClone.Scope()) + smClone.SetSchemaUrl(sm.SchemaUrl()) + p.smLookup[scopeID] = smClone + } + + // Find the Metric + metricID := identity.OfMetric(scopeID, m) + mClone, ok := p.mLookup[metricID] + if !ok { + // We need to clone it *without* the datapoint data + mClone = smClone.Metrics().AppendEmpty() + mClone.SetName(m.Name()) + mClone.SetDescription(m.Description()) + mClone.SetUnit(m.Unit()) + + switch m.Type() { + case pmetric.MetricTypeGauge: + mClone.SetEmptyGauge() + case pmetric.MetricTypeSummary: + mClone.SetEmptySummary() + case pmetric.MetricTypeSum: + src := m.Sum() + + dest := mClone.SetEmptySum() + dest.SetAggregationTemporality(src.AggregationTemporality()) + dest.SetIsMonotonic(src.IsMonotonic()) + case pmetric.MetricTypeHistogram: + src := m.Histogram() + + dest := mClone.SetEmptyHistogram() + dest.SetAggregationTemporality(src.AggregationTemporality()) + case pmetric.MetricTypeExponentialHistogram: + src := m.ExponentialHistogram() + + dest := mClone.SetEmptyExponentialHistogram() + dest.SetAggregationTemporality(src.AggregationTemporality()) + } + + p.mLookup[metricID] = mClone + } + + return mClone, metricID } diff --git a/processor/intervalprocessor/processor_test.go b/processor/intervalprocessor/processor_test.go new file mode 100644 index 000000000000..0515ef631e07 --- /dev/null +++ b/processor/intervalprocessor/processor_test.go @@ -0,0 +1,104 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package intervalprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/intervalprocessor" + +import ( + "context" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/processor/processortest" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest" +) + +func TestAggregation(t *testing.T) { + t.Parallel() + + testCases := []string{ + "basic_aggregation", + "non_monotonic_sums_are_passed_through", + "summaries_are_passed_through", + "histograms_are_aggregated", + "exp_histograms_are_aggregated", + "all_delta_metrics_are_passed_through", + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + config := &Config{Interval: time.Second} + + for _, tc := range testCases { + testName := tc + + t.Run(testName, func(t *testing.T) { + t.Parallel() + + // next stores the results of the filter metric processor + next := &consumertest.MetricsSink{} + + factory := NewFactory() + mgp, err := factory.CreateMetricsProcessor( + context.Background(), + processortest.NewNopCreateSettings(), + config, + next, + ) + require.NoError(t, err) + + dir := filepath.Join("testdata", testName) + + md, err := golden.ReadMetrics(filepath.Join(dir, "input.yaml")) + require.NoError(t, err) + + // Test that ConsumeMetrics works + err = mgp.ConsumeMetrics(ctx, md) + require.NoError(t, err) + + require.IsType(t, &Processor{}, mgp) + processor := mgp.(*Processor) + + // Pretend we hit the interval timer and call export + processor.exportMetrics() + + // All the lookup tables should now be empty + require.Empty(t, processor.rmLookup) + require.Empty(t, processor.smLookup) + require.Empty(t, processor.mLookup) + require.Empty(t, processor.numberLookup) + require.Empty(t, processor.histogramLookup) + require.Empty(t, processor.expHistogramLookup) + + // Exporting again should return nothing + processor.exportMetrics() + + // Next should have gotten three data sets: + // 1. Anything left over from ConsumeMetrics() + // 2. Anything exported from exportMetrics() + // 3. An empty entry for the second call to exportMetrics() + allMetrics := next.AllMetrics() + require.Len(t, allMetrics, 3) + + nextData := allMetrics[0] + exportData := allMetrics[1] + secondExportData := allMetrics[2] + + expectedNextData, err := golden.ReadMetrics(filepath.Join(dir, "next.yaml")) + require.NoError(t, err) + require.NoError(t, pmetrictest.CompareMetrics(expectedNextData, nextData)) + + expectedExportData, err := golden.ReadMetrics(filepath.Join(dir, "output.yaml")) + require.NoError(t, err) + require.NoError(t, pmetrictest.CompareMetrics(expectedExportData, exportData)) + + require.NoError(t, pmetrictest.CompareMetrics(pmetric.NewMetrics(), secondExportData), "the second export data should be empty") + }) + } +} diff --git a/processor/intervalprocessor/testdata/all_delta_metrics_are_passed_through/input.yaml b/processor/intervalprocessor/testdata/all_delta_metrics_are_passed_through/input.yaml new file mode 100644 index 000000000000..810cd9bf48da --- /dev/null +++ b/processor/intervalprocessor/testdata/all_delta_metrics_are_passed_through/input.yaml @@ -0,0 +1,56 @@ +resourceMetrics: + - schemaUrl: https://test-res-schema.com/schema + resource: + attributes: + - key: asdf + value: + stringValue: foo + scopeMetrics: + - schemaUrl: https://test-scope-schema.com/schema + scope: + name: MyTestInstrument + version: "1.2.3" + attributes: + - key: foo + value: + stringValue: bar + metrics: + - name: delta.monotonic.sum + sum: + aggregationTemporality: 1 + isMonotonic: true + dataPoints: + - timeUnixNano: 50 + asDouble: 333 + attributes: + - key: aaa + value: + stringValue: bbb + - name: delta.histogram.test + histogram: + aggregationTemporality: 1 + dataPoints: + - timeUnixNano: 80 + explicitBounds: [0.01, 0.1, 1, 10, 100] + bucketCounts: [9, 12, 17, 8, 34] + attributes: + - key: aaa + value: + stringValue: bbb + - name: delta.exphistogram.test + histogram: + aggregationTemporality: 1 + dataPoints: + - timeUnixNano: 80 + scale: 4 + zeroCount: 5 + positive: + offset: 2 + bucketCounts: [9, 12, 17, 8, 34] + negative: + offset: 6 + bucketCounts: [6, 21, 9, 19, 7] + attributes: + - key: aaa + value: + stringValue: bbb diff --git a/processor/intervalprocessor/testdata/all_delta_metrics_are_passed_through/next.yaml b/processor/intervalprocessor/testdata/all_delta_metrics_are_passed_through/next.yaml new file mode 100644 index 000000000000..810cd9bf48da --- /dev/null +++ b/processor/intervalprocessor/testdata/all_delta_metrics_are_passed_through/next.yaml @@ -0,0 +1,56 @@ +resourceMetrics: + - schemaUrl: https://test-res-schema.com/schema + resource: + attributes: + - key: asdf + value: + stringValue: foo + scopeMetrics: + - schemaUrl: https://test-scope-schema.com/schema + scope: + name: MyTestInstrument + version: "1.2.3" + attributes: + - key: foo + value: + stringValue: bar + metrics: + - name: delta.monotonic.sum + sum: + aggregationTemporality: 1 + isMonotonic: true + dataPoints: + - timeUnixNano: 50 + asDouble: 333 + attributes: + - key: aaa + value: + stringValue: bbb + - name: delta.histogram.test + histogram: + aggregationTemporality: 1 + dataPoints: + - timeUnixNano: 80 + explicitBounds: [0.01, 0.1, 1, 10, 100] + bucketCounts: [9, 12, 17, 8, 34] + attributes: + - key: aaa + value: + stringValue: bbb + - name: delta.exphistogram.test + histogram: + aggregationTemporality: 1 + dataPoints: + - timeUnixNano: 80 + scale: 4 + zeroCount: 5 + positive: + offset: 2 + bucketCounts: [9, 12, 17, 8, 34] + negative: + offset: 6 + bucketCounts: [6, 21, 9, 19, 7] + attributes: + - key: aaa + value: + stringValue: bbb diff --git a/processor/intervalprocessor/testdata/all_delta_metrics_are_passed_through/output.yaml b/processor/intervalprocessor/testdata/all_delta_metrics_are_passed_through/output.yaml new file mode 100644 index 000000000000..3949e7c54ded --- /dev/null +++ b/processor/intervalprocessor/testdata/all_delta_metrics_are_passed_through/output.yaml @@ -0,0 +1 @@ +resourceMetrics: [] diff --git a/processor/intervalprocessor/testdata/basic_aggregation/input.yaml b/processor/intervalprocessor/testdata/basic_aggregation/input.yaml new file mode 100644 index 000000000000..82393b6a9d76 --- /dev/null +++ b/processor/intervalprocessor/testdata/basic_aggregation/input.yaml @@ -0,0 +1,43 @@ +resourceMetrics: + - schemaUrl: https://test-res-schema.com/schema + resource: + attributes: + - key: asdf + value: + stringValue: foo + scopeMetrics: + - schemaUrl: https://test-scope-schema.com/schema + scope: + name: MyTestInstrument + version: "1.2.3" + attributes: + - key: foo + value: + stringValue: bar + metrics: + - name: cumulative.monotonic.sum + sum: + aggregationTemporality: 2 + isMonotonic: true + dataPoints: + - timeUnixNano: 50 + asDouble: 333 + attributes: + - key: aaa + value: + stringValue: bbb + # This data point is out of order + # The aggregator should ignore it since the first data point has a newer timestamp + - timeUnixNano: 20 + asDouble: 222 + attributes: + - key: aaa + value: + stringValue: bbb + # This one is the newest, so it should be the one stored + - timeUnixNano: 80 + asDouble: 444 + attributes: + - key: aaa + value: + stringValue: bbb diff --git a/processor/intervalprocessor/testdata/basic_aggregation/next.yaml b/processor/intervalprocessor/testdata/basic_aggregation/next.yaml new file mode 100644 index 000000000000..3949e7c54ded --- /dev/null +++ b/processor/intervalprocessor/testdata/basic_aggregation/next.yaml @@ -0,0 +1 @@ +resourceMetrics: [] diff --git a/processor/intervalprocessor/testdata/basic_aggregation/output.yaml b/processor/intervalprocessor/testdata/basic_aggregation/output.yaml new file mode 100644 index 000000000000..8f8f9df3a7fc --- /dev/null +++ b/processor/intervalprocessor/testdata/basic_aggregation/output.yaml @@ -0,0 +1,28 @@ +resourceMetrics: + - schemaUrl: https://test-res-schema.com/schema + resource: + attributes: + - key: asdf + value: + stringValue: foo + scopeMetrics: + - schemaUrl: https://test-scope-schema.com/schema + scope: + name: MyTestInstrument + version: "1.2.3" + attributes: + - key: foo + value: + stringValue: bar + metrics: + - name: cumulative.monotonic.sum + sum: + aggregationTemporality: 2 + isMonotonic: true + dataPoints: + - timeUnixNano: 80 + asDouble: 444 + attributes: + - key: aaa + value: + stringValue: bbb diff --git a/processor/intervalprocessor/testdata/exp_histograms_are_aggregated/input.yaml b/processor/intervalprocessor/testdata/exp_histograms_are_aggregated/input.yaml new file mode 100644 index 000000000000..ccbc4815195d --- /dev/null +++ b/processor/intervalprocessor/testdata/exp_histograms_are_aggregated/input.yaml @@ -0,0 +1,63 @@ +resourceMetrics: + - schemaUrl: https://test-res-schema.com/schema + resource: + attributes: + - key: asdf + value: + stringValue: foo + scopeMetrics: + - schemaUrl: https://test-scope-schema.com/schema + scope: + name: MyTestInstrument + version: "1.2.3" + attributes: + - key: foo + value: + stringValue: bar + metrics: + - name: cumulative.exphistogram.test + histogram: + aggregationTemporality: 2 + dataPoints: + - timeUnixNano: 50 + scale: 4 + zeroCount: 5 + positive: + offset: 2 + bucketCounts: [4, 7, 9, 6, 25] + negative: + offset: 6 + bucketCounts: [2, 13, 7, 12, 4] + attributes: + - key: aaa + value: + stringValue: bbb + # This data point is out of order + # The aggregator should ignore it since the first data point has a newer timestamp + - timeUnixNano: 20 + scale: 4 + zeroCount: 2 + positive: + Offset: 2 + BucketCounts: [2, 3, 7, 4, 20] + negative: + offset: 7 + bucketCounts: [8, 3, 9, 1] + attributes: + - key: aaa + value: + stringValue: bbb + # This one is the newest, so it should be the one stored + - timeUnixNano: 80 + scale: 4 + zeroCount: 5 + positive: + offset: 2 + bucketCounts: [9, 12, 17, 8, 34] + negative: + offset: 6 + bucketCounts: [6, 21, 9, 19, 7] + attributes: + - key: aaa + value: + stringValue: bbb diff --git a/processor/intervalprocessor/testdata/exp_histograms_are_aggregated/next.yaml b/processor/intervalprocessor/testdata/exp_histograms_are_aggregated/next.yaml new file mode 100644 index 000000000000..3949e7c54ded --- /dev/null +++ b/processor/intervalprocessor/testdata/exp_histograms_are_aggregated/next.yaml @@ -0,0 +1 @@ +resourceMetrics: [] diff --git a/processor/intervalprocessor/testdata/exp_histograms_are_aggregated/output.yaml b/processor/intervalprocessor/testdata/exp_histograms_are_aggregated/output.yaml new file mode 100644 index 000000000000..3973030fef16 --- /dev/null +++ b/processor/intervalprocessor/testdata/exp_histograms_are_aggregated/output.yaml @@ -0,0 +1,34 @@ +resourceMetrics: + - schemaUrl: https://test-res-schema.com/schema + resource: + attributes: + - key: asdf + value: + stringValue: foo + scopeMetrics: + - schemaUrl: https://test-scope-schema.com/schema + scope: + name: MyTestInstrument + version: "1.2.3" + attributes: + - key: foo + value: + stringValue: bar + metrics: + - name: cumulative.exphistogram.test + histogram: + aggregationTemporality: 2 + dataPoints: + - timeUnixNano: 80 + scale: 4 + zeroCount: 5 + positive: + offset: 2 + bucketCounts: [9, 12, 17, 8, 34] + negative: + offset: 6 + bucketCounts: [6, 21, 9, 19, 7] + attributes: + - key: aaa + value: + stringValue: bbb diff --git a/processor/intervalprocessor/testdata/gauges_are_passed_through/input.yaml b/processor/intervalprocessor/testdata/gauges_are_passed_through/input.yaml new file mode 100644 index 000000000000..a3d65c2986e0 --- /dev/null +++ b/processor/intervalprocessor/testdata/gauges_are_passed_through/input.yaml @@ -0,0 +1,39 @@ +resourceMetrics: + - schemaUrl: https://test-res-schema.com/schema + resource: + attributes: + - key: asdf + value: + stringValue: foo + scopeMetrics: + - schemaUrl: https://test-scope-schema.com/schema + scope: + name: MyTestInstrument + version: "1.2.3" + attributes: + - key: foo + value: + stringValue: bar + metrics: + - name: test.gauge + gauge: + aggregationTemporality: 2 + dataPoints: + - timeUnixNano: 50 + asDouble: 345 + attributes: + - key: aaa + value: + stringValue: bbb + - timeUnixNano: 20 + asDouble: 258 + attributes: + - key: aaa + value: + stringValue: bbb + - timeUnixNano: 80 + asDouble: 178 + attributes: + - key: aaa + value: + stringValue: bbb diff --git a/processor/intervalprocessor/testdata/gauges_are_passed_through/next.yaml b/processor/intervalprocessor/testdata/gauges_are_passed_through/next.yaml new file mode 100644 index 000000000000..a3d65c2986e0 --- /dev/null +++ b/processor/intervalprocessor/testdata/gauges_are_passed_through/next.yaml @@ -0,0 +1,39 @@ +resourceMetrics: + - schemaUrl: https://test-res-schema.com/schema + resource: + attributes: + - key: asdf + value: + stringValue: foo + scopeMetrics: + - schemaUrl: https://test-scope-schema.com/schema + scope: + name: MyTestInstrument + version: "1.2.3" + attributes: + - key: foo + value: + stringValue: bar + metrics: + - name: test.gauge + gauge: + aggregationTemporality: 2 + dataPoints: + - timeUnixNano: 50 + asDouble: 345 + attributes: + - key: aaa + value: + stringValue: bbb + - timeUnixNano: 20 + asDouble: 258 + attributes: + - key: aaa + value: + stringValue: bbb + - timeUnixNano: 80 + asDouble: 178 + attributes: + - key: aaa + value: + stringValue: bbb diff --git a/processor/intervalprocessor/testdata/gauges_are_passed_through/output.yaml b/processor/intervalprocessor/testdata/gauges_are_passed_through/output.yaml new file mode 100644 index 000000000000..3949e7c54ded --- /dev/null +++ b/processor/intervalprocessor/testdata/gauges_are_passed_through/output.yaml @@ -0,0 +1 @@ +resourceMetrics: [] diff --git a/processor/intervalprocessor/testdata/histograms_are_aggregated/input.yaml b/processor/intervalprocessor/testdata/histograms_are_aggregated/input.yaml new file mode 100644 index 000000000000..860bd22d1c11 --- /dev/null +++ b/processor/intervalprocessor/testdata/histograms_are_aggregated/input.yaml @@ -0,0 +1,45 @@ +resourceMetrics: + - schemaUrl: https://test-res-schema.com/schema + resource: + attributes: + - key: asdf + value: + stringValue: foo + scopeMetrics: + - schemaUrl: https://test-scope-schema.com/schema + scope: + name: MyTestInstrument + version: "1.2.3" + attributes: + - key: foo + value: + stringValue: bar + metrics: + - name: cumulative.histogram.test + histogram: + aggregationTemporality: 2 + dataPoints: + - timeUnixNano: 50 + explicitBounds: [0.01, 0.1, 1, 10, 100] + bucketCounts: [4, 7, 9, 6, 25] + attributes: + - key: aaa + value: + stringValue: bbb + # This data point is out of order + # The aggregator should ignore it since the first data point has a newer timestamp + - timeUnixNano: 20 + explicitBounds: [0.01, 0.1, 1, 10, 100] + bucketCounts: [2, 3, 7, 4, 20] + attributes: + - key: aaa + value: + stringValue: bbb + # This one is the newest, so it should be the one stored + - timeUnixNano: 80 + explicitBounds: [0.01, 0.1, 1, 10, 100] + bucketCounts: [9, 12, 17, 8, 34] + attributes: + - key: aaa + value: + stringValue: bbb diff --git a/processor/intervalprocessor/testdata/histograms_are_aggregated/next.yaml b/processor/intervalprocessor/testdata/histograms_are_aggregated/next.yaml new file mode 100644 index 000000000000..3949e7c54ded --- /dev/null +++ b/processor/intervalprocessor/testdata/histograms_are_aggregated/next.yaml @@ -0,0 +1 @@ +resourceMetrics: [] diff --git a/processor/intervalprocessor/testdata/histograms_are_aggregated/output.yaml b/processor/intervalprocessor/testdata/histograms_are_aggregated/output.yaml new file mode 100644 index 000000000000..682e14170623 --- /dev/null +++ b/processor/intervalprocessor/testdata/histograms_are_aggregated/output.yaml @@ -0,0 +1,28 @@ +resourceMetrics: + - schemaUrl: https://test-res-schema.com/schema + resource: + attributes: + - key: asdf + value: + stringValue: foo + scopeMetrics: + - schemaUrl: https://test-scope-schema.com/schema + scope: + name: MyTestInstrument + version: "1.2.3" + attributes: + - key: foo + value: + stringValue: bar + metrics: + - name: cumulative.histogram.test + histogram: + aggregationTemporality: 2 + dataPoints: + - timeUnixNano: 80 + explicitBounds: [0.01, 0.1, 1, 10, 100] + bucketCounts: [9, 12, 17, 8, 34] + attributes: + - key: aaa + value: + stringValue: bbb diff --git a/processor/intervalprocessor/testdata/non_monotonic_sums_are_passed_through/input.yaml b/processor/intervalprocessor/testdata/non_monotonic_sums_are_passed_through/input.yaml new file mode 100644 index 000000000000..c6c4b82fdd20 --- /dev/null +++ b/processor/intervalprocessor/testdata/non_monotonic_sums_are_passed_through/input.yaml @@ -0,0 +1,40 @@ +resourceMetrics: + - schemaUrl: https://test-res-schema.com/schema + resource: + attributes: + - key: asdf + value: + stringValue: foo + scopeMetrics: + - schemaUrl: https://test-scope-schema.com/schema + scope: + name: MyTestInstrument + version: "1.2.3" + attributes: + - key: foo + value: + stringValue: bar + metrics: + - name: cumulative.nonmonotonic.sum + sum: + aggregationTemporality: 2 + isMonotonic: false + dataPoints: + - timeUnixNano: 50 + asDouble: 333 + attributes: + - key: aaa + value: + stringValue: bbb + - timeUnixNano: 20 + asDouble: 222 + attributes: + - key: aaa + value: + stringValue: bbb + - timeUnixNano: 80 + asDouble: 111 + attributes: + - key: aaa + value: + stringValue: bbb diff --git a/processor/intervalprocessor/testdata/non_monotonic_sums_are_passed_through/next.yaml b/processor/intervalprocessor/testdata/non_monotonic_sums_are_passed_through/next.yaml new file mode 100644 index 000000000000..9e07f2405de2 --- /dev/null +++ b/processor/intervalprocessor/testdata/non_monotonic_sums_are_passed_through/next.yaml @@ -0,0 +1,43 @@ +resourceMetrics: + - schemaUrl: https://test-res-schema.com/schema + resource: + attributes: + - key: asdf + value: + stringValue: foo + scopeMetrics: + - schemaUrl: https://test-scope-schema.com/schema + scope: + name: MyTestInstrument + version: "1.2.3" + attributes: + - key: foo + value: + stringValue: bar + metrics: + - name: cumulative.nonmonotonic.sum + sum: + aggregationTemporality: 2 + isMonotonic: false + dataPoints: + - timeUnixNano: 50 + asDouble: 333 + attributes: + - key: aaa + value: + stringValue: bbb + # This data point is out of order + # The aggregator should ignore it since the first data point has a newer timestamp + - timeUnixNano: 20 + asDouble: 222 + attributes: + - key: aaa + value: + stringValue: bbb + # This one is the newest, so it should be the one stored + - timeUnixNano: 80 + asDouble: 111 + attributes: + - key: aaa + value: + stringValue: bbb diff --git a/processor/intervalprocessor/testdata/non_monotonic_sums_are_passed_through/output.yaml b/processor/intervalprocessor/testdata/non_monotonic_sums_are_passed_through/output.yaml new file mode 100644 index 000000000000..3949e7c54ded --- /dev/null +++ b/processor/intervalprocessor/testdata/non_monotonic_sums_are_passed_through/output.yaml @@ -0,0 +1 @@ +resourceMetrics: [] diff --git a/processor/intervalprocessor/testdata/summaries_are_passed_through/input.yaml b/processor/intervalprocessor/testdata/summaries_are_passed_through/input.yaml new file mode 100644 index 000000000000..15862ceb73e8 --- /dev/null +++ b/processor/intervalprocessor/testdata/summaries_are_passed_through/input.yaml @@ -0,0 +1,62 @@ +resourceMetrics: + - schemaUrl: https://test-res-schema.com/schema + resource: + attributes: + - key: asdf + value: + stringValue: foo + scopeMetrics: + - schemaUrl: https://test-scope-schema.com/schema + scope: + name: MyTestInstrument + version: "1.2.3" + attributes: + - key: foo + value: + stringValue: bar + metrics: + - name: summary.test + summary: + dataPoints: + - timeUnixNano: 50 + quantileValues: + - quantile: 0.25 + value: 50 + - quantile: 0.5 + value: 20 + - quantile: 0.75 + value: 75 + - quantile: 0.95 + value: 10 + attributes: + - key: aaa + value: + stringValue: bbb + - timeUnixNano: 20 + quantileValues: + - quantile: 0.25 + value: 40 + - quantile: 0.5 + value: 10 + - quantile: 0.75 + value: 60 + - quantile: 0.95 + value: 5 + attributes: + - key: aaa + value: + stringValue: bbb + - timeUnixNano: 80 + quantileValues: + - quantile: 0.25 + value: 80 + - quantile: 0.5 + value: 35 + - quantile: 0.75 + value: 90 + - quantile: 0.95 + value: 15 + attributes: + - key: aaa + value: + stringValue: bbb diff --git a/processor/intervalprocessor/testdata/summaries_are_passed_through/next.yaml b/processor/intervalprocessor/testdata/summaries_are_passed_through/next.yaml new file mode 100644 index 000000000000..15862ceb73e8 --- /dev/null +++ b/processor/intervalprocessor/testdata/summaries_are_passed_through/next.yaml @@ -0,0 +1,62 @@ +resourceMetrics: + - schemaUrl: https://test-res-schema.com/schema + resource: + attributes: + - key: asdf + value: + stringValue: foo + scopeMetrics: + - schemaUrl: https://test-scope-schema.com/schema + scope: + name: MyTestInstrument + version: "1.2.3" + attributes: + - key: foo + value: + stringValue: bar + metrics: + - name: summary.test + summary: + dataPoints: + - timeUnixNano: 50 + quantileValues: + - quantile: 0.25 + value: 50 + - quantile: 0.5 + value: 20 + - quantile: 0.75 + value: 75 + - quantile: 0.95 + value: 10 + attributes: + - key: aaa + value: + stringValue: bbb + - timeUnixNano: 20 + quantileValues: + - quantile: 0.25 + value: 40 + - quantile: 0.5 + value: 10 + - quantile: 0.75 + value: 60 + - quantile: 0.95 + value: 5 + attributes: + - key: aaa + value: + stringValue: bbb + - timeUnixNano: 80 + quantileValues: + - quantile: 0.25 + value: 80 + - quantile: 0.5 + value: 35 + - quantile: 0.75 + value: 90 + - quantile: 0.95 + value: 15 + attributes: + - key: aaa + value: + stringValue: bbb diff --git a/processor/intervalprocessor/testdata/summaries_are_passed_through/output.yaml b/processor/intervalprocessor/testdata/summaries_are_passed_through/output.yaml new file mode 100644 index 000000000000..3949e7c54ded --- /dev/null +++ b/processor/intervalprocessor/testdata/summaries_are_passed_through/output.yaml @@ -0,0 +1 @@ +resourceMetrics: []