Skip to content

Commit

Permalink
[processor/interval] Implement the main logic (#32054)
Browse files Browse the repository at this point in the history
Description:

This PR implements the main logic of this new processor.

Link to tracking Issue:

#29461
This is a re-opening of
[30827](#30827)

Testing:
I added a test for the main aggregation / export behavior, but I need to
add more to test state expiry

Documentation:

I updated the README with the updated configs, but I should add some
examples as well.
  • Loading branch information
RichieSams authored Apr 30, 2024
1 parent b47f5e7 commit 4f96a92
Show file tree
Hide file tree
Showing 36 changed files with 1,102 additions and 17 deletions.
27 changes: 27 additions & 0 deletions .chloggen/interval-implement.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: "new_component"

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: intervalprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Implements the new interval processor. See the README for more info about how to use it

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [29461]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
2 changes: 1 addition & 1 deletion .github/CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ processor/deltatorateprocessor/ @open-telemetry/collect
processor/filterprocessor/ @open-telemetry/collector-contrib-approvers @TylerHelmuth @boostchicken
processor/groupbyattrsprocessor/ @open-telemetry/collector-contrib-approvers @rnishtala-sumo
processor/groupbytraceprocessor/ @open-telemetry/collector-contrib-approvers @jpkrohling
processor/intervalprocessor/ @open-telemetry/collector-contrib-approvers @RichieSams
processor/intervalprocessor/ @open-telemetry/collector-contrib-approvers @RichieSams @sh0rez
processor/k8sattributesprocessor/ @open-telemetry/collector-contrib-approvers @dmitryax @rmfitzpatrick @fatsheep9146 @TylerHelmuth
processor/logstransformprocessor/ @open-telemetry/collector-contrib-approvers @djaglowski @dehaansa
processor/metricsgenerationprocessor/ @open-telemetry/collector-contrib-approvers @Aneurysm9
Expand Down
1 change: 1 addition & 0 deletions cmd/otelcontribcol/builder-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ replaces:
- github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal => ../../pkg/batchpersignal
- github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs => ../../internal/aws/cwlogs
- github.com/open-telemetry/opentelemetry-collector-contrib/internal/common => ../../internal/common
- github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics => ../../internal/exp/metrics
- github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsxrayreceiver => ../../receiver/awsxrayreceiver
- github.com/open-telemetry/opentelemetry-collector-contrib/receiver/azureblobreceiver => ../../receiver/azureblobreceiver
- github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sobjectsreceiver => ../../receiver/k8sobjectsreceiver
Expand Down
3 changes: 3 additions & 0 deletions cmd/otelcontribcol/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.99.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/datadog v0.99.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/docker v0.99.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics v0.99.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter v0.99.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig v0.99.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka v0.99.0 // indirect
Expand Down Expand Up @@ -753,6 +754,8 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/c

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/common => ../../internal/common

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics => ../../internal/exp/metrics

replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsxrayreceiver => ../../receiver/awsxrayreceiver

replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/azureblobreceiver => ../../receiver/azureblobreceiver
Expand Down
4 changes: 4 additions & 0 deletions internal/exp/metrics/staleness/staleness.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,7 @@ func (s *Staleness[T]) Evict() identity.Stream {
s.items.Delete(id)
return id
}

func (s *Staleness[T]) Clear() {
s.items.Clear()
}
5 changes: 5 additions & 0 deletions internal/exp/metrics/streams/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type Map[T any] interface {
Delete(identity.Stream)
Items() func(yield func(identity.Stream, T) bool) bool
Len() int
Clear()
}

var _ Map[any] = HashMap[any](nil)
Expand Down Expand Up @@ -51,6 +52,10 @@ func (m HashMap[T]) Len() int {
return len((map[identity.Stream]T)(m))
}

func (m HashMap[T]) Clear() {
clear(m)
}

// Evictors remove the "least important" stream based on some strategy such as
// the oldest, least active, etc.
type Evictor interface {
Expand Down
35 changes: 33 additions & 2 deletions processor/intervalprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
| Distributions | [] |
| Warnings | [Statefulness](#warnings) |
| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aprocessor%2Finterval%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aprocessor%2Finterval) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aprocessor%2Finterval%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aprocessor%2Finterval) |
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@RichieSams](https://www.github.com/RichieSams) |
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@RichieSams](https://www.github.com/RichieSams), [@sh0rez](https://www.github.com/sh0rez) |

[development]: https://github.com/open-telemetry/opentelemetry-collector#development
<!-- end autogenerated section -->
Expand All @@ -31,4 +31,35 @@ The following metric types will *not* be aggregated, and will instead be passed,

The following settings can be optionally configured:

- `max_staleness`: The total time a state entry will live past the time it was last seen. Set to 0 to retain state indefinitely. Default: 0
* `interval`: The interval in which the processor should export the aggregated metrics. Default: 60s

## Example of metric flows

The following sum metrics come into the processor to be handled

| Timestamp | Metric Name | Aggregation Temporarility | Attributes | Value |
| --------- | ------------ | ------------------------- | ----------------- | ----: |
| 0 | test_metric | Cumulative | labelA: foo | 4.0 |
| 2 | test_metric | Cumulative | labelA: bar | 3.1 |
| 4 | other_metric | Delta | fruitType: orange | 77.4 |
| 6 | test_metric | Cumulative | labelA: foo | 8.2 |
| 8 | test_metric | Cumulative | labelA: foo | 12.8 |
| 10 | test_metric | Cumulative | labelA: bar | 6.4 |

The processor would immediately pass the following metrics to the next processor in the chain

| Timestamp | Metric Name | Aggregation Temporarility | Attributes | Value |
| --------- | ------------ | ------------------------- | ----------------- | ----: |
| 4 | other_metric | Delta | fruitType: orange | 77.4 |

Because it's a Delta metric.

At the next `interval` (15s by default), the processor would pass the following metrics to the next processor in the chain

| Timestamp | Metric Name | Aggregation Temporarility | Attributes | Value |
| --------- | ----------- | ------------------------- | ----------- | ----: |
| 8 | test_metric | Cumulative | labelA: foo | 12.8 |
| 10 | test_metric | Cumulative | labelA: bar | 6.4 |

> [!IMPORTANT]
> After exporting, any internal state is cleared. So if no new metrics come in, the next interval will export nothing.
13 changes: 11 additions & 2 deletions processor/intervalprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,30 @@
package intervalprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/intervalprocessor"

import (
"errors"
"time"

"go.opentelemetry.io/collector/component"
)

var (
ErrInvalidIntervalValue = errors.New("invalid interval value")
)

var _ component.Config = (*Config)(nil)

// Config defines the configuration for the processor.
type Config struct {
// MaxStaleness is the total time a state entry will live past the time it was last seen. Set to 0 to retain state indefinitely.
MaxStaleness time.Duration `mapstructure:"max_staleness"`
// Interval is the time
Interval time.Duration `mapstructure:"interval"`
}

// Validate checks whether the input configuration has all of the required fields for the processor.
// An error is returned if there are any invalid inputs.
func (config *Config) Validate() error {
if config.Interval <= 0 {
return ErrInvalidIntervalValue
}

return nil
}
5 changes: 4 additions & 1 deletion processor/intervalprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package intervalprocessor // import "github.com/open-telemetry/opentelemetry-col
import (
"context"
"fmt"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
Expand All @@ -23,7 +24,9 @@ func NewFactory() processor.Factory {
}

func createDefaultConfig() component.Config {
return &Config{}
return &Config{
Interval: 60 * time.Second,
}
}

func createMetricsProcessor(_ context.Context, set processor.CreateSettings, cfg component.Config, nextConsumer consumer.Metrics) (processor.Metrics, error) {
Expand Down
14 changes: 13 additions & 1 deletion processor/intervalprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/processor/inter
go 1.21.0

require (
github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics v0.99.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.99.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.99.0
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/collector/component v0.99.0
go.opentelemetry.io/collector/confmap v0.99.0
Expand All @@ -17,7 +20,7 @@ require (

require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
Expand All @@ -32,6 +35,7 @@ require (
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.99.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.19.0 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
Expand All @@ -52,3 +56,11 @@ require (
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics => ../../internal/exp/metrics

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden
4 changes: 2 additions & 2 deletions processor/intervalprocessor/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 23 additions & 0 deletions processor/intervalprocessor/internal/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package metrics // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/intervalprocessor/internal/metrics"

import (
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
)

type DataPointSlice[DP DataPoint[DP]] interface {
Len() int
At(i int) DP
AppendEmpty() DP
}

type DataPoint[Self any] interface {
pmetric.NumberDataPoint | pmetric.HistogramDataPoint | pmetric.ExponentialHistogramDataPoint

Timestamp() pcommon.Timestamp
Attributes() pcommon.Map
CopyTo(dest Self)
}
2 changes: 1 addition & 1 deletion processor/intervalprocessor/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ status:
distributions: []
warnings: [Statefulness]
codeowners:
active: [RichieSams]
active: [RichieSams, sh0rez]
tests:
config:
Loading

0 comments on commit 4f96a92

Please sign in to comment.