From 2a3e0f07f40288d4e2fbf55d6f0b05e6a99976f8 Mon Sep 17 00:00:00 2001 From: sh0rez Date: Thu, 15 Feb 2024 15:58:54 +0100 Subject: [PATCH] [processor/deltatocumulative]: Sums (#30707) **Description:** Implements major component functionality: - [x] metrics identification (`metrics.Ident`) and stream identification (`streams.Ident`) - [x] abstract data layer `data.Point` that keeps processing code data type (sum, histogram, exp histogram) agnostic - [x] `delta.Accumulator` stream processor for accumulating any `data.Point` **Link to tracking Issue:** https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/30705 **Testing:** Done **Documentation:** TODO --- .chloggen/deltatocumulative-sums.yaml | 27 +++ .github/CODEOWNERS | 2 +- .../deltatocumulativeprocessor/README.md | 2 +- processor/deltatocumulativeprocessor/go.mod | 8 + processor/deltatocumulativeprocessor/go.sum | 9 + .../internal/data/add.go | 29 +++ .../internal/data/data.go | 76 +++++++ .../internal/delta/delta.go | 83 ++++++++ .../internal/delta/delta_test.go | 196 ++++++++++++++++++ .../internal/delta/lock.go | 25 +++ .../internal/metrics/data.go | 59 ++++++ .../internal/metrics/ident.go | 62 ++++++ .../internal/metrics/metrics.go | 54 +++++ .../internal/metrics/util.go | 25 +++ .../internal/streams/data.go | 54 +++++ .../internal/streams/data_test.go | 81 ++++++++ .../internal/streams/errors.go | 21 ++ .../internal/streams/streams.go | 35 ++++ .../internal/testdata/random/random.go | 74 +++++++ .../deltatocumulativeprocessor/metadata.yaml | 2 +- .../deltatocumulativeprocessor/processor.go | 31 +++ 21 files changed, 952 insertions(+), 3 deletions(-) create mode 100644 .chloggen/deltatocumulative-sums.yaml create mode 100644 processor/deltatocumulativeprocessor/internal/data/add.go create mode 100644 processor/deltatocumulativeprocessor/internal/data/data.go create mode 100644 processor/deltatocumulativeprocessor/internal/delta/delta.go create mode 100644 processor/deltatocumulativeprocessor/internal/delta/delta_test.go create mode 100644 processor/deltatocumulativeprocessor/internal/delta/lock.go create mode 100644 processor/deltatocumulativeprocessor/internal/metrics/data.go create mode 100644 processor/deltatocumulativeprocessor/internal/metrics/ident.go create mode 100644 processor/deltatocumulativeprocessor/internal/metrics/metrics.go create mode 100644 processor/deltatocumulativeprocessor/internal/metrics/util.go create mode 100644 processor/deltatocumulativeprocessor/internal/streams/data.go create mode 100644 processor/deltatocumulativeprocessor/internal/streams/data_test.go create mode 100644 processor/deltatocumulativeprocessor/internal/streams/errors.go create mode 100644 processor/deltatocumulativeprocessor/internal/streams/streams.go create mode 100644 processor/deltatocumulativeprocessor/internal/testdata/random/random.go diff --git a/.chloggen/deltatocumulative-sums.yaml b/.chloggen/deltatocumulative-sums.yaml new file mode 100644 index 000000000000..07e2eef2bd80 --- /dev/null +++ b/.chloggen/deltatocumulative-sums.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: new_component + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: deltatocumulative + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: adds processor to convert sums (initially) from delta to cumulative temporality + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [30705] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 7731765027e7..3172505aa2c8 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -152,7 +152,7 @@ pkg/winperfcounters/ @open-telemetry/collect processor/attributesprocessor/ @open-telemetry/collector-contrib-approvers @boostchicken processor/cumulativetodeltaprocessor/ @open-telemetry/collector-contrib-approvers @TylerHelmuth -processor/deltatocumulativeprocessor/ @open-telemetry/collector-contrib-approvers @sh0rez +processor/deltatocumulativeprocessor/ @open-telemetry/collector-contrib-approvers @sh0rez @RichieSams processor/deltatorateprocessor/ @open-telemetry/collector-contrib-approvers @Aneurysm9 processor/filterprocessor/ @open-telemetry/collector-contrib-approvers @TylerHelmuth @boostchicken processor/groupbyattrsprocessor/ @open-telemetry/collector-contrib-approvers @rnishtala-sumo diff --git a/processor/deltatocumulativeprocessor/README.md b/processor/deltatocumulativeprocessor/README.md index e1dbb82f58f2..60432e3b593b 100644 --- a/processor/deltatocumulativeprocessor/README.md +++ b/processor/deltatocumulativeprocessor/README.md @@ -7,7 +7,7 @@ | Distributions | [] | | Warnings | [Statefulness](#warnings) | | Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aprocessor%2Fdeltatocumulative%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aprocessor%2Fdeltatocumulative) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aprocessor%2Fdeltatocumulative%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aprocessor%2Fdeltatocumulative) | -| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@sh0rez](https://www.github.com/sh0rez) | +| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@sh0rez](https://www.github.com/sh0rez), [@RichieSams](https://www.github.com/RichieSams) | [development]: https://github.com/open-telemetry/opentelemetry-collector#development diff --git a/processor/deltatocumulativeprocessor/go.mod b/processor/deltatocumulativeprocessor/go.mod index 77218cda4348..bf81f21f54f5 100644 --- a/processor/deltatocumulativeprocessor/go.mod +++ b/processor/deltatocumulativeprocessor/go.mod @@ -3,6 +3,8 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/processor/delta go 1.21 require ( + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.94.0 + github.com/stretchr/testify v1.8.4 go.opentelemetry.io/collector/component v0.94.1 go.opentelemetry.io/collector/consumer v0.94.1 go.opentelemetry.io/collector/pdata v1.1.0 @@ -13,6 +15,8 @@ require ( ) require ( + github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.3 // indirect @@ -25,6 +29,7 @@ require ( github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect go.opentelemetry.io/collector/config/configtelemetry v0.94.1 // indirect go.opentelemetry.io/collector/confmap v0.94.1 // indirect go.opentelemetry.io/otel v1.23.1 // indirect @@ -35,4 +40,7 @@ require ( google.golang.org/genproto/googleapis/rpc v0.0.0-20240102182953-50ed04b92917 // indirect google.golang.org/grpc v1.61.0 // indirect google.golang.org/protobuf v1.32.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil diff --git a/processor/deltatocumulativeprocessor/go.sum b/processor/deltatocumulativeprocessor/go.sum index 0fead2f1da63..baf4b52491f3 100644 --- a/processor/deltatocumulativeprocessor/go.sum +++ b/processor/deltatocumulativeprocessor/go.sum @@ -30,6 +30,10 @@ github.com/knadh/koanf/providers/confmap v0.1.0 h1:gOkxhHkemwG4LezxxN8DMOFopOPgh github.com/knadh/koanf/providers/confmap v0.1.0/go.mod h1:2uLhxQzJnyHKfxG927awZC7+fyHFdQkd697K4MdLnIU= github.com/knadh/koanf/v2 v2.0.2 h1:sEZzPW2rVWSahcYILNq/syJdEyRafZIG0l9aWwL86HA= github.com/knadh/koanf/v2 v2.0.2/go.mod h1:HN9uZ+qFAejH1e4G41gnoffIanINWQuONLXiV7kir6k= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw= github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s= github.com/mitchellh/mapstructure v1.5.1-0.20231216201459-8508981c8b6c h1:cqn374mizHuIWj+OSJCajGr/phAmuMug9qIX3l9CflE= @@ -51,6 +55,8 @@ github.com/prometheus/common v0.46.0 h1:doXzt5ybi1HBKpsZOL0sSkaNHJJqkyfEWZGGqqSc github.com/prometheus/common v0.46.0/go.mod h1:Tp0qkxpb9Jsg54QMe+EAmqXkSV7Evdy1BTn+g2pa/hQ= github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo= github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= +github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= @@ -126,5 +132,8 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0 google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I= google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/processor/deltatocumulativeprocessor/internal/data/add.go b/processor/deltatocumulativeprocessor/internal/data/add.go new file mode 100644 index 000000000000..b40bf05b916d --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/data/add.go @@ -0,0 +1,29 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package data // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data" + +import "go.opentelemetry.io/collector/pdata/pmetric" + +func (dp Number) Add(in Number) Number { + switch in.ValueType() { + case pmetric.NumberDataPointValueTypeDouble: + v := dp.DoubleValue() + in.DoubleValue() + dp.SetDoubleValue(v) + case pmetric.NumberDataPointValueTypeInt: + v := dp.IntValue() + in.IntValue() + dp.SetIntValue(v) + } + dp.SetTimestamp(in.Timestamp()) + return dp +} + +// nolint +func (dp Histogram) Add(in Histogram) Histogram { + panic("todo") +} + +// nolint +func (dp ExpHistogram) Add(in ExpHistogram) ExpHistogram { + panic("todo") +} diff --git a/processor/deltatocumulativeprocessor/internal/data/data.go b/processor/deltatocumulativeprocessor/internal/data/data.go new file mode 100644 index 000000000000..941b3cff904f --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/data/data.go @@ -0,0 +1,76 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package data // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data" + +import ( + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" +) + +type Point[Self any] interface { + StartTimestamp() pcommon.Timestamp + Timestamp() pcommon.Timestamp + Attributes() pcommon.Map + + Clone() Self + CopyTo(Self) + + Add(Self) Self +} + +type Number struct { + pmetric.NumberDataPoint +} + +func (dp Number) Clone() Number { + clone := Number{NumberDataPoint: pmetric.NewNumberDataPoint()} + if dp.NumberDataPoint != (pmetric.NumberDataPoint{}) { + dp.CopyTo(clone) + } + return clone +} + +func (dp Number) CopyTo(dst Number) { + dp.NumberDataPoint.CopyTo(dst.NumberDataPoint) +} + +type Histogram struct { + pmetric.HistogramDataPoint +} + +func (dp Histogram) Clone() Histogram { + clone := Histogram{HistogramDataPoint: pmetric.NewHistogramDataPoint()} + if dp.HistogramDataPoint != (pmetric.HistogramDataPoint{}) { + dp.CopyTo(clone) + } + return clone +} + +func (dp Histogram) CopyTo(dst Histogram) { + dp.HistogramDataPoint.CopyTo(dst.HistogramDataPoint) +} + +type ExpHistogram struct { + pmetric.ExponentialHistogramDataPoint +} + +func (dp ExpHistogram) Clone() ExpHistogram { + clone := ExpHistogram{ExponentialHistogramDataPoint: pmetric.NewExponentialHistogramDataPoint()} + if dp.ExponentialHistogramDataPoint != (pmetric.ExponentialHistogramDataPoint{}) { + dp.CopyTo(clone) + } + return clone +} + +func (dp ExpHistogram) CopyTo(dst ExpHistogram) { + dp.ExponentialHistogramDataPoint.CopyTo(dst.ExponentialHistogramDataPoint) +} + +type mustPoint[D Point[D]] struct{ _ D } + +var ( + _ = mustPoint[Number]{} + _ = mustPoint[Histogram]{} + _ = mustPoint[ExpHistogram]{} +) diff --git a/processor/deltatocumulativeprocessor/internal/delta/delta.go b/processor/deltatocumulativeprocessor/internal/delta/delta.go new file mode 100644 index 000000000000..8246bf8e09d1 --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/delta/delta.go @@ -0,0 +1,83 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package delta // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/delta" + +import ( + "fmt" + + "go.opentelemetry.io/collector/pdata/pcommon" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams" +) + +func construct[D data.Point[D]]() streams.Aggregator[D] { + acc := &Accumulator[D]{dps: make(map[streams.Ident]D)} + return &Lock[D]{next: acc} +} + +func Numbers() streams.Aggregator[data.Number] { + return construct[data.Number]() +} + +func Histograms() streams.Aggregator[data.Histogram] { + return construct[data.Histogram]() +} + +var _ streams.Aggregator[data.Number] = (*Accumulator[data.Number])(nil) + +type Accumulator[D data.Point[D]] struct { + dps map[streams.Ident]D +} + +// Aggregate implements delta-to-cumulative aggregation as per spec: +// https://opentelemetry.io/docs/specs/otel/metrics/data-model/#sums-delta-to-cumulative +func (a *Accumulator[D]) Aggregate(id streams.Ident, dp D) (D, error) { + // make the accumulator to start with the current sample, discarding any + // earlier data. return after use + reset := func() (D, error) { + a.dps[id] = dp.Clone() + return a.dps[id], nil + } + + aggr, ok := a.dps[id] + + // new series: reset + if !ok { + return reset() + } + // belongs to older series: drop + if dp.StartTimestamp() < aggr.StartTimestamp() { + return aggr, ErrOlderStart{Start: aggr.StartTimestamp(), Sample: dp.StartTimestamp()} + } + // belongs to later series: reset + if dp.StartTimestamp() > aggr.StartTimestamp() { + return reset() + } + // out of order: drop + if dp.Timestamp() <= aggr.Timestamp() { + return aggr, ErrOutOfOrder{Last: aggr.Timestamp(), Sample: dp.Timestamp()} + } + + a.dps[id] = aggr.Add(dp) + return a.dps[id], nil +} + +type ErrOlderStart struct { + Start pcommon.Timestamp + Sample pcommon.Timestamp +} + +func (e ErrOlderStart) Error() string { + return fmt.Sprintf("dropped sample with start_time=%s, because series only starts at start_time=%s. consider checking for multiple processes sending the exact same series", e.Sample, e.Start) +} + +type ErrOutOfOrder struct { + Last pcommon.Timestamp + Sample pcommon.Timestamp +} + +func (e ErrOutOfOrder) Error() string { + return fmt.Sprintf("out of order: dropped sample from time=%s, because series is already at time=%s", e.Sample, e.Last) +} diff --git a/processor/deltatocumulativeprocessor/internal/delta/delta_test.go b/processor/deltatocumulativeprocessor/internal/delta/delta_test.go new file mode 100644 index 000000000000..ae863697339f --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/delta/delta_test.go @@ -0,0 +1,196 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package delta_test + +import ( + "fmt" + "math/rand" + "strconv" + "sync" + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/delta" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/testdata/random" +) + +var result any + +func BenchmarkAccumulator(b *testing.B) { + acc := delta.Numbers() + sum := random.Sum() + + bench := func(b *testing.B, nstreams int) { + nsamples := b.N / nstreams + + ids := make([]streams.Ident, nstreams) + dps := make([]data.Number, nstreams) + for i := 0; i < nstreams; i++ { + ids[i], dps[i] = sum.Stream() + } + + b.ResetTimer() + + var wg sync.WaitGroup + for i := 0; i < nstreams; i++ { + wg.Add(1) + go func(id streams.Ident, num data.Number) { + for n := 0; n < nsamples; n++ { + num.SetTimestamp(num.Timestamp() + 1) + val, err := acc.Aggregate(id, num) + if err != nil { + panic(err) + } + result = val + } + wg.Done() + }(ids[i], dps[i]) + } + + wg.Wait() + } + + nstreams := []int{1, 2, 10, 100, 1000} + for _, n := range nstreams { + b.Run(strconv.Itoa(n), func(b *testing.B) { + bench(b, n) + }) + } +} + +// verify the distinction between streams and the accumulated value +func TestAddition(t *testing.T) { + acc := delta.Numbers() + sum := random.Sum() + + type Idx int + type Stream struct { + idx Idx + id streams.Ident + dp data.Number + } + + streams := make([]Stream, 10) + for i := range streams { + id, dp := sum.Stream() + streams[i] = Stream{ + idx: Idx(i), + id: id, + dp: dp, + } + } + + want := make(map[Idx]int64) + for i := 0; i < 100; i++ { + stream := streams[rand.Intn(10)] + dp := stream.dp.Clone() + dp.SetTimestamp(dp.Timestamp() + pcommon.Timestamp(i)) + + val := int64(rand.Intn(255)) + dp.SetIntValue(val) + want[stream.idx] += val + + got, err := acc.Aggregate(stream.id, dp) + require.NoError(t, err) + + require.Equal(t, want[stream.idx], got.IntValue()) + } +} + +// verify that start + last times are updated +func TestTimes(t *testing.T) { + acc := delta.Numbers() + id, data := random.Sum().Stream() + + start := pcommon.Timestamp(1234) + ts1, ts2 := pcommon.Timestamp(1234), pcommon.Timestamp(1235) + + // first sample: take timestamps of point + first := data.Clone() + first.SetStartTimestamp(start) + first.SetTimestamp(ts1) + + r1, err := acc.Aggregate(id, first) + require.NoError(t, err) + require.Equal(t, start, r1.StartTimestamp()) + require.Equal(t, ts1, r1.Timestamp()) + + // second sample: take last of point, keep start + second := data.Clone() + second.SetStartTimestamp(start) + second.SetTimestamp(ts2) + + r2, err := acc.Aggregate(id, second) + require.NoError(t, err) + require.Equal(t, start, r2.StartTimestamp()) + require.Equal(t, ts2, r2.Timestamp()) +} + +func TestErrs(t *testing.T) { + type Point struct { + Start int + Time int + Value int + } + type Case struct { + Good Point + Bad Point + Err error + } + + cases := []Case{ + { + Good: Point{Start: 1234, Time: 1337, Value: 42}, + Bad: Point{Start: 1000, Time: 2000, Value: 24}, + Err: delta.ErrOlderStart{Start: time(1234), Sample: time(1000)}, + }, + { + Good: Point{Start: 1234, Time: 1337, Value: 42}, + Bad: Point{Start: 1234, Time: 1336, Value: 24}, + Err: delta.ErrOutOfOrder{Last: time(1337), Sample: time(1336)}, + }, + } + + for _, c := range cases { + c := c + t.Run(fmt.Sprintf("%T", c.Err), func(t *testing.T) { + acc := delta.Numbers() + id, data := random.Sum().Stream() + + good := data.Clone() + good.SetStartTimestamp(pcommon.Timestamp(c.Good.Start)) + good.SetTimestamp(pcommon.Timestamp(c.Good.Time)) + good.SetIntValue(int64(c.Good.Value)) + + r1, err := acc.Aggregate(id, good) + require.NoError(t, err) + + require.Equal(t, good.StartTimestamp(), r1.StartTimestamp()) + require.Equal(t, good.Timestamp(), r1.Timestamp()) + require.Equal(t, good.IntValue(), r1.IntValue()) + + bad := data.Clone() + bad.SetStartTimestamp(pcommon.Timestamp(c.Bad.Start)) + bad.SetTimestamp(pcommon.Timestamp(c.Bad.Time)) + bad.SetIntValue(int64(c.Bad.Value)) + + r2, err := acc.Aggregate(id, bad) + require.ErrorIs(t, err, c.Err) + + // sample must be dropped => no change + require.Equal(t, r1.StartTimestamp(), r2.StartTimestamp()) + require.Equal(t, r1.Timestamp(), r2.Timestamp()) + require.Equal(t, r1.IntValue(), r2.IntValue()) + }) + } + +} + +func time(ts int) pcommon.Timestamp { + return pcommon.Timestamp(ts) +} diff --git a/processor/deltatocumulativeprocessor/internal/delta/lock.go b/processor/deltatocumulativeprocessor/internal/delta/lock.go new file mode 100644 index 000000000000..aeb8eb00d30f --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/delta/lock.go @@ -0,0 +1,25 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package delta // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/delta" + +import ( + "sync" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams" +) + +var _ streams.Aggregator[data.Number] = (*Lock[data.Number])(nil) + +type Lock[D data.Point[D]] struct { + sync.Mutex + next streams.Aggregator[D] +} + +func (l *Lock[D]) Aggregate(id streams.Ident, dp D) (D, error) { + l.Lock() + dp, err := l.next.Aggregate(id, dp) + l.Unlock() + return dp, err +} diff --git a/processor/deltatocumulativeprocessor/internal/metrics/data.go b/processor/deltatocumulativeprocessor/internal/metrics/data.go new file mode 100644 index 000000000000..c305c85d781e --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/metrics/data.go @@ -0,0 +1,59 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package metrics // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metrics" + +import ( + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data" +) + +type Data[D data.Point[D]] interface { + At(i int) D + Len() int + Ident() Ident +} + +type Sum Metric + +func (s Sum) At(i int) data.Number { + dp := Metric(s).Sum().DataPoints().At(i) + return data.Number{NumberDataPoint: dp} +} + +func (s Sum) Len() int { + return Metric(s).Sum().DataPoints().Len() +} + +func (s Sum) Ident() Ident { + return (*Metric)(&s).Ident() +} + +type Histogram Metric + +func (s Histogram) At(i int) data.Histogram { + dp := Metric(s).Histogram().DataPoints().At(i) + return data.Histogram{HistogramDataPoint: dp} +} + +func (s Histogram) Len() int { + return Metric(s).Histogram().DataPoints().Len() +} + +func (s Histogram) Ident() Ident { + return (*Metric)(&s).Ident() +} + +type ExpHistogram Metric + +func (s ExpHistogram) At(i int) data.ExpHistogram { + dp := Metric(s).ExponentialHistogram().DataPoints().At(i) + return data.ExpHistogram{ExponentialHistogramDataPoint: dp} +} + +func (s ExpHistogram) Len() int { + return Metric(s).ExponentialHistogram().DataPoints().Len() +} + +func (s ExpHistogram) Ident() Ident { + return (*Metric)(&s).Ident() +} diff --git a/processor/deltatocumulativeprocessor/internal/metrics/ident.go b/processor/deltatocumulativeprocessor/internal/metrics/ident.go new file mode 100644 index 000000000000..9076dc4918fa --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/metrics/ident.go @@ -0,0 +1,62 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package metrics // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metrics" + +import ( + "hash" + "hash/fnv" + + "go.opentelemetry.io/collector/pdata/pmetric" +) + +type Ident struct { + ScopeIdent + + name string + unit string + ty string + + monotonic bool + temporality pmetric.AggregationTemporality +} + +func (i Ident) Hash() hash.Hash64 { + sum := i.ScopeIdent.Hash() + sum.Write([]byte(i.name)) + sum.Write([]byte(i.unit)) + sum.Write([]byte(i.ty)) + + var mono byte + if i.monotonic { + mono = 1 + } + sum.Write([]byte{mono, byte(i.temporality)}) + return sum +} + +type ScopeIdent struct { + ResourceIdent + + name string + version string + attrs [16]byte +} + +func (s ScopeIdent) Hash() hash.Hash64 { + sum := s.ResourceIdent.Hash() + sum.Write([]byte(s.name)) + sum.Write([]byte(s.version)) + sum.Write(s.attrs[:]) + return sum +} + +type ResourceIdent struct { + attrs [16]byte +} + +func (r ResourceIdent) Hash() hash.Hash64 { + sum := fnv.New64a() + sum.Write(r.attrs[:]) + return sum +} diff --git a/processor/deltatocumulativeprocessor/internal/metrics/metrics.go b/processor/deltatocumulativeprocessor/internal/metrics/metrics.go new file mode 100644 index 000000000000..53b1f42b4997 --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/metrics/metrics.go @@ -0,0 +1,54 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package metrics // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metrics" + +import ( + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil" +) + +type Metric struct { + res pcommon.Resource + scope pcommon.InstrumentationScope + pmetric.Metric +} + +func (m *Metric) Ident() Ident { + id := Ident{ + ScopeIdent: ScopeIdent{ + ResourceIdent: ResourceIdent{ + attrs: pdatautil.MapHash(m.res.Attributes()), + }, + name: m.scope.Name(), + version: m.scope.Version(), + attrs: pdatautil.MapHash(m.scope.Attributes()), + }, + name: m.Metric.Name(), + unit: m.Metric.Unit(), + ty: m.Metric.Type().String(), + } + + switch m.Type() { + case pmetric.MetricTypeSum: + sum := m.Sum() + id.monotonic = sum.IsMonotonic() + id.temporality = sum.AggregationTemporality() + case pmetric.MetricTypeExponentialHistogram: + exp := m.ExponentialHistogram() + id.monotonic = true + id.temporality = exp.AggregationTemporality() + case pmetric.MetricTypeHistogram: + hist := m.Histogram() + id.monotonic = true + id.temporality = hist.AggregationTemporality() + } + + return id +} + +func From(res pcommon.Resource, scope pcommon.InstrumentationScope, metric pmetric.Metric) Metric { + return Metric{res: res, scope: scope, Metric: metric} +} diff --git a/processor/deltatocumulativeprocessor/internal/metrics/util.go b/processor/deltatocumulativeprocessor/internal/metrics/util.go new file mode 100644 index 000000000000..985716b3cc0f --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/metrics/util.go @@ -0,0 +1,25 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package metrics // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metrics" + +import "go.opentelemetry.io/collector/pdata/pmetric" + +func Filter(metrics pmetric.Metrics, fn func(m Metric) bool) { + metrics.ResourceMetrics().RemoveIf(func(rm pmetric.ResourceMetrics) bool { + rm.ScopeMetrics().RemoveIf(func(sm pmetric.ScopeMetrics) bool { + sm.Metrics().RemoveIf(func(m pmetric.Metric) bool { + return !fn(From(rm.Resource(), sm.Scope(), m)) + }) + return false + }) + return false + }) +} + +func Each(metrics pmetric.Metrics, fn func(m Metric)) { + Filter(metrics, func(m Metric) bool { + fn(m) + return true + }) +} diff --git a/processor/deltatocumulativeprocessor/internal/streams/data.go b/processor/deltatocumulativeprocessor/internal/streams/data.go new file mode 100644 index 000000000000..f0f59356d56d --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/streams/data.go @@ -0,0 +1,54 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package streams // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams" + +import ( + "errors" + + "go.opentelemetry.io/collector/pdata/pcommon" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metrics" +) + +// Iterator as per https://go.dev/wiki/RangefuncExperiment +type Iter[V any] func(yield func(Ident, V) bool) + +// Samples returns an Iterator over each sample of all streams in the metric +func Samples[D data.Point[D]](m metrics.Data[D]) Iter[D] { + mid := m.Ident() + + return func(yield func(Ident, D) bool) { + for i := 0; i < m.Len(); i++ { + dp := m.At(i) + id := Identify(mid, dp.Attributes()) + if !yield(id, dp) { + break + } + } + } +} + +// Aggregate each point and replace it by the result +func Aggregate[D data.Point[D]](m metrics.Data[D], aggr Aggregator[D]) error { + var errs error + + // for id, dp := range Samples(m) + Samples(m)(func(id Ident, dp D) bool { + next, err := aggr.Aggregate(id, dp) + if err != nil { + errs = errors.Join(errs, Error(id, err)) + return true + } + next.CopyTo(dp) + return false + }) + + return errs +} + +func Identify(metric metrics.Ident, attrs pcommon.Map) Ident { + return Ident{metric: metric, attrs: pdatautil.MapHash(attrs)} +} diff --git a/processor/deltatocumulativeprocessor/internal/streams/data_test.go b/processor/deltatocumulativeprocessor/internal/streams/data_test.go new file mode 100644 index 000000000000..4ea5a80e1f7a --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/streams/data_test.go @@ -0,0 +1,81 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package streams_test + +import ( + "testing" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metrics" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/testdata/random" +) + +var rdp data.Number +var rid streams.Ident + +func BenchmarkSamples(b *testing.B) { + b.Run("iterfn", func(b *testing.B) { + dps := generate(b.N) + b.ResetTimer() + + streams.Samples[data.Number](dps)(func(id streams.Ident, dp data.Number) bool { + rdp = dp + rid = id + return true + }) + }) + + b.Run("iface", func(b *testing.B) { + dps := generate(b.N) + mid := dps.id.Metric() + b.ResetTimer() + + for i := 0; i < dps.Len(); i++ { + dp := dps.At(i) + rid = streams.Identify(mid, dp.Attributes()) + rdp = dp + } + }) + + b.Run("loop", func(b *testing.B) { + dps := generate(b.N) + mid := dps.id.Metric() + b.ResetTimer() + + for i := range dps.dps { + dp := dps.dps[i] + rid = streams.Identify(mid, dp.Attributes()) + rdp = dp + } + }) +} + +func generate(n int) Data { + id, ndp := random.Sum().Stream() + dps := Data{id: id, dps: make([]data.Number, n)} + for i := range dps.dps { + dp := ndp.Clone() + dp.SetIntValue(int64(i)) + dps.dps[i] = dp + } + return dps +} + +type Data struct { + id streams.Ident + dps []data.Number +} + +func (l Data) At(i int) data.Number { + return l.dps[i] +} + +func (l Data) Len() int { + return len(l.dps) +} + +func (l Data) Ident() metrics.Ident { + return l.id.Metric() +} diff --git a/processor/deltatocumulativeprocessor/internal/streams/errors.go b/processor/deltatocumulativeprocessor/internal/streams/errors.go new file mode 100644 index 000000000000..e69827a6212c --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/streams/errors.go @@ -0,0 +1,21 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package streams // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams" + +import ( + "fmt" +) + +func Error(id Ident, err error) error { + return StreamErr{Ident: id, Err: err} +} + +type StreamErr struct { + Ident Ident + Err error +} + +func (e StreamErr) Error() string { + return fmt.Sprintf("%s: %s", e.Ident, e.Err) +} diff --git a/processor/deltatocumulativeprocessor/internal/streams/streams.go b/processor/deltatocumulativeprocessor/internal/streams/streams.go new file mode 100644 index 000000000000..3cd99a760dd8 --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/streams/streams.go @@ -0,0 +1,35 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package streams // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams" + +import ( + "hash" + "strconv" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metrics" +) + +type Aggregator[D data.Point[D]] interface { + Aggregate(Ident, D) (D, error) +} + +type Ident struct { + metric metrics.Ident + attrs [16]byte +} + +func (i Ident) Hash() hash.Hash64 { + sum := i.metric.Hash() + sum.Write(i.attrs[:]) + return sum +} + +func (i Ident) String() string { + return strconv.FormatUint(i.Hash().Sum64(), 16) +} + +func (i Ident) Metric() metrics.Ident { + return i.metric +} diff --git a/processor/deltatocumulativeprocessor/internal/testdata/random/random.go b/processor/deltatocumulativeprocessor/internal/testdata/random/random.go new file mode 100644 index 000000000000..e526ad08e28d --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/testdata/random/random.go @@ -0,0 +1,74 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package random + +import ( + "math" + "math/rand" + "strconv" + "time" + + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metrics" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams" +) + +func Sum() Metric { + metric := pmetric.NewMetric() + metric.SetEmptySum() + metric.SetName(randStr()) + metric.SetDescription(randStr()) + metric.SetUnit(randStr()) + return Metric{Metric: metrics.From(Resource(), Scope(), metric)} +} + +type Metric struct { + metrics.Metric +} + +func (m Metric) Stream() (streams.Ident, data.Number) { + dp := pmetric.NewNumberDataPoint() + dp.SetIntValue(int64(randInt())) + dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Now())) + + for i := 0; i < 10; i++ { + dp.Attributes().PutStr(randStr(), randStr()) + } + id := streams.Identify(m.Ident(), dp.Attributes()) + + return id, data.Number{NumberDataPoint: dp} +} + +func Resource() pcommon.Resource { + res := pcommon.NewResource() + for i := 0; i < 10; i++ { + res.Attributes().PutStr(randStr(), randStr()) + } + return res +} + +func Scope() pcommon.InstrumentationScope { + scope := pcommon.NewInstrumentationScope() + scope.SetName(randStr()) + scope.SetVersion(randStr()) + for i := 0; i < 3; i++ { + scope.Attributes().PutStr(randStr(), randStr()) + } + return scope +} + +func randStr() string { + return strconv.FormatInt(randInt(), 16) +} + +func randInt() int64 { + return int64(rand.Intn(math.MaxInt16)) +} + +func randFloat() float64 { + return float64(randInt()) / float64(randInt()) +} diff --git a/processor/deltatocumulativeprocessor/metadata.yaml b/processor/deltatocumulativeprocessor/metadata.yaml index 398299bba51e..13f436fd056e 100644 --- a/processor/deltatocumulativeprocessor/metadata.yaml +++ b/processor/deltatocumulativeprocessor/metadata.yaml @@ -7,4 +7,4 @@ status: distributions: [] warnings: [Statefulness] codeowners: - active: [sh0rez] + active: [sh0rez, RichieSams] diff --git a/processor/deltatocumulativeprocessor/processor.go b/processor/deltatocumulativeprocessor/processor.go index dd0406181f19..057f3bcc3b37 100644 --- a/processor/deltatocumulativeprocessor/processor.go +++ b/processor/deltatocumulativeprocessor/processor.go @@ -5,12 +5,18 @@ package deltatocumulativeprocessor // import "github.com/open-telemetry/opentele import ( "context" + "errors" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/processor" "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/delta" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metrics" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams" ) var _ processor.Metrics = (*Processor)(nil) @@ -21,6 +27,8 @@ type Processor struct { log *zap.Logger ctx context.Context cancel context.CancelFunc + + nums streams.Aggregator[data.Number] } func newProcessor(_ *Config, log *zap.Logger, next consumer.Metrics) *Processor { @@ -31,6 +39,7 @@ func newProcessor(_ *Config, log *zap.Logger, next consumer.Metrics) *Processor ctx: ctx, cancel: cancel, next: next, + nums: delta.Numbers(), } return &proc @@ -50,5 +59,27 @@ func (p *Processor) Capabilities() consumer.Capabilities { } func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { + var errs error + + metrics.Each(md, func(m metrics.Metric) { + switch m.Type() { + case pmetric.MetricTypeSum: + sum := m.Sum() + if sum.AggregationTemporality() == pmetric.AggregationTemporalityDelta { + err := streams.Aggregate[data.Number](metrics.Sum(m), p.nums) + errs = errors.Join(errs, err) + sum.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + } + case pmetric.MetricTypeHistogram: + // TODO + case pmetric.MetricTypeExponentialHistogram: + // TODO + } + }) + + if errs != nil { + return errs + } + return p.next.ConsumeMetrics(ctx, md) }