Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[processor/deltatocumulative]: Sums #30707

Merged
merged 32 commits into from
Feb 15, 2024
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
55828ea
[processor/deltatocumulative]: initial structure
sh0rez Jan 22, 2024
c88bb65
deltatocumulative: scaffold
sh0rez Nov 28, 2023
a2da277
delta: sum, guard, metadata
sh0rez Nov 28, 2023
fd61533
identity: ids for metric + series
sh0rez Nov 28, 2023
ab085e6
deltatocumulative: basic processor
sh0rez Nov 28, 2023
862eb02
deltatocumulative: primitives, sum aggregation
sh0rez Dec 10, 2023
72dc5b8
deltatocumulative: emit on interval
sh0rez Dec 11, 2023
94f0d9a
deltatocumulative: factory plumbing
sh0rez Dec 11, 2023
84f27ab
delta: use max timestamp
sh0rez Jan 18, 2024
b244cdd
processor/deltatocumulative: in-line processing
sh0rez Jan 19, 2024
e702012
processor/deltatocumulative: set go mod to 1.20
sh0rez Jan 23, 2024
a111fc7
*: forward to deltatocumulative-structure
sh0rez Jan 23, 2024
5f51197
*: forward to main
sh0rez Jan 29, 2024
da680fc
[processor/deltatocumulative] autogenerate files
sh0rez Jan 29, 2024
b15e6db
[processor/deltatocumulative] linter fixes
sh0rez Jan 29, 2024
60bdfdf
*: review feedback
sh0rez Jan 29, 2024
d42d70c
*: forward to structure
sh0rez Jan 29, 2024
0fb12dc
*: linter fixes
sh0rez Jan 29, 2024
a1d06e9
*: forward to main
sh0rez Jan 29, 2024
bd365a6
deltatocumulative: some test, benchmarks
sh0rez Feb 2, 2024
a85dff4
*: merge main to resolve conflicts
sh0rez Feb 14, 2024
b9dccf4
*: make goporto
sh0rez Feb 14, 2024
529bc03
*: forward to main
sh0rez Feb 15, 2024
722c4c7
*: addlicense
sh0rez Feb 15, 2024
fa0e044
*: lint fixes
sh0rez Feb 15, 2024
1f3df5b
deltatocumulative: changelog entry
sh0rez Feb 15, 2024
154385a
*: go mod tidy
sh0rez Feb 15, 2024
2613df9
delta: spell out times in error msgs
sh0rez Feb 15, 2024
6f214f5
delta: simplify test, add action to err
sh0rez Feb 15, 2024
957581b
deltatocumulative: add @RichieSams as CODEOWNER
sh0rez Feb 15, 2024
adc35b3
delta: fix test
sh0rez Feb 15, 2024
dc2ddd9
deltatocumulative: re-gen using latest mdatagen
sh0rez Feb 15, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/deltatocumulative-sums.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: new_component

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: deltatocumulative

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: adds processor to convert sums (initially) from delta to cumulative temporality

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [30705]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
2 changes: 1 addition & 1 deletion .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ pkg/winperfcounters/ @open-telemetry/collect

