diff --git a/processor/intervalprocessor/README.md b/processor/intervalprocessor/README.md index 857bcf6c7d58..3d9920211f62 100644 --- a/processor/intervalprocessor/README.md +++ b/processor/intervalprocessor/README.md @@ -33,9 +33,17 @@ The following metric types will *not* be aggregated, and will instead be passed, The following settings can be optionally configured: -* `interval`: The interval in which the processor should export the aggregated metrics. Default: 60s -* `gauge_pass_through`: Whether gauges should pass through as they are to the next component or be aggregated. Default: false -* `summary_pass_through`: Whether summaries should pass through as they are to the next component or be aggregated. Default: false +```yaml +intervalprocessor: + # The interval in which the processor should export the aggregated metrics. + [ interval: | default = 60s ] + + pass_through: + # Whether gauges should be aggregated or passed through to the next component as they are + [ gauge: | default = false ] + # Whether summaries should be aggregated or passed through to the next component as they are + [ summary: l | default = false ] +``` ## Example of metric flows diff --git a/processor/intervalprocessor/config.go b/processor/intervalprocessor/config.go index 96ad36189f80..ee98305cc3b5 100644 --- a/processor/intervalprocessor/config.go +++ b/processor/intervalprocessor/config.go @@ -20,12 +20,18 @@ var _ component.Config = (*Config)(nil) type Config struct { // Interval is the time interval at which the processor will aggregate metrics. Interval time.Duration `mapstructure:"interval"` - // GaugePassThrough is a flag that determines whether gauge metrics should be passed through + // PassThrough is a configuration that determines whether gauge and summary metrics should be passed through // as they are or aggregated. - GaugePassThrough bool `mapstructure:"gauge_pass_through"` - // SummaryPassThrough is a flag that determines whether summary metrics should be passed through + PassThrough PassThrough `mapstructure:"pass_through"` +} + +type PassThrough struct { + // Gauge is a flag that determines whether gauge metrics should be passed through + // as they are or aggregated. + Gauge bool `mapstructure:"gauge"` + // Summary is a flag that determines whether summary metrics should be passed through // as they are or aggregated. - SummaryPassThrough bool `mapstructure:"summary_pass_through"` + Summary bool `mapstructure:"summary"` } // Validate checks whether the input configuration has all of the required fields for the processor. diff --git a/processor/intervalprocessor/factory.go b/processor/intervalprocessor/factory.go index 981cc63f29a2..cfca47850b02 100644 --- a/processor/intervalprocessor/factory.go +++ b/processor/intervalprocessor/factory.go @@ -25,9 +25,11 @@ func NewFactory() processor.Factory { func createDefaultConfig() component.Config { return &Config{ - Interval: 60 * time.Second, - GaugePassThrough: false, - SummaryPassThrough: false, + Interval: 60 * time.Second, + PassThrough: PassThrough{ + Gauge: false, + Summary: false, + }, } } diff --git a/processor/intervalprocessor/processor.go b/processor/intervalprocessor/processor.go index fa49a04211d8..5a9df9f4e0b0 100644 --- a/processor/intervalprocessor/processor.go +++ b/processor/intervalprocessor/processor.go @@ -38,9 +38,7 @@ type Processor struct { expHistogramLookup map[identity.Stream]pmetric.ExponentialHistogramDataPoint summaryLookup map[identity.Stream]pmetric.SummaryDataPoint - exportInterval time.Duration - gaugePassThrough bool - summaryPassThrough bool + config *Config nextConsumer consumer.Metrics } @@ -64,16 +62,14 @@ func newProcessor(config *Config, log *zap.Logger, nextConsumer consumer.Metrics expHistogramLookup: map[identity.Stream]pmetric.ExponentialHistogramDataPoint{}, summaryLookup: map[identity.Stream]pmetric.SummaryDataPoint{}, - exportInterval: config.Interval, - gaugePassThrough: config.GaugePassThrough, - summaryPassThrough: config.SummaryPassThrough, + config: config, nextConsumer: nextConsumer, } } func (p *Processor) Start(_ context.Context, _ component.Host) error { - exportTicker := time.NewTicker(p.exportInterval) + exportTicker := time.NewTicker(p.config.Interval) go func() { for { select { @@ -109,7 +105,7 @@ func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) erro sm.Metrics().RemoveIf(func(m pmetric.Metric) bool { switch m.Type() { case pmetric.MetricTypeSummary: - if p.summaryPassThrough { + if p.config.PassThrough.Summary { return false } @@ -117,7 +113,7 @@ func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) erro aggregateDataPoints(m.Summary().DataPoints(), mClone.Summary().DataPoints(), metricID, p.summaryLookup) return true case pmetric.MetricTypeGauge: - if p.gaugePassThrough { + if p.config.PassThrough.Gauge { return false } diff --git a/processor/intervalprocessor/processor_test.go b/processor/intervalprocessor/processor_test.go index cda18e561b5d..69bfc715018f 100644 --- a/processor/intervalprocessor/processor_test.go +++ b/processor/intervalprocessor/processor_test.go @@ -41,7 +41,7 @@ func TestAggregation(t *testing.T) { var config *Config for _, tc := range testCases { - config = &Config{Interval: time.Second, GaugePassThrough: tc.passThrough, SummaryPassThrough: tc.passThrough} + config = &Config{Interval: time.Second, PassThrough: PassThrough{Gauge: tc.passThrough, Summary: tc.passThrough}} t.Run(tc.name, func(t *testing.T) { // next stores the results of the filter metric processor