processor/attributesprocessor/ @open-telemetry/collector-contrib-approvers @boostchicken
processor/cumulativetodeltaprocessor/ @open-telemetry/collector-contrib-approvers @TylerHelmuth
processor/deltatocumulativeprocessor/ @open-telemetry/collector-contrib-approvers @sh0rez
processor/deltatocumulativeprocessor/ @open-telemetry/collector-contrib-approvers @sh0rez @RichieSams
processor/deltatorateprocessor/ @open-telemetry/collector-contrib-approvers @Aneurysm9
processor/filterprocessor/ @open-telemetry/collector-contrib-approvers @TylerHelmuth @boostchicken
processor/groupbyattrsprocessor/ @open-telemetry/collector-contrib-approvers @rnishtala-sumo
Expand Down
2 changes: 1 addition & 1 deletion processor/deltatocumulativeprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
| Distributions | [] |
| Warnings | [Statefulness](#warnings) |
| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aprocessor%2Fdeltatocumulative%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aprocessor%2Fdeltatocumulative) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aprocessor%2Fdeltatocumulative%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aprocessor%2Fdeltatocumulative) |
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@sh0rez](https://www.github.com/sh0rez) |
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@sh0rez](https://www.github.com/sh0rez), [@RichieSams](https://www.github.com/RichieSams) |

[development]: https://github.com/open-telemetry/opentelemetry-collector#development
<!-- end autogenerated section -->
Expand Down
8 changes: 8 additions & 0 deletions processor/deltatocumulativeprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/processor/delta
go 1.21

require (
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.94.0
github.com/stretchr/testify v1.8.4
go.opentelemetry.io/collector/component v0.94.1
go.opentelemetry.io/collector/consumer v0.94.1
go.opentelemetry.io/collector/pdata v1.1.0
Expand All @@ -13,6 +15,8 @@ require (
)

require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
Expand All @@ -25,6 +29,7 @@ require (
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opentelemetry.io/collector/config/configtelemetry v0.94.1 // indirect
go.opentelemetry.io/collector/confmap v0.94.1 // indirect
go.opentelemetry.io/otel v1.23.1 // indirect
Expand All @@ -35,4 +40,7 @@ require (
google.golang.org/genproto/googleapis/rpc v0.0.0-20240102182953-50ed04b92917 // indirect
google.golang.org/grpc v1.61.0 // indirect
google.golang.org/protobuf v1.32.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil
9 changes: 9 additions & 0 deletions processor/deltatocumulativeprocessor/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 29 additions & 0 deletions processor/deltatocumulativeprocessor/internal/data/add.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package data // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data"

import "go.opentelemetry.io/collector/pdata/pmetric"

func (dp Number) Add(in Number) Number {
switch in.ValueType() {
case pmetric.NumberDataPointValueTypeDouble:
v := dp.DoubleValue() + in.DoubleValue()
dp.SetDoubleValue(v)
case pmetric.NumberDataPointValueTypeInt:
v := dp.IntValue() + in.IntValue()
dp.SetIntValue(v)
}
dp.SetTimestamp(in.Timestamp())
return dp
}

// nolint
func (dp Histogram) Add(in Histogram) Histogram {
panic("todo")
}

// nolint
func (dp ExpHistogram) Add(in ExpHistogram) ExpHistogram {
panic("todo")
}
76 changes: 76 additions & 0 deletions processor/deltatocumulativeprocessor/internal/data/data.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package data // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data"

import (
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
)

type Point[Self any] interface {
StartTimestamp() pcommon.Timestamp
Timestamp() pcommon.Timestamp
Attributes() pcommon.Map

Clone() Self
CopyTo(Self)

Add(Self) Self
}

type Number struct {
pmetric.NumberDataPoint
}

func (dp Number) Clone() Number {
clone := Number{NumberDataPoint: pmetric.NewNumberDataPoint()}
if dp.NumberDataPoint != (pmetric.NumberDataPoint{}) {
dp.CopyTo(clone)
}
return clone
}

func (dp Number) CopyTo(dst Number) {
dp.NumberDataPoint.CopyTo(dst.NumberDataPoint)
}

type Histogram struct {
pmetric.HistogramDataPoint
}

func (dp Histogram) Clone() Histogram {
clone := Histogram{HistogramDataPoint: pmetric.NewHistogramDataPoint()}
if dp.HistogramDataPoint != (pmetric.HistogramDataPoint{}) {
dp.CopyTo(clone)
}
return clone
}

func (dp Histogram) CopyTo(dst Histogram) {
dp.HistogramDataPoint.CopyTo(dst.HistogramDataPoint)
}

type ExpHistogram struct {
pmetric.ExponentialHistogramDataPoint
}

func (dp ExpHistogram) Clone() ExpHistogram {
clone := ExpHistogram{ExponentialHistogramDataPoint: pmetric.NewExponentialHistogramDataPoint()}
if dp.ExponentialHistogramDataPoint != (pmetric.ExponentialHistogramDataPoint{}) {
dp.CopyTo(clone)
}
return clone
}

func (dp ExpHistogram) CopyTo(dst ExpHistogram) {
dp.ExponentialHistogramDataPoint.CopyTo(dst.ExponentialHistogramDataPoint)
}

type mustPoint[D Point[D]] struct{ _ D }

var (
sh0rez marked this conversation as resolved.
Show resolved Hide resolved
_ = mustPoint[Number]{}
_ = mustPoint[Histogram]{}
_ = mustPoint[ExpHistogram]{}
)
83 changes: 83 additions & 0 deletions processor/deltatocumulativeprocessor/internal/delta/delta.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package delta // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/delta"

import (
"fmt"

"go.opentelemetry.io/collector/pdata/pcommon"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams"
)

func construct[D data.Point[D]]() streams.Aggregator[D] {
acc := &Accumulator[D]{dps: make(map[streams.Ident]D)}
return &Lock[D]{next: acc}
}

func Numbers() streams.Aggregator[data.Number] {
return construct[data.Number]()
}

func Histograms() streams.Aggregator[data.Histogram] {
return construct[data.Histogram]()
}

var _ streams.Aggregator[data.Number] = (*Accumulator[data.Number])(nil)

type Accumulator[D data.Point[D]] struct {
dps map[streams.Ident]D
}

// Aggregate implements delta-to-cumulative aggregation as per spec:
// https://opentelemetry.io/docs/specs/otel/metrics/data-model/#sums-delta-to-cumulative
func (a *Accumulator[D]) Aggregate(id streams.Ident, dp D) (D, error) {
// make the accumulator to start with the current sample, discarding any
// earlier data. return after use
reset := func() (D, error) {
a.dps[id] = dp.Clone()
return a.dps[id], nil
}

aggr, ok := a.dps[id]

// new series: reset
if !ok {
return reset()
}
// belongs to older series: drop
if dp.StartTimestamp() < aggr.StartTimestamp() {
return aggr, ErrOlderStart{Start: aggr.StartTimestamp(), Sample: dp.StartTimestamp()}
}
// belongs to later series: reset
if dp.StartTimestamp() > aggr.StartTimestamp() {
return reset()
}
// out of order: drop
if dp.Timestamp() <= aggr.Timestamp() {
return aggr, ErrOutOfOrder{Last: aggr.Timestamp(), Sample: dp.Timestamp()}
}

a.dps[id] = aggr.Add(dp)
return a.dps[id], nil
}

type ErrOlderStart struct {
Start pcommon.Timestamp
Sample pcommon.Timestamp
}

func (e ErrOlderStart) Error() string {
return fmt.Sprintf("dropped sample with start_time=%s, because series only starts at start_time=%s. consider checking for multiple processes sending the exact same series", e.Sample, e.Start)
}

type ErrOutOfOrder struct {
Last pcommon.Timestamp
Sample pcommon.Timestamp
}

func (e ErrOutOfOrder) Error() string {
return fmt.Sprintf("out of order: dropped sample from time=%s, because series is already at time=%s", e.Sample, e.Last)
}
Loading
Loading