From c6a6bd43875769d7400e8462e63730fe1a49ab11 Mon Sep 17 00:00:00 2001 From: sh0rez Date: Tue, 14 May 2024 13:20:50 +0200 Subject: [PATCH 001/258] deltatocumulative: exponential histograms (#32030) **Description:** Implements accumulation of exponential histograms by adding bucket-per-bucket. - [x] Align bucket offset to the smaller one - [x] Merge buckets by adding up each buckets count - [x] Widen zero buckets so they are the same - [x] Adjust scale to the lowest one **Link to tracking Issue:** https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/30705 **Testing:** Extensive tests have been added to the `internal/data` package **Documentation:** not needed --- .chloggen/deltatocumulative-exphist.yaml | 29 ++++ processor/deltatocumulativeprocessor/go.mod | 5 + .../internal/data/add.go | 51 ++++++- .../internal/data/data.go | 10 +- .../internal/data/expo/expo.go | 52 +++++++ .../internal/data/expo/expo_test.go | 63 +++++++++ .../internal/data/expo/expotest/bins.go | 81 +++++++++++ .../internal/data/expo/expotest/equal.go | 115 +++++++++++++++ .../internal/data/expo/expotest/equal_test.go | 73 ++++++++++ .../internal/data/expo/expotest/histogram.go | 65 +++++++++ .../internal/data/expo/merge.go | 37 +++++ .../internal/data/expo/merge_test.go | 53 +++++++ .../internal/data/expo/ord.go | 16 +++ .../internal/data/expo/ord_test.go | 40 ++++++ .../internal/data/expo/scale.go | 115 +++++++++++++++ .../internal/data/expo/scale_test.go | 90 ++++++++++++ .../internal/data/expo/zero.go | 68 +++++++++ .../internal/data/expo/zero_test.go | 125 +++++++++++++++++ .../internal/data/expo_test.go | 131 ++++++++++++++++++ .../internal/metrics/data.go | 2 +- .../internal/telemetry/metrics.go | 15 +- .../deltatocumulativeprocessor/processor.go | 59 +++++--- 22 files changed, 1263 insertions(+), 32 deletions(-) create mode 100644 .chloggen/deltatocumulative-exphist.yaml create mode 100644 processor/deltatocumulativeprocessor/internal/data/expo/expo.go create mode 100644 processor/deltatocumulativeprocessor/internal/data/expo/expo_test.go create mode 100644 processor/deltatocumulativeprocessor/internal/data/expo/expotest/bins.go create mode 100644 processor/deltatocumulativeprocessor/internal/data/expo/expotest/equal.go create mode 100644 processor/deltatocumulativeprocessor/internal/data/expo/expotest/equal_test.go create mode 100644 processor/deltatocumulativeprocessor/internal/data/expo/expotest/histogram.go create mode 100644 processor/deltatocumulativeprocessor/internal/data/expo/merge.go create mode 100644 processor/deltatocumulativeprocessor/internal/data/expo/merge_test.go create mode 100644 processor/deltatocumulativeprocessor/internal/data/expo/ord.go create mode 100644 processor/deltatocumulativeprocessor/internal/data/expo/ord_test.go create mode 100644 processor/deltatocumulativeprocessor/internal/data/expo/scale.go create mode 100644 processor/deltatocumulativeprocessor/internal/data/expo/scale_test.go create mode 100644 processor/deltatocumulativeprocessor/internal/data/expo/zero.go create mode 100644 processor/deltatocumulativeprocessor/internal/data/expo/zero_test.go create mode 100644 processor/deltatocumulativeprocessor/internal/data/expo_test.go diff --git a/.chloggen/deltatocumulative-exphist.yaml b/.chloggen/deltatocumulative-exphist.yaml new file mode 100644 index 000000000000..7dfa30bf54e4 --- /dev/null +++ b/.chloggen/deltatocumulative-exphist.yaml @@ -0,0 +1,29 @@ +# Use this changelog template to create an entry for release notes. 
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: "enhancement"
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: deltatocumulativeprocessor
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: exponential histogram accumulation
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [31340]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+  accumulates exponential histogram datapoints by adding respective bucket counts.
+  also handles downscaling, changing zero-counts, offset adaptations and optional fields
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [user]
diff --git a/processor/deltatocumulativeprocessor/go.mod b/processor/deltatocumulativeprocessor/go.mod
index 859bfc1eb028..a49a75e803b6 100644
--- a/processor/deltatocumulativeprocessor/go.mod
+++ b/processor/deltatocumulativeprocessor/go.mod
@@ -4,6 +4,7 @@ go 1.21.0
 require (
 	github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics v0.100.0
+	github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.100.0
 	github.com/stretchr/testify v1.9.0
 	go.opentelemetry.io/collector/component v0.100.0
 	go.opentelemetry.io/collector/confmap v0.100.0
@@ -58,4 +59,8 @@ require (
 replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil
 
+replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest
+
 replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics => ../../internal/exp/metrics
+
+replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden
diff --git a/processor/deltatocumulativeprocessor/internal/data/add.go b/processor/deltatocumulativeprocessor/internal/data/add.go
index b40bf05b916d..94a575b1bd9f 100644
--- a/processor/deltatocumulativeprocessor/internal/data/add.go
+++ b/processor/deltatocumulativeprocessor/internal/data/add.go
@@ -3,7 +3,13 @@
 package data // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data"
 
-import "go.opentelemetry.io/collector/pdata/pmetric"
+import (
+	"math"
+
+	"go.opentelemetry.io/collector/pdata/pmetric"
+
+	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo"
+)
 
 func (dp Number) Add(in Number) Number {
 	switch in.ValueType() {
@@ -23,7 +29,46 @@ func (dp Histogram) Add(in Histogram) Histogram {
 	panic("todo")
 }
 
-// nolint
 func (dp ExpHistogram) Add(in ExpHistogram) ExpHistogram {
-	panic("todo")
+	type H = ExpHistogram
+
+	if dp.Scale() != in.Scale() {
+		hi, lo := expo.HiLo(dp, in, H.Scale)
+		from, to := expo.Scale(hi.Scale()), expo.Scale(lo.Scale())
+		
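// the scales differ: collapse the buckets of the higher-resolution histogram until both match, because upscaling would have to guess how counts distribute inside a bucket +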
expo.Downscale(hi.Positive(), from, to) + expo.Downscale(hi.Negative(), from, to) + hi.SetScale(lo.Scale()) + } + + if dp.ZeroThreshold() != in.ZeroThreshold() { + hi, lo := expo.HiLo(dp, in, H.ZeroThreshold) + expo.WidenZero(lo.DataPoint, hi.ZeroThreshold()) + } + + expo.Merge(dp.Positive(), in.Positive()) + expo.Merge(dp.Negative(), in.Negative()) + + dp.SetTimestamp(in.Timestamp()) + dp.SetCount(dp.Count() + in.Count()) + dp.SetZeroCount(dp.ZeroCount() + in.ZeroCount()) + + if dp.HasSum() && in.HasSum() { + dp.SetSum(dp.Sum() + in.Sum()) + } else { + dp.RemoveSum() + } + + if dp.HasMin() && in.HasMin() { + dp.SetMin(math.Min(dp.Min(), in.Min())) + } else { + dp.RemoveMin() + } + + if dp.HasMax() && in.HasMax() { + dp.SetMax(math.Max(dp.Max(), in.Max())) + } else { + dp.RemoveMax() + } + + return dp } diff --git a/processor/deltatocumulativeprocessor/internal/data/data.go b/processor/deltatocumulativeprocessor/internal/data/data.go index 941b3cff904f..eade94eadf92 100644 --- a/processor/deltatocumulativeprocessor/internal/data/data.go +++ b/processor/deltatocumulativeprocessor/internal/data/data.go @@ -6,6 +6,8 @@ package data // import "github.com/open-telemetry/opentelemetry-collector-contri import ( "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo" ) type Point[Self any] interface { @@ -52,19 +54,19 @@ func (dp Histogram) CopyTo(dst Histogram) { } type ExpHistogram struct { - pmetric.ExponentialHistogramDataPoint + expo.DataPoint } func (dp ExpHistogram) Clone() ExpHistogram { - clone := ExpHistogram{ExponentialHistogramDataPoint: pmetric.NewExponentialHistogramDataPoint()} - if dp.ExponentialHistogramDataPoint != (pmetric.ExponentialHistogramDataPoint{}) { + clone := ExpHistogram{DataPoint: pmetric.NewExponentialHistogramDataPoint()} + if dp.DataPoint != (expo.DataPoint{}) { dp.CopyTo(clone) } return clone } func (dp ExpHistogram) CopyTo(dst ExpHistogram) { - dp.ExponentialHistogramDataPoint.CopyTo(dst.ExponentialHistogramDataPoint) + dp.DataPoint.CopyTo(dst.DataPoint) } type mustPoint[D Point[D]] struct{ _ D } diff --git a/processor/deltatocumulativeprocessor/internal/data/expo/expo.go b/processor/deltatocumulativeprocessor/internal/data/expo/expo.go new file mode 100644 index 000000000000..2011e3cd811e --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/data/expo/expo.go @@ -0,0 +1,52 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// Package expo implements various operations on exponential histograms and their bucket counts +package expo // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo" + +import "go.opentelemetry.io/collector/pdata/pmetric" + +type ( + DataPoint = pmetric.ExponentialHistogramDataPoint + Buckets = pmetric.ExponentialHistogramDataPointBuckets +) + +// Abs returns a view into the buckets using an absolute scale +func Abs(bs Buckets) Absolute { + return Absolute{buckets: bs} +} + +type buckets = Buckets + +// Absolute addresses bucket counts using an absolute scale, such that it is +// interoperable with [Scale]. 
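+// Abs(i) reports the count of the bucket whose boundaries [Scale.Bounds] returns for that same index i.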
+// +// It spans from [[Absolute.Lower]:[Absolute.Upper]] +// +// NOTE: The zero-value is unusable, use [Abs] to construct +type Absolute struct { + buckets +} + +// Abs returns the value at absolute index 'at' +func (a Absolute) Abs(at int) uint64 { + if i, ok := a.idx(at); ok { + return a.BucketCounts().At(i) + } + return 0 +} + +// Upper returns the minimal index outside the set, such that every i < Upper +func (a Absolute) Upper() int { + return a.BucketCounts().Len() + int(a.Offset()) +} + +// Lower returns the minimal index inside the set, such that every i >= Lower +func (a Absolute) Lower() int { + return int(a.Offset()) +} + +func (a Absolute) idx(at int) (int, bool) { + idx := at - a.Lower() + return idx, idx >= 0 && idx < a.BucketCounts().Len() +} diff --git a/processor/deltatocumulativeprocessor/internal/data/expo/expo_test.go b/processor/deltatocumulativeprocessor/internal/data/expo/expo_test.go new file mode 100644 index 000000000000..d7eb0cb2e9b3 --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/data/expo/expo_test.go @@ -0,0 +1,63 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package expo_test + +import ( + "fmt" + "testing" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/expotest" +) + +func TestAbsolute(t *testing.T) { + is := expotest.Is(t) + + bs := expotest.Bins{ø, 1, 2, 3, 4, 5, ø, ø}.Into() + abs := expo.Abs(bs) + + lo, up := abs.Lower(), abs.Upper() + is.Equalf(-2, lo, "lower-bound") + is.Equalf(3, up, "upper-bound") + + for i := lo; i < up; i++ { + got := abs.Abs(i) + is.Equal(bs.BucketCounts().At(i+2), got) + } +} + +func ExampleAbsolute() { + nums := []float64{0.4, 2.3, 2.4, 4.5} + + bs := expotest.Observe0(nums...) 
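+	// Abs yields a view of the same buckets that is addressed by absolute bucket index instead of offset-relative position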
+ abs := expo.Abs(bs) + + s := expo.Scale(0) + for _, n := range nums { + fmt.Printf("%.1f belongs to bucket %+d\n", n, s.Idx(n)) + } + + fmt.Printf("\n index:") + for i := 0; i < bs.BucketCounts().Len(); i++ { + fmt.Printf(" %d", i) + } + fmt.Printf("\n abs:") + for i := abs.Lower(); i < abs.Upper(); i++ { + fmt.Printf(" %+d", i) + } + fmt.Printf("\ncounts:") + for i := abs.Lower(); i < abs.Upper(); i++ { + fmt.Printf(" %d", abs.Abs(i)) + } + + // Output: + // 0.4 belongs to bucket -2 + // 2.3 belongs to bucket +1 + // 2.4 belongs to bucket +1 + // 4.5 belongs to bucket +2 + // + // index: 0 1 2 3 4 + // abs: -2 -1 +0 +1 +2 + // counts: 1 0 0 2 1 +} diff --git a/processor/deltatocumulativeprocessor/internal/data/expo/expotest/bins.go b/processor/deltatocumulativeprocessor/internal/data/expo/expotest/bins.go new file mode 100644 index 000000000000..13b4ce74c928 --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/data/expo/expotest/bins.go @@ -0,0 +1,81 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package expotest // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/expotest" + +import ( + "fmt" + "math" + + "go.opentelemetry.io/collector/pdata/pmetric" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo" +) + +const ( + Empty = math.MaxUint64 + ø = Empty +) + +// index: 0 1 2 3 4 5 6 7 +// bucket: -3 -2 -1 0 1 2 3 4 +// bounds: (0.125,0.25], (0.25,0.5], (0.5,1], (1,2], (2,4], (4,8], (8,16], (16,32] +type Bins [8]uint64 + +func (bins Bins) Into() expo.Buckets { + start := 0 + for i := 0; i < len(bins); i++ { + if bins[i] != ø { + start = i + break + } + } + + end := len(bins) + for i := start; i < len(bins); i++ { + if bins[i] == ø { + end = i + break + } + } + + counts := bins[start:end] + + buckets := pmetric.NewExponentialHistogramDataPointBuckets() + buckets.SetOffset(int32(start - 3)) + buckets.BucketCounts().FromRaw(counts) + return buckets +} + +func ObserveInto(bs expo.Buckets, scale expo.Scale, pts ...float64) { + counts := bs.BucketCounts() + + for _, pt := range pts { + pt = math.Abs(pt) + if pt <= 0.125 || pt > 32 { + panic(fmt.Sprintf("out of bounds: 0.125 < %f <= 32", pt)) + } + + idx := scale.Idx(pt) - int(bs.Offset()) + switch { + case idx < 0: + bs.SetOffset(bs.Offset() + int32(idx)) + counts.FromRaw(append(make([]uint64, -idx), counts.AsRaw()...)) + idx = 0 + case idx >= counts.Len(): + counts.Append(make([]uint64, idx-counts.Len()+1)...) + } + + counts.SetAt(idx, counts.At(idx)+1) + } +} + +func Observe(scale expo.Scale, pts ...float64) expo.Buckets { + bs := pmetric.NewExponentialHistogramDataPointBuckets() + ObserveInto(bs, scale, pts...) + return bs +} + +func Observe0(pts ...float64) expo.Buckets { + return Observe(0, pts...) 
+} diff --git a/processor/deltatocumulativeprocessor/internal/data/expo/expotest/equal.go b/processor/deltatocumulativeprocessor/internal/data/expo/expotest/equal.go new file mode 100644 index 000000000000..c34e7c1665bc --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/data/expo/expotest/equal.go @@ -0,0 +1,115 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package expotest // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/expotest" + +import ( + "reflect" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo" +) + +// T is the testing helper. Most notably it provides [T.Equal] +type T struct { + testing.TB +} + +func Is(t testing.TB) T { + return T{TB: t} +} + +// Equal reports whether want and got are deeply equal. +// +// Unlike [reflect.DeepEqual] it first recursively checks exported fields +// and "getters", which are defined as an exported method with: +// - exactly zero input arguments +// - exactly one return value +// - does not start with 'Append' +// +// If this yields differences, those are reported and the test fails. +// If the compared values are [pmetric.ExponentialHistogramDataPoint], then +// [pmetrictest.CompareExponentialHistogramDataPoint] is also called. +// +// If no differences are found, it falls back to [assert.Equal]. +// +// This was done to aid readability when comparing deeply nested [pmetric]/[pcommon] types, +// because in many cases [assert.Equal] output was found to be barely understandable. +func (is T) Equal(want, got any) { + is.Helper() + equal(is.TB, want, got, "") +} + +func (is T) Equalf(want, got any, name string) { + is.Helper() + equal(is.TB, want, got, name) +} + +func equal(t testing.TB, want, got any, name string) bool { + t.Helper() + require.IsType(t, want, got) + + vw := reflect.ValueOf(want) + vg := reflect.ValueOf(got) + + if vw.Kind() != reflect.Struct { + ok := reflect.DeepEqual(want, got) + if !ok { + t.Errorf("%s: %+v != %+v", name, want, got) + } + return ok + } + + ok := true + // compare all "getters" of the struct + for i := 0; i < vw.NumMethod(); i++ { + mname := vw.Type().Method(i).Name + fname := strings.TrimPrefix(name+"."+mname+"()", ".") + + mw := vw.Method(i) + mg := vg.Method(i) + + // only compare "getters" + if mw.Type().NumIn() != 0 || mw.Type().NumOut() != 1 { + continue + } + // Append(Empty) fails above heuristic, exclude it + if strings.HasPrefix(mname, "Append") { + continue + } + + rw := mw.Call(nil)[0].Interface() + rg := mg.Call(nil)[0].Interface() + + ok = equal(t, rw, rg, fname) && ok + } + + // compare all exported fields of the struct + for i := 0; i < vw.NumField(); i++ { + if !vw.Type().Field(i).IsExported() { + continue + } + fname := name + "." 
+ vw.Type().Field(i).Name + fw := vw.Field(i).Interface() + fg := vg.Field(i).Interface() + ok = equal(t, fw, fg, fname) && ok + } + if !ok { + return false + } + + if _, ok := want.(expo.DataPoint); ok { + err := pmetrictest.CompareExponentialHistogramDataPoint(want.(expo.DataPoint), got.(expo.DataPoint)) + if err != nil { + t.Error(err) + } + } + + // fallback to a full deep-equal for rare cases (unexported fields, etc) + return assert.Equal(t, want, got) +} diff --git a/processor/deltatocumulativeprocessor/internal/data/expo/expotest/equal_test.go b/processor/deltatocumulativeprocessor/internal/data/expo/expotest/equal_test.go new file mode 100644 index 000000000000..7fb7c42b586e --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/data/expo/expotest/equal_test.go @@ -0,0 +1,73 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package expotest + +import ( + "fmt" + "path/filepath" + "runtime" + "strconv" + "strings" + "testing" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo" +) + +var t testing.TB = fakeT{} + +var expotest = struct { + Is func(t testing.TB) T + Observe func(expo.Scale, ...float64) expo.Buckets +}{ + Is: Is, + Observe: Observe, +} + +func ExampleT_Equal() { + is := expotest.Is(t) + + want := Histogram{ + PosNeg: expotest.Observe(expo.Scale(0), 1, 2, 3, 4), + Scale: 0, + }.Into() + + got := Histogram{ + PosNeg: expotest.Observe(expo.Scale(1), 1, 1, 1, 1), + Scale: 1, + }.Into() + + is.Equal(want, got) + + // Output: + // equal_test.go:40: Negative().BucketCounts().AsRaw(): [1 1 2] != [4] + // equal_test.go:40: Negative().BucketCounts().Len(): 3 != 1 + // equal_test.go:40: Positive().BucketCounts().AsRaw(): [1 1 2] != [4] + // equal_test.go:40: Positive().BucketCounts().Len(): 3 != 1 + // equal_test.go:40: Scale(): 0 != 1 +} + +func TestNone(*testing.T) {} + +type fakeT struct { + testing.TB +} + +func (t fakeT) Helper() {} + +func (t fakeT) Errorf(format string, args ...any) { + var from string + for i := 0; ; i++ { + pc, file, line, ok := runtime.Caller(i) + if !ok { + break + } + fn := runtime.FuncForPC(pc) + if strings.HasSuffix(fn.Name(), ".ExampleT_Equal") { + from = filepath.Base(file) + ":" + strconv.Itoa(line) + break + } + } + + fmt.Printf("%s: %s\n", from, fmt.Sprintf(format, args...)) +} diff --git a/processor/deltatocumulativeprocessor/internal/data/expo/expotest/histogram.go b/processor/deltatocumulativeprocessor/internal/data/expo/expotest/histogram.go new file mode 100644 index 000000000000..141dad724d82 --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/data/expo/expotest/histogram.go @@ -0,0 +1,65 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package expotest // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/expotest" + +import ( + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo" +) + +type Histogram struct { + Ts pcommon.Timestamp + + Pos, Neg expo.Buckets + PosNeg expo.Buckets + + Scale int + Count uint64 + Sum *float64 + + Min, Max *float64 + + Zt float64 + Zc uint64 +} + +func (hist Histogram) Into() expo.DataPoint { + dp := pmetric.NewExponentialHistogramDataPoint() + dp.SetTimestamp(hist.Ts) + + if !zero(hist.PosNeg) { + 
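// PosNeg fills the positive and negative ranges with identical buckets, which keeps symmetric test cases terse +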
hist.PosNeg.CopyTo(dp.Positive()) + hist.PosNeg.CopyTo(dp.Negative()) + } + + if !zero(hist.Pos) { + hist.Pos.MoveTo(dp.Positive()) + } + if !zero(hist.Neg) { + hist.Neg.MoveTo(dp.Negative()) + } + + dp.SetCount(hist.Count) + if hist.Sum != nil { + dp.SetSum(*hist.Sum) + } + + if hist.Min != nil { + dp.SetMin(*hist.Min) + } + if hist.Max != nil { + dp.SetMax(*hist.Max) + } + + dp.SetScale(int32(hist.Scale)) + dp.SetZeroThreshold(hist.Zt) + dp.SetZeroCount(hist.Zc) + return dp +} + +func zero[T comparable](v T) bool { + return v == *new(T) +} diff --git a/processor/deltatocumulativeprocessor/internal/data/expo/merge.go b/processor/deltatocumulativeprocessor/internal/data/expo/merge.go new file mode 100644 index 000000000000..150e29a65819 --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/data/expo/merge.go @@ -0,0 +1,37 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package expo // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo" + +import ( + "go.opentelemetry.io/collector/pdata/pcommon" +) + +// Merge combines the counts of buckets a and b into a. +// Both buckets MUST be of same scale +func Merge(arel, brel Buckets) { + if brel.BucketCounts().Len() == 0 { + return + } + if arel.BucketCounts().Len() == 0 { + brel.CopyTo(arel) + return + } + + a, b := Abs(arel), Abs(brel) + + lo := min(a.Lower(), b.Lower()) + up := max(a.Upper(), b.Upper()) + + size := up - lo + + counts := pcommon.NewUInt64Slice() + counts.Append(make([]uint64, size-counts.Len())...) + + for i := 0; i < counts.Len(); i++ { + counts.SetAt(i, a.Abs(lo+i)+b.Abs(lo+i)) + } + + a.SetOffset(int32(lo)) + counts.MoveTo(a.BucketCounts()) +} diff --git a/processor/deltatocumulativeprocessor/internal/data/expo/merge_test.go b/processor/deltatocumulativeprocessor/internal/data/expo/merge_test.go new file mode 100644 index 000000000000..4d3791721bcd --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/data/expo/merge_test.go @@ -0,0 +1,53 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package expo_test + +import ( + "fmt" + "testing" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/expotest" +) + +const ø = expotest.Empty + +type bins = expotest.Bins + +func TestMerge(t *testing.T) { + cases := []struct { + a, b bins + want bins + }{{ + // -3 -2 -1 0 1 2 3 4 + a: bins{ø, ø, ø, ø, ø, ø, ø, ø}, + b: bins{ø, ø, ø, ø, ø, ø, ø, ø}, + want: bins{ø, ø, ø, ø, ø, ø, ø, ø}, + }, { + a: bins{ø, ø, 1, 1, 1, ø, ø, ø}, + b: bins{ø, 1, 1, ø, ø, ø, ø, ø}, + want: bins{ø, 1, 2, 1, 1, ø, ø, ø}, + }, { + a: bins{ø, ø, ø, ø, 1, 1, 1, ø}, + b: bins{ø, ø, ø, ø, 1, 1, 1, ø}, + want: bins{ø, ø, ø, ø, 2, 2, 2, ø}, + }, { + a: bins{ø, 1, 1, ø, ø, ø, ø, ø}, + b: bins{ø, ø, ø, ø, 1, 1, ø, ø}, + want: bins{ø, 1, 1, 0, 1, 1, ø, ø}, + }} + + for _, cs := range cases { + a := cs.a.Into() + b := cs.b.Into() + want := cs.want.Into() + + name := fmt.Sprintf("(%+d,%d)+(%+d,%d)=(%+d,%d)", a.Offset(), a.BucketCounts().Len(), b.Offset(), b.BucketCounts().Len(), want.Offset(), want.BucketCounts().Len()) + t.Run(name, func(t *testing.T) { + expo.Merge(a, b) + is := expotest.Is(t) + is.Equal(want, a) + }) + } +} diff --git a/processor/deltatocumulativeprocessor/internal/data/expo/ord.go 
b/processor/deltatocumulativeprocessor/internal/data/expo/ord.go new file mode 100644 index 000000000000..34d177be1795 --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/data/expo/ord.go @@ -0,0 +1,16 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package expo // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo" + +import "cmp" + +// HiLo returns the greater of a and b by comparing the result of applying fn to +// each. If equal, returns operands as passed +func HiLo[T any, N cmp.Ordered](a, b T, fn func(T) N) (hi, lo T) { + an, bn := fn(a), fn(b) + if cmp.Less(an, bn) { + return b, a + } + return a, b +} diff --git a/processor/deltatocumulativeprocessor/internal/data/expo/ord_test.go b/processor/deltatocumulativeprocessor/internal/data/expo/ord_test.go new file mode 100644 index 000000000000..dedc60b50f27 --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/data/expo/ord_test.go @@ -0,0 +1,40 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package expo_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo" +) + +func TestHiLo(t *testing.T) { + type T struct { + int int + str string + } + + a := T{int: 0, str: "foo"} + b := T{int: 1, str: "bar"} + + { + hi, lo := expo.HiLo(a, b, func(v T) int { return v.int }) + assert.Equal(t, a, lo) + assert.Equal(t, b, hi) + } + + { + hi, lo := expo.HiLo(a, b, func(v T) string { return v.str }) + assert.Equal(t, b, lo) + assert.Equal(t, a, hi) + } + + { + hi, lo := expo.HiLo(a, b, func(T) int { return 0 }) + assert.Equal(t, a, hi) + assert.Equal(t, b, lo) + } +} diff --git a/processor/deltatocumulativeprocessor/internal/data/expo/scale.go b/processor/deltatocumulativeprocessor/internal/data/expo/scale.go new file mode 100644 index 000000000000..ac075158dc3c --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/data/expo/scale.go @@ -0,0 +1,115 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package expo // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo" + +import ( + "fmt" + "math" +) + +type Scale int32 + +// Idx gives the bucket index v belongs into +func (scale Scale) Idx(v float64) int { + // from: https://opentelemetry.io/docs/specs/otel/metrics/data-model/#all-scales-use-the-logarithm-function + + // Special case for power-of-two values. + if frac, exp := math.Frexp(v); frac == 0.5 { + return ((exp - 1) << scale) - 1 + } + + scaleFactor := math.Ldexp(math.Log2E, int(scale)) + // Note: math.Floor(value) equals math.Ceil(value)-1 when value + // is not a power of two, which is checked above. + return int(math.Floor(math.Log(v) * scaleFactor)) +} + +// Bounds returns the half-open interval (min,max] of the bucket at index. +// This means a value min < v <= max belongs to this bucket. 
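+// For example, at scale 0 the bucket at index 1 covers the range (2,4].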
+//
+// NOTE: this is different from Go slice intervals, which are [a,b)
+func (scale Scale) Bounds(index int) (min, max float64) {
+	// from: https://opentelemetry.io/docs/specs/otel/metrics/data-model/#all-scales-use-the-logarithm-function
+	lower := func(index int) float64 {
+		inverseFactor := math.Ldexp(math.Ln2, int(-scale))
+		return math.Exp(float64(index) * inverseFactor)
+	}
+
+	return lower(index), lower(index + 1)
+}
+
+// Downscale collapses the buckets of bs until scale 'to' is reached
+func Downscale(bs Buckets, from, to Scale) {
+	switch {
+	case from == to:
+		return
+	case from < to:
+		// because even distribution within the buckets cannot be assumed, it is
+		// not possible to correctly upscale (split) buckets.
+		// any attempt to do so would yield erroneous data.
+		panic(fmt.Sprintf("cannot upscale without introducing error (%d -> %d)", from, to))
+	}
+
+	for at := from; at > to; at-- {
+		Collapse(bs)
+	}
+}
+
+// Collapse merges adjacent buckets and zeros the remaining area:
+//
+//	before: 1 1 1 1 1 1 1 1 1 1 1 1
+//	after:  2 2 2 2 2 2 0 0 0 0 0 0
+//
+// Due to the "perfect subsetting" property of exponential histograms, this
+// gives the same observation as before, but recorded at scale-1. See
+// https://opentelemetry.io/docs/specs/otel/metrics/data-model/#exponential-scale.
+//
+// Because every bucket now spans twice as much range, half of the allocated
+// counts slice is technically no longer required. It is zeroed but left in
+// place to avoid future allocations, because observations may happen in that
+// area at a later time.
+func Collapse(bs Buckets) {
+	counts := bs.BucketCounts()
+	size := counts.Len() / 2
+	if counts.Len()%2 != 0 {
+		size++
+	}
+
+	// merging needs to happen in pairs aligned to i=0. if offset is non-even,
+	// we need to shift the whole merging by one to make the above condition true.
+	shift := 0
+	if bs.Offset()%2 != 0 {
+		bs.SetOffset(bs.Offset() - 1)
+		shift--
+	}
+	bs.SetOffset(bs.Offset() / 2)
+
+	for i := 0; i < size; i++ {
+		// size is ~half of len. we add two buckets per iteration.
+		// k jumps in steps of 2, shifted if offset makes this necessary.
+		k := i*2 + shift
+
+		// special case: we just started and had to shift. the left half of the
+		// new bucket is not actually stored, so only use counts[0].
+		if i == 0 && k == -1 {
+			counts.SetAt(i, counts.At(k+1))
+			continue
+		}
+
+		// new[k] = old[k]+old[k+1]
+		counts.SetAt(i, counts.At(k))
+		if k+1 < counts.Len() {
+			counts.SetAt(i, counts.At(k)+counts.At(k+1))
+		}
+	}
+
+	// zero the excess area. it is not needed to represent the observation
+	// anymore, but kept for two reasons:
+	// 1. future observations may need it, no need to re-alloc then if kept
+	// 2. 
[pcommon.Uint64Slice] can not, in fact, be sliced, so getting rid + // of it would alloc ¯\_(ツ)_/¯ + for i := size; i < counts.Len(); i++ { + counts.SetAt(i, 0) + } +} diff --git a/processor/deltatocumulativeprocessor/internal/data/expo/scale_test.go b/processor/deltatocumulativeprocessor/internal/data/expo/scale_test.go new file mode 100644 index 000000000000..ceb76eb1d44d --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/data/expo/scale_test.go @@ -0,0 +1,90 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package expo_test + +import ( + "fmt" + "strconv" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/pdata/pmetric" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/expotest" +) + +func TestDownscale(t *testing.T) { + type Repr[T any] struct { + scale expo.Scale + bkt T + } + + cases := [][]Repr[string]{{ + {scale: 2, bkt: "1 1 1 1 1 1 1 1 1 1 1 1"}, + {scale: 1, bkt: " 2 2 2 2 2 2 "}, + {scale: 0, bkt: " 4 4 4 "}, + }, { + {scale: 2, bkt: "ø 1 1 1 1 1 1 1 1 1 1 1"}, + {scale: 1, bkt: " 1 2 2 2 2 2 "}, + {scale: 0, bkt: " 3 4 4 "}, + }, { + {scale: 2, bkt: "ø ø 1 1 1 1 1 1 1 1 1 1"}, + {scale: 1, bkt: " ø 2 2 2 2 2 "}, + {scale: 0, bkt: " 2 4 4 "}, + }, { + {scale: 2, bkt: "ø ø ø ø 1 1 1 1 1 1 1 1"}, + {scale: 1, bkt: " ø ø 2 2 2 2 "}, + {scale: 0, bkt: " ø 4 4 "}, + }, { + {scale: 2, bkt: "1 1 1 1 1 1 1 1 1 "}, + {scale: 1, bkt: " 2 2 2 2 1 "}, + {scale: 0, bkt: " 4 4 1 "}, + }, { + {scale: 2, bkt: "1 1 1 1 1 1 1 1 1 1 1 1"}, + {scale: 0, bkt: " 4 4 4 "}, + }} + + type B = expo.Buckets + for i, reprs := range cases { + t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { + buckets := make([]Repr[B], len(reprs)) + for i, r := range reprs { + bkt := pmetric.NewExponentialHistogramDataPointBuckets() + for _, elem := range strings.Fields(r.bkt) { + if elem == "ø" { + bkt.SetOffset(bkt.Offset() + 1) + continue + } + n, err := strconv.Atoi(elem) + if err != nil { + panic(err) + } + bkt.BucketCounts().Append(uint64(n)) + } + buckets[i] = Repr[B]{scale: r.scale, bkt: bkt} + } + + is := expotest.Is(t) + for i := 0; i < len(buckets)-1; i++ { + expo.Downscale(buckets[i].bkt, buckets[i].scale, buckets[i+1].scale) + + is.Equalf(buckets[i+1].bkt.Offset(), buckets[i].bkt.Offset(), "offset") + + want := buckets[i+1].bkt.BucketCounts().AsRaw() + got := buckets[i].bkt.BucketCounts().AsRaw() + + is.Equalf(want, got[:len(want)], "counts") + is.Equalf(make([]uint64, len(got)-len(want)), got[len(want):], "extra-space") + } + }) + } + + t.Run("panics", func(t *testing.T) { + assert.PanicsWithValue(t, "cannot upscale without introducing error (8 -> 12)", func() { + expo.Downscale(bins{}.Into(), 8, 12) + }) + }) +} diff --git a/processor/deltatocumulativeprocessor/internal/data/expo/zero.go b/processor/deltatocumulativeprocessor/internal/data/expo/zero.go new file mode 100644 index 000000000000..2d5401b39f5c --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/data/expo/zero.go @@ -0,0 +1,68 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package expo // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo" + +import ( + "cmp" + "fmt" +) + +// WidenZero widens the zero-bucket to span at least [-width,width], 
possibly wider
+// if width falls in the middle of a bucket.
+//
+// Both bucket counts MUST be of the same scale.
+func WidenZero(dp DataPoint, width float64) {
+	switch {
+	case width == dp.ZeroThreshold():
+		return
+	case width < dp.ZeroThreshold():
+		panic(fmt.Sprintf("min must be larger than current threshold (%f)", dp.ZeroThreshold()))
+	}
+
+	scale := Scale(dp.Scale())
+	zero := scale.Idx(width) // the largest bucket index inside the zero width
+
+	widen := func(bs Buckets) {
+		abs := Abs(bs)
+		for i := abs.Lower(); i <= zero; i++ {
+			dp.SetZeroCount(dp.ZeroCount() + abs.Abs(i))
+		}
+
+		// right next to the new zero bucket, constrained to slice range
+		lo := clamp(zero+1, abs.Lower(), abs.Upper())
+		abs.Slice(lo, abs.Upper())
+	}
+
+	widen(dp.Positive())
+	widen(dp.Negative())
+
+	_, max := scale.Bounds(zero)
+	dp.SetZeroThreshold(max)
+}
+
+// Slice drops data outside the range from <= i < to from the bucket counts. It behaves the same as Go's [a:b]
+//
+// Limitations:
+//   - due to a limitation of the pcommon package, slicing cannot happen in-place and allocates
+//   - in consequence, data outside the range is garbage collected
+func (a Absolute) Slice(from, to int) {
+	lo, up := a.Lower(), a.Upper()
+	switch {
+	case from > to:
+		panic(fmt.Sprintf("bad bounds: must be from<=to (got %d<=%d)", from, to))
+	case from < lo || to > up:
+		panic(fmt.Sprintf("%d:%d is out of bounds for %d:%d", from, to, lo, up))
+	}
+
+	first := from - lo
+	last := to - lo
+
+	a.BucketCounts().FromRaw(a.BucketCounts().AsRaw()[first:last])
+	a.SetOffset(int32(from))
+}
+
+// clamp constrains v to the range lo..=up
+func clamp[N cmp.Ordered](v, lo, up N) N {
+	return max(lo, min(v, up))
+}
diff --git a/processor/deltatocumulativeprocessor/internal/data/expo/zero_test.go b/processor/deltatocumulativeprocessor/internal/data/expo/zero_test.go
new file mode 100644
index 000000000000..92e9d88a38d1
--- /dev/null
+++ b/processor/deltatocumulativeprocessor/internal/data/expo/zero_test.go
@@ -0,0 +1,125 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package expo_test
+
+import (
+	"fmt"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+
+	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/expotest"
+)
+
+type hist = expotest.Histogram
+
+func TestWidenZero(t *testing.T) {
+	cases := []struct {
+		name string
+		hist hist
+		want hist
+		min  float64
+	}{{
+		// -3 -2 -1 0 1 2 3 4
+		// (0.125,0.25], (0.25,0.5], (0.5,1], (1,2], (2,4], (4,8], (8,16], (16,32]
+		//
+		// -3 -2 -1 0 1 2 3 4
+		hist: hist{PosNeg: bins{ø, ø, ø, ø, ø, ø, ø, ø}.Into(), Zt: 0, Zc: 0},
+		want: hist{PosNeg: bins{ø, ø, ø, ø, ø, ø, ø, ø}.Into(), Zt: 0, Zc: 0},
+	}, {
+		// zt=2 is upper boundary of bucket 0. keep buckets [1:n]
+		hist: hist{PosNeg: bins{ø, ø, 1, 2, 3, 4, 5, ø}.Into(), Zt: 0, Zc: 2},
+		want: hist{PosNeg: bins{ø, ø, ø, ø, 3, 4, 5, ø}.Into(), Zt: 2, Zc: 2 + 2*(1+2)},
+	}, {
+		// zt=3 is within bucket 1. 
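(bucket 1 spans (2,4] at scale 0, so the threshold is rounded up to 4)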
keep buckets [2:n] + // set zt=4 because it must cover full buckets + hist: hist{PosNeg: bins{ø, ø, 1, 2, 3, 4, 5, ø}.Into(), Zt: 0, Zc: 2}, + min: 3, + want: hist{PosNeg: bins{ø, ø, ø, ø, ø, 4, 5, ø}.Into(), Zt: 4, Zc: 2 + 2*(1+2+3)}, + }, { + // zt=2 is higher, but no change expected as no buckets in this range are populated + hist: hist{PosNeg: bins{ø, ø, ø, ø, ø, ø, 1, 1}.Into(), Zt: 1.0, Zc: 2}, + want: hist{PosNeg: bins{ø, ø, ø, ø, ø, ø, 1, 1}.Into(), Zt: 2.0, Zc: 2}, + }} + + for _, cs := range cases { + name := fmt.Sprintf("%.2f->%.2f", cs.hist.Zt, cs.want.Zt) + t.Run(name, func(t *testing.T) { + hist := cs.hist.Into() + want := cs.want.Into() + + zt := cs.min + if zt == 0 { + zt = want.ZeroThreshold() + } + expo.WidenZero(hist, zt) + + is := expotest.Is(t) + is.Equal(want, hist) + }) + } + + t.Run("panics", func(t *testing.T) { + assert.PanicsWithValue(t, "min must be larger than current threshold (1.500000)", func() { + hist := hist{Zt: 1.5}.Into() + expo.WidenZero(hist, 0.5) + }) + }) +} + +func TestSlice(t *testing.T) { + cases := []struct { + bins bins + want bins + }{{ + // -3 -2 -1 0 1 2 3 4 + bins: bins{ø, ø, ø, ø, ø, ø, ø, ø}, + want: bins{ø, ø, ø, ø, ø, ø, ø, ø}, + }, { + bins: bins{1, 2, 3, 4, 5, 6, 7, 8}, + want: bins{1, 2, 3, 4, 5, 6, 7, 8}, + }, { + bins: bins{ø, 2, 3, 4, 5, 6, 7, ø}, + want: bins{ø, ø, 3, 4, 5, ø, ø, ø}, + }} + + for _, cs := range cases { + from, to := 0, len(cs.want) + for i := 0; i < len(cs.want); i++ { + if cs.want[i] != ø { + from += i + break + } + } + for i := from; i < len(cs.want); i++ { + if cs.want[i] == ø { + to = i + break + } + } + from -= 3 + to -= 3 + + t.Run(fmt.Sprintf("[%d:%d]", from, to), func(t *testing.T) { + bins := cs.bins.Into() + want := cs.want.Into() + + expo.Abs(bins).Slice(from, to) + + is := expotest.Is(t) + is.Equal(want, bins) + }) + } + + t.Run("panics", func(t *testing.T) { + data := expo.Abs(bins{1, 2, 3, 4, 5, 6, 7, 8}.Into()) + assert.PanicsWithValue(t, "bad bounds: must be from<=to (got 8<=4)", func() { + data.Slice(8, 4) + }) + assert.PanicsWithValue(t, "-6:12 is out of bounds for -3:5", func() { + data.Slice(-6, 12) + }) + }) +} diff --git a/processor/deltatocumulativeprocessor/internal/data/expo_test.go b/processor/deltatocumulativeprocessor/internal/data/expo_test.go new file mode 100644 index 000000000000..b910b409cb55 --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/data/expo_test.go @@ -0,0 +1,131 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package data + +import ( + "math" + "testing" + + "go.opentelemetry.io/collector/pdata/pmetric" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/expotest" +) + +// represents none/absent/unset in several tests +const ø = math.MaxUint64 + +func TestAdd(t *testing.T) { + type expdp = expotest.Histogram + type bins = expotest.Bins + var obs0 = expotest.Observe0 + + cases := []struct { + name string + dp, in expdp + want expdp + flip bool + }{{ + name: "noop", + dp: expdp{PosNeg: bins{0, 0, 0, 0, 0, 0, 0, 0}.Into(), Count: 0}, + in: expdp{PosNeg: bins{0, 0, 0, 0, 0, 0, 0, 0}.Into(), Count: 0}, + want: expdp{PosNeg: bins{0, 0, 0, 0, 0, 0, 0, 0}.Into(), Count: 0}, + }, { + name: "simple", + dp: expdp{PosNeg: bins{0, 0, 0, 0, 0, 0, 0, 0}.Into(), Count: 0}, + in: expdp{PosNeg: bins{1, 2, 3, 4, 5, 6, 7, 8}.Into(), Count: 2 * (1 + 2 + 3 
+ 4 + 5 + 6 + 7 + 8)}, + want: expdp{PosNeg: bins{1, 2, 3, 4, 5, 6, 7, 8}.Into(), Count: 2 * (0 + (1 + 2 + 3 + 4 + 5 + 6 + 7 + 8))}, + }, { + name: "lower+shorter", + dp: expdp{PosNeg: bins{ø, ø, ø, ø, ø, 1, 1, 1}.Into(), Count: 2 * 3}, + in: expdp{PosNeg: bins{ø, ø, 1, 1, 1, 1, 1, ø}.Into(), Count: 2 * 5}, + want: expdp{PosNeg: bins{ø, ø, 1, 1, 1, 2, 2, 1}.Into(), Count: 2 * (3 + 5)}, + }, { + name: "longer", + dp: expdp{PosNeg: bins{1, 1, 1, 1, 1, ø, ø, ø}.Into(), Count: 2 * 5}, + in: expdp{PosNeg: bins{1, 1, 1, 1, 1, 1, 1, 1}.Into(), Count: 2 * 8}, + want: expdp{PosNeg: bins{2, 2, 2, 2, 2, 1, 1, 1}.Into(), Count: 2 * (5 + 8)}, + }, { + name: "optional/missing", flip: true, + dp: expdp{PosNeg: obs0(0.6, 2.4) /* */, Count: 2}, + in: expdp{PosNeg: obs0(1.5, 3.2, 6.3), Min: some(1.5), Max: some(6.3), Sum: some(11.0), Count: 3}, + want: expdp{PosNeg: obs0(0.6, 2.4, 1.5, 3.2, 6.3) /* */, Count: 5}, + }, { + name: "optional/min-max-sum", + dp: expdp{PosNeg: obs0(1.5, 5.3, 11.6) /* */, Min: some(1.5), Max: some(11.6), Sum: some(18.4), Count: 3}, + in: expdp{PosNeg: obs0(0.6, 3.3, 7.9) /* */, Min: some(0.6), Max: some(07.9), Sum: some(11.8), Count: 3}, + want: expdp{PosNeg: obs0(1.5, 5.3, 11.6, 0.6, 3.3, 7.9), Min: some(0.6), Max: some(11.6), Sum: some(30.2), Count: 6}, + }, { + name: "zero/count", + dp: expdp{PosNeg: bins{0, 1, 2}.Into(), Zt: 0, Zc: 3, Count: 5}, + in: expdp{PosNeg: bins{0, 1, 0}.Into(), Zt: 0, Zc: 2, Count: 3}, + want: expdp{PosNeg: bins{0, 2, 2}.Into(), Zt: 0, Zc: 5, Count: 8}, + }, { + name: "zero/diff", + dp: expdp{PosNeg: bins{ø, ø, 0, 1, 1, 1}.Into(), Zt: 0.0, Zc: 2}, + in: expdp{PosNeg: bins{ø, ø, ø, ø, 1, 1}.Into(), Zt: 2.0, Zc: 2}, + want: expdp{PosNeg: bins{ø, ø, ø, ø, 2, 2}.Into(), Zt: 2.0, Zc: 4 + 2*1}, + }, { + name: "zero/subzero", + dp: expdp{PosNeg: bins{ø, 1, 1, 1, 1, 1}.Into(), Zt: 0.2, Zc: 2}, + in: expdp{PosNeg: bins{ø, ø, 1, 1, 1, 1}.Into(), Zt: 0.3, Zc: 2}, + want: expdp{PosNeg: bins{ø, ø, 2, 2, 2, 2}.Into(), Zt: 0.5, Zc: 4 + 2*1}, + }, { + name: "negative-offset", + dp: expdp{PosNeg: rawbs([]uint64{ /* */ 1, 2}, -2)}, + in: expdp{PosNeg: rawbs([]uint64{1, 2, 3 /* */}, -5)}, + want: expdp{PosNeg: rawbs([]uint64{1, 2, 3, 1, 2}, -5)}, + }, { + name: "scale/diff", + dp: expdp{PosNeg: expotest.Observe(expo.Scale(1), 1, 2, 3, 4), Scale: 1}, + in: expdp{PosNeg: expotest.Observe(expo.Scale(0), 4, 3, 2, 1), Scale: 0}, + want: expdp{Scale: 0, PosNeg: func() expo.Buckets { + bs := pmetric.NewExponentialHistogramDataPointBuckets() + expotest.ObserveInto(bs, expo.Scale(0), 1, 2, 3, 4) + expotest.ObserveInto(bs, expo.Scale(0), 4, 3, 2, 1) + bs.BucketCounts().Append([]uint64{0, 0}...) // rescaling leaves zeroed memory. 
this is expected + return bs + }()}, + }} + + for _, cs := range cases { + run := func(dp, in expdp) func(t *testing.T) { + return func(t *testing.T) { + is := expotest.Is(t) + + var ( + dp = ExpHistogram{dp.Into()} + in = ExpHistogram{in.Into()} + want = ExpHistogram{cs.want.Into()} + ) + + dp.SetTimestamp(0) + in.SetTimestamp(1) + want.SetTimestamp(1) + + got := dp.Add(in) + is.Equal(want.DataPoint, got.DataPoint) + } + } + + if cs.flip { + t.Run(cs.name+"-dp", run(cs.dp, cs.in)) + t.Run(cs.name+"-in", run(cs.in, cs.dp)) + continue + } + t.Run(cs.name, run(cs.dp, cs.in)) + } + +} + +func rawbs(data []uint64, offset int32) expo.Buckets { + bs := pmetric.NewExponentialHistogramDataPointBuckets() + bs.BucketCounts().FromRaw(data) + bs.SetOffset(offset) + return bs +} + +func some[T any](v T) *T { + return &v +} diff --git a/processor/deltatocumulativeprocessor/internal/metrics/data.go b/processor/deltatocumulativeprocessor/internal/metrics/data.go index c305c85d781e..f063475055f7 100644 --- a/processor/deltatocumulativeprocessor/internal/metrics/data.go +++ b/processor/deltatocumulativeprocessor/internal/metrics/data.go @@ -47,7 +47,7 @@ type ExpHistogram Metric func (s ExpHistogram) At(i int) data.ExpHistogram { dp := Metric(s).ExponentialHistogram().DataPoints().At(i) - return data.ExpHistogram{ExponentialHistogramDataPoint: dp} + return data.ExpHistogram{DataPoint: dp} } func (s ExpHistogram) Len() int { diff --git a/processor/deltatocumulativeprocessor/internal/telemetry/metrics.go b/processor/deltatocumulativeprocessor/internal/telemetry/metrics.go index f3b88ef8b96a..946ffd98d1d6 100644 --- a/processor/deltatocumulativeprocessor/internal/telemetry/metrics.go +++ b/processor/deltatocumulativeprocessor/internal/telemetry/metrics.go @@ -19,11 +19,14 @@ import ( type Telemetry struct { Metrics + + meter metric.Meter } func New(meter metric.Meter) Telemetry { return Telemetry{ Metrics: metrics(meter), + meter: meter, } } @@ -89,23 +92,23 @@ func metrics(meter metric.Meter) Metrics { } } -func (m Metrics) WithLimit(meter metric.Meter, max int64) { +func (tel Telemetry) WithLimit(max int64) { then := metric.Callback(func(_ context.Context, o metric.Observer) error { - o.ObserveInt64(m.streams.limit, max) + o.ObserveInt64(tel.streams.limit, max) return nil }) - _, err := meter.RegisterCallback(then, m.streams.limit) + _, err := tel.meter.RegisterCallback(then, tel.streams.limit) if err != nil { panic(err) } } -func (m Metrics) WithStale(meter metric.Meter, max time.Duration) { +func (tel Telemetry) WithStale(max time.Duration) { then := metric.Callback(func(_ context.Context, o metric.Observer) error { - o.ObserveInt64(m.streams.stale, int64(max.Seconds())) + o.ObserveInt64(tel.streams.stale, int64(max.Seconds())) return nil }) - _, err := meter.RegisterCallback(then, m.streams.stale) + _, err := tel.meter.RegisterCallback(then, tel.streams.stale) if err != nil { panic(err) } diff --git a/processor/deltatocumulativeprocessor/processor.go b/processor/deltatocumulativeprocessor/processor.go index 59fe2c7c4c0c..01e1cef4f916 100644 --- a/processor/deltatocumulativeprocessor/processor.go +++ b/processor/deltatocumulativeprocessor/processor.go @@ -34,8 +34,8 @@ type Processor struct { ctx context.Context cancel context.CancelFunc - aggr streams.Aggregator[data.Number] - stale maybe.Ptr[staleness.Staleness[data.Number]] + sums Pipeline[data.Number] + expo Pipeline[data.ExpHistogram] mtx sync.Mutex } @@ -43,29 +43,43 @@ type Processor struct { func newProcessor(cfg *Config, log *zap.Logger, meter 
metric.Meter, next consumer.Metrics) *Processor { ctx, cancel := context.WithCancel(context.Background()) + tel := telemetry.New(meter) + proc := Processor{ log: log, ctx: ctx, cancel: cancel, next: next, + + sums: pipeline[data.Number](cfg, &tel), + expo: pipeline[data.ExpHistogram](cfg, &tel), } - tel := telemetry.New(meter) + return &proc +} + +type Pipeline[D data.Point[D]] struct { + aggr streams.Aggregator[D] + stale maybe.Ptr[staleness.Staleness[D]] +} + +func pipeline[D data.Point[D]](cfg *Config, tel *telemetry.Telemetry) Pipeline[D] { + var pipe Pipeline[D] - var dps streams.Map[data.Number] - dps = delta.New[data.Number]() + var dps streams.Map[D] + dps = delta.New[D]() dps = telemetry.ObserveItems(dps, &tel.Metrics) if cfg.MaxStale > 0 { - tel.WithStale(meter, cfg.MaxStale) + tel.WithStale(cfg.MaxStale) stale := maybe.Some(staleness.NewStaleness(cfg.MaxStale, dps)) - proc.stale = stale + pipe.stale = stale dps, _ = stale.Try() } if cfg.MaxStreams > 0 { - tel.WithLimit(meter, int64(cfg.MaxStreams)) + tel.WithLimit(int64(cfg.MaxStreams)) lim := streams.Limit(dps, cfg.MaxStreams) - if stale, ok := proc.stale.Try(); ok { + if stale, ok := pipe.stale.Try(); ok { lim.Evictor = stale } dps = lim @@ -73,13 +87,14 @@ func newProcessor(cfg *Config, log *zap.Logger, meter metric.Meter, next consume dps = telemetry.ObserveNonFatal(dps, &tel.Metrics) - proc.aggr = streams.IntoAggregator(dps) - return &proc + pipe.aggr = streams.IntoAggregator(dps) + return pipe } func (p *Processor) Start(_ context.Context, _ component.Host) error { - stale, ok := p.stale.Try() - if !ok { + sums, sok := p.sums.stale.Try() + expo, eok := p.expo.stale.Try() + if !(sok && eok) { return nil } @@ -91,7 +106,8 @@ func (p *Processor) Start(_ context.Context, _ component.Host) error { return case <-tick.C: p.mtx.Lock() - stale.ExpireOldEntries() + sums.ExpireOldEntries() + expo.ExpireOldEntries() p.mtx.Unlock() } } @@ -109,27 +125,34 @@ func (p *Processor) Capabilities() consumer.Capabilities { } func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { + if err := context.Cause(p.ctx); err != nil { + return err + } + p.mtx.Lock() defer p.mtx.Unlock() var errs error - metrics.Each(md, func(m metrics.Metric) { switch m.Type() { case pmetric.MetricTypeSum: sum := m.Sum() if sum.AggregationTemporality() == pmetric.AggregationTemporalityDelta { - err := streams.Aggregate[data.Number](metrics.Sum(m), p.aggr) + err := streams.Aggregate(metrics.Sum(m), p.sums.aggr) errs = errors.Join(errs, err) sum.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) } case pmetric.MetricTypeHistogram: // TODO case pmetric.MetricTypeExponentialHistogram: - // TODO + expo := m.ExponentialHistogram() + if expo.AggregationTemporality() == pmetric.AggregationTemporalityDelta { + err := streams.Aggregate(metrics.ExpHistogram(m), p.expo.aggr) + errs = errors.Join(errs, err) + expo.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + } } }) - if errs != nil { return errs } From 90935cec1e3cd64f263433f8b42b465f585b333b Mon Sep 17 00:00:00 2001 From: Chris Mark Date: Tue, 14 May 2024 15:44:31 +0300 Subject: [PATCH 002/258] [pkg/stanza] Add container operator parser (#32594) **Description:** This PR implements the new container logs parser as it was proposed at https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/31959. **Link to tracking Issue:** https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/31959 **Testing:** Added unit tests. 
Providing manual testing steps as well: ### How to test this manually 1. Using the following config file: ```yaml receivers: filelog: start_at: end include_file_name: false include_file_path: true include: - /var/log/pods/*/*/*.log operators: - id: container-parser type: container output: m1 - type: move id: m1 from: attributes.k8s.pod.name to: attributes.val - id: some type: add field: attributes.key2.key_in value: val2 exporters: debug: verbosity: detailed service: pipelines: logs: receivers: [filelog] exporters: [debug] processors: [] ``` 2. Start the collector: `./bin/otelcontribcol_linux_amd64 --config ~/otelcol/container_parser/config.yaml` 3. Use the following bash script to create some logs: ```bash #! /bin/bash echo '2024-04-13T07:59:37.505201169-05:00 stdout P This is a very very long crio line th' >> /var/log/pods/kube-scheduler-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d3/kube-scheduler43/1.log echo '{"log":"INFO: log line here","stream":"stdout","time":"2029-03-30T08:31:20.545192187Z"}' >> /var/log/pods/kube-controller-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d6/kube-controller/1.log echo '2024-04-13T07:59:37.505201169-05:00 stdout F at is awesome! crio is awesome!' >> /var/log/pods/kube-scheduler-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d3/kube-scheduler43/1.log echo '2021-06-22T10:27:25.813799277Z stdout P some containerd log th' >> /var/log/pods/kube-scheduler-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d3/kube-scheduler44/1.log echo '{"log":"INFO: another log line here","stream":"stdout","time":"2029-03-30T08:31:20.545192187Z"}' >> /var/log/pods/kube-controller-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d6/kube-controller/1.log echo '2021-06-22T10:27:25.813799277Z stdout F at is super awesome! Containerd is awesome' >> /var/log/pods/kube-scheduler-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d3/kube-scheduler44/1.log echo '2024-04-13T07:59:37.505201169-05:00 stdout F standalone crio line which is awesome!' >> /var/log/pods/kube-scheduler-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d3/kube-scheduler43/1.log echo '2021-06-22T10:27:25.813799277Z stdout F standalone containerd line that is super awesome!' >> /var/log/pods/kube-scheduler-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d3/kube-scheduler44/1.log ``` 4. Run the above as a bash script to verify any parallel processing. Verify that the output is correct. ### Test manually on k8s 1. `make docker-otelcontribcol && docker tag otelcontribcol otelcontribcol-dev:0.0.1 && kind load docker-image otelcontribcol-dev:0.0.1` 2. Install using the following helm values file: ```yaml mode: daemonset presets: logsCollection: enabled: true image: repository: otelcontribcol-dev tag: "0.0.1" pullPolicy: IfNotPresent command: name: otelcontribcol config: exporters: debug: verbosity: detailed receivers: filelog: start_at: end include_file_name: false include_file_path: true exclude: - /var/log/pods/default_daemonset-opentelemetry-collector*_*/opentelemetry-collector/*.log include: - /var/log/pods/*/*/*.log operators: - id: container-parser type: container output: some - id: some type: add field: attributes.key2.key_in value: val2 service: pipelines: logs: receivers: [filelog] processors: [batch] exporters: [debug] ``` 3. 
Check collector's output to verify the logs are parsed properly: ```console 2024-05-10T07:52:02.307Z info LogsExporter {"kind": "exporter", "data_type": "logs", "name": "debug", "resource logs": 1, "log records": 2} 2024-05-10T07:52:02.307Z info ResourceLog #0 Resource SchemaURL: ScopeLogs #0 ScopeLogs SchemaURL: InstrumentationScope LogRecord #0 ObservedTimestamp: 2024-05-10 07:52:02.046236071 +0000 UTC Timestamp: 2024-05-10 07:52:01.92533954 +0000 UTC SeverityText: SeverityNumber: Unspecified(0) Body: Str(otel logs at 07:52:01) Attributes: -> log: Map({"iostream":"stdout"}) -> time: Str(2024-05-10T07:52:01.92533954Z) -> k8s: Map({"container":{"name":"busybox","restart_count":"0"},"namespace":{"name":"default"},"pod":{"name":"daemonset-logs-6f6mn","uid":"1069e46b-03b2-4532-a71f-aaec06c0197b"}}) -> logtag: Str(F) -> key2: Map({"key_in":"val2"}) -> log.file.path: Str(/var/log/pods/default_daemonset-logs-6f6mn_1069e46b-03b2-4532-a71f-aaec06c0197b/busybox/0.log) Trace ID: Span ID: Flags: 0 LogRecord #1 ObservedTimestamp: 2024-05-10 07:52:02.046411602 +0000 UTC Timestamp: 2024-05-10 07:52:02.027386192 +0000 UTC SeverityText: SeverityNumber: Unspecified(0) Body: Str(otel logs at 07:52:02) Attributes: -> log.file.path: Str(/var/log/pods/default_daemonset-logs-6f6mn_1069e46b-03b2-4532-a71f-aaec06c0197b/busybox/0.log) -> time: Str(2024-05-10T07:52:02.027386192Z) -> log: Map({"iostream":"stdout"}) -> logtag: Str(F) -> k8s: Map({"container":{"name":"busybox","restart_count":"0"},"namespace":{"name":"default"},"pod":{"name":"daemonset-logs-6f6mn","uid":"1069e46b-03b2-4532-a71f-aaec06c0197b"}}) -> key2: Map({"key_in":"val2"}) Trace ID: Span ID: Flags: 0 ... ``` **Documentation:** Added Signed-off-by: ChrsMark --- .chloggen/add_container_parser.yaml | 27 ++ pkg/stanza/adapter/register.go | 1 + pkg/stanza/docs/operators/container.md | 238 +++++++++++ pkg/stanza/operator/helper/regexp.go | 28 ++ .../operator/parser/container/config.go | 120 ++++++ .../operator/parser/container/config_test.go | 107 +++++ .../operator/parser/container/package_test.go | 14 + .../operator/parser/container/parser.go | 357 +++++++++++++++++ .../operator/parser/container/parser_test.go | 370 ++++++++++++++++++ .../parser/container/testdata/config.yaml | 41 ++ pkg/stanza/operator/parser/regex/parser.go | 17 +- 11 files changed, 1306 insertions(+), 14 deletions(-) create mode 100644 .chloggen/add_container_parser.yaml create mode 100644 pkg/stanza/docs/operators/container.md create mode 100644 pkg/stanza/operator/helper/regexp.go create mode 100644 pkg/stanza/operator/parser/container/config.go create mode 100644 pkg/stanza/operator/parser/container/config_test.go create mode 100644 pkg/stanza/operator/parser/container/package_test.go create mode 100644 pkg/stanza/operator/parser/container/parser.go create mode 100644 pkg/stanza/operator/parser/container/parser_test.go create mode 100644 pkg/stanza/operator/parser/container/testdata/config.yaml diff --git a/.chloggen/add_container_parser.yaml b/.chloggen/add_container_parser.yaml new file mode 100644 index 000000000000..b6b4406b8f43 --- /dev/null +++ b/.chloggen/add_container_parser.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: filelogreceiver + +# A brief description of the change. 
diff --git a/pkg/stanza/adapter/register.go b/pkg/stanza/adapter/register.go
index 8105ef17d587..426e456decfa 100644
--- a/pkg/stanza/adapter/register.go
+++ b/pkg/stanza/adapter/register.go
@@ -6,6 +6,7 @@ package adapter // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter"
 import (
 	_ "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/output/file" // Register parsers and transformers for stanza-based log receivers
 	_ "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/output/stdout"
+	_ "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/parser/container"
 	_ "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/parser/csv"
 	_ "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/parser/json"
 	_ "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/parser/jsonarray"
diff --git a/pkg/stanza/docs/operators/container.md b/pkg/stanza/docs/operators/container.md
new file mode 100644
index 000000000000..4cc972fbc5ed
--- /dev/null
+++ b/pkg/stanza/docs/operators/container.md
@@ -0,0 +1,238 @@
+## `container` operator
+
+The `container` operator parses logs in `docker`, `cri-o` and `containerd` formats.
+
+### Configuration Fields
+
+| Field                        | Default          | Description |
+|------------------------------|------------------|-------------|
+| `id`                         | `container`      | A unique identifier for the operator. |
+| `format`                     | ``               | The container log format to use, if known. One of `docker`, `crio`, or `containerd`. If not set, the format is detected automatically. |
+| `add_metadata_from_filepath` | `true`           | Whether to add k8s metadata parsed from the file path. Requires the `log.file.path` field to be present. |
+| `output`                     | Next in pipeline | The connected operator(s) that will receive all outbound entries. |
+| `parse_from`                 | `body`           | The [field](../types/field.md) from which the value will be parsed. |
+| `parse_to`                   | `attributes`     | The [field](../types/field.md) to which the value will be parsed. |
+| `on_error`                   | `send`           | The behavior of the operator if it encounters an error. See [on_error](../types/on_error.md). |
+| `if`                         |                  | An [expression](../types/expression.md) that, when set, is evaluated to decide whether this operator should process the given entry. This allows simple conditional parsing without routers. |
+| `severity`                   | `nil`            | An optional [severity](../types/severity.md) block which will parse a severity field before passing the entry to the output operator. |
+
+
+### Embedded Operations
+
+The `container` parser can be configured to embed certain operations such as severity parsing. For more information, see [complex parsers](../types/parsers.md#complex-parsers).
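As a quick illustration, this is roughly what an embedded `severity` block looks like on the parser. The field name and mapping values below are taken from this PR's config tests and are illustrative, not required values:

```yaml
- type: container
  severity:
    parse_from: body.severity_field
    mapping:
      critical: 5xx
      error: 4xx
      info: 3xx
      debug: 2xx
```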
+
+### Add metadata from file path
+
+Requires `include_file_path: true` in order for the `log.file.path` field to be available for the operator.
+If that's not possible, users can disable the metadata addition with `add_metadata_from_filepath: false`.
+A file path like `"/var/log/pods/some-ns_kube-controller-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d6/kube-controller/1.log"`
+will produce the following k8s metadata:
+
+```json
+{
+  "attributes": {
+    "k8s": {
+      "container": {
+        "name": "kube-controller",
+        "restart_count": "1"
+      },
+      "pod": {
+        "uid": "49cc7c1fd3702c40b2686ea7486091d6",
+        "name": "kube-controller-kind-control-plane"
+      },
+      "namespace": {
+        "name": "some-ns"
+      }
+    }
+  }
+}
+```
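To make the path-to-metadata mapping concrete, here is a minimal, self-contained Go sketch of this kind of extraction. It is an illustration only: the regex, function name, and map keys are hypothetical and do not reflect the parser's actual internals in `parser.go`.

```go
package main

import (
	"fmt"
	"regexp"
)

// pathPattern mirrors the documented layout:
// /var/log/pods/<namespace>_<pod>_<uid>/<container>/<restart_count>.log
// (hypothetical; the real parser's regex differs in detail)
var pathPattern = regexp.MustCompile(
	`^/var/log/pods/(?P<namespace>[^_]+)_(?P<pod>[^_]+)_(?P<uid>[a-f0-9-]+)/(?P<container>[^/]+)/(?P<restart>\d+)\.log$`,
)

func metadataFromPath(path string) (map[string]string, error) {
	m := pathPattern.FindStringSubmatch(path)
	if m == nil {
		return nil, fmt.Errorf("path %q does not match expected layout", path)
	}
	out := map[string]string{}
	for i, name := range pathPattern.SubexpNames() {
		if i == 0 || name == "" {
			continue // skip the whole-match entry and unnamed groups
		}
		out[name] = m[i]
	}
	return out, nil
}

func main() {
	md, err := metadataFromPath("/var/log/pods/some-ns_kube-controller-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d6/kube-controller/1.log")
	if err != nil {
		panic(err)
	}
	fmt.Println(md["namespace"], md["pod"], md["uid"], md["container"], md["restart"])
}
```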
+
+### Example Configurations:
+
+#### Parse the body as docker container log
+
+Configuration:
+```yaml
+- type: container
+  format: docker
+  add_metadata_from_filepath: true
+```
+
+Note: in this example `format: docker` is optional, since the format can also be detected automatically; `add_metadata_from_filepath` is `true` by default.
+
+<table>
+<tr><td> Input body </td> <td> Output body </td></tr>
+<tr>
+<td>
+
+```json
+{
+  "timestamp": "",
+  "body": "{\"log\":\"INFO: log line here\",\"stream\":\"stdout\",\"time\":\"2024-03-30T08:31:20.545192187Z\"}",
+  "log.file.path": "/var/log/pods/some_kube-controller-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d6/kube-controller/1.log"
+}
+```
+
+</td>
+<td>
+
+```json
+{
+  "timestamp": "2024-03-30 08:31:20.545192187 +0000 UTC",
+  "body": "log line here",
+  "attributes": {
+    "time": "2024-03-30T08:31:20.545192187Z",
+    "log.iostream": "stdout",
+    "k8s.pod.name": "kube-controller-kind-control-plane",
+    "k8s.pod.uid": "49cc7c1fd3702c40b2686ea7486091d6",
+    "k8s.container.name": "kube-controller",
+    "k8s.container.restart_count": "1",
+    "k8s.namespace.name": "some",
+    "log.file.path": "/var/log/pods/some_kube-controller-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d6/kube-controller/1.log"
+  }
+}
+```
+
+</td>
+</tr>
+</table>
+
+#### Parse the body as cri-o container log
+
+Configuration:
+```yaml
+- type: container
+```
+
+<table>
+<tr><td> Input body </td> <td> Output body </td></tr>
+<tr>
+<td>
+
+```json
+{
+  "timestamp": "",
+  "body": "2024-04-13T07:59:37.505201169-05:00 stdout F standalone crio line which is awesome",
+  "log.file.path": "/var/log/pods/some_kube-controller-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d6/kube-controller/1.log"
+}
+```
+
+</td>
+<td>
+
+```json
+{
+  "timestamp": "2024-04-13 12:59:37.505201169 +0000 UTC",
+  "body": "standalone crio line which is awesome",
+  "attributes": {
+    "time": "2024-04-13T07:59:37.505201169-05:00",
+    "logtag": "F",
+    "log.iostream": "stdout",
+    "k8s.pod.name": "kube-controller-kind-control-plane",
+    "k8s.pod.uid": "49cc7c1fd3702c40b2686ea7486091d6",
+    "k8s.container.name": "kube-controller",
+    "k8s.container.restart_count": "1",
+    "k8s.namespace.name": "some",
+    "log.file.path": "/var/log/pods/some_kube-controller-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d6/kube-controller/1.log"
+  }
+}
+```
+
+</td>
+</tr>
+</table>
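Note how the zoned cri-o timestamp is normalized to UTC in the output above. A minimal Go sketch of that conversion (for reference only; the parser's actual code path goes through the internal `timeutils` package):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// cri-o writes an RFC 3339 timestamp with a local offset;
	// normalizing to UTC yields the value shown in the example output.
	ts, err := time.Parse(time.RFC3339Nano, "2024-04-13T07:59:37.505201169-05:00")
	if err != nil {
		panic(err)
	}
	fmt.Println(ts.UTC()) // 2024-04-13 12:59:37.505201169 +0000 UTC
}
```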
+
+#### Parse the body as containerd container log
+
+Configuration:
+```yaml
+- type: container
+```
+
+<table>
+<tr><td> Input body </td> <td> Output body </td></tr>
+<tr>
+<td>
+
+```json
+{
+  "timestamp": "",
+  "body": "2023-06-22T10:27:25.813799277Z stdout F standalone containerd line that is super awesome",
+  "log.file.path": "/var/log/pods/some_kube-controller-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d6/kube-controller/1.log"
+}
+```
+
+</td>
+<td>
+
+```json
+{
+  "timestamp": "2023-06-22 10:27:25.813799277 +0000 UTC",
+  "body": "standalone containerd line that is super awesome",
+  "attributes": {
+    "time": "2023-06-22T10:27:25.813799277Z",
+    "logtag": "F",
+    "log.iostream": "stdout",
+    "k8s.pod.name": "kube-controller-kind-control-plane",
+    "k8s.pod.uid": "49cc7c1fd3702c40b2686ea7486091d6",
+    "k8s.container.name": "kube-controller",
+    "k8s.container.restart_count": "1",
+    "k8s.namespace.name": "some",
+    "log.file.path": "/var/log/pods/some_kube-controller-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d6/kube-controller/1.log"
+  }
+}
+```
+
+</td>
+</tr>
+</table>
+
+#### Parse a multiline containerd log and recombine it into a single entry
+
+Configuration:
+```yaml
+- type: container
+```
+
+<table>
+<tr><td> Input body </td> <td> Output body </td></tr>
+<tr>
+<td>
+
+```json
+{
+  "timestamp": "",
+  "body": "2023-06-22T10:27:25.813799277Z stdout P multiline containerd line that i",
+  "log.file.path": "/var/log/pods/some_kube-controller-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d6/kube-controller/1.log"
+},
+{
+  "timestamp": "",
+  "body": "2023-06-22T10:27:25.813799277Z stdout F s super awesome",
+  "log.file.path": "/var/log/pods/some_kube-controller-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d6/kube-controller/1.log"
+}
+```
+
+</td>
+<td>
+
+```json
+{
+  "timestamp": "2023-06-22 10:27:25.813799277 +0000 UTC",
+  "body": "multiline containerd line that is super awesome",
+  "attributes": {
+    "time": "2023-06-22T10:27:25.813799277Z",
+    "logtag": "F",
+    "log.iostream": "stdout",
+    "k8s.pod.name": "kube-controller-kind-control-plane",
+    "k8s.pod.uid": "49cc7c1fd3702c40b2686ea7486091d6",
+    "k8s.container.name": "kube-controller",
+    "k8s.container.restart_count": "1",
+    "k8s.namespace.name": "some",
+    "log.file.path": "/var/log/pods/some_kube-controller-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d6/kube-controller/1.log"
+  }
+}
+```
+
+</td>
+</tr>
+</table>
\ No newline at end of file
diff --git a/pkg/stanza/operator/helper/regexp.go b/pkg/stanza/operator/helper/regexp.go
new file mode 100644
index 000000000000..7306926ced79
--- /dev/null
+++ b/pkg/stanza/operator/helper/regexp.go
@@ -0,0 +1,28 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package helper // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
+
+import (
+	"fmt"
+	"regexp"
+)
+
+// MatchValues extracts the values of re's named capture groups from value.
+func MatchValues(value string, re *regexp.Regexp) (map[string]any, error) {
+	matches := re.FindStringSubmatch(value)
+	if matches == nil {
+		return nil, fmt.Errorf("regex pattern does not match")
+	}
+
+	parsedValues := map[string]any{}
+	for i, subexp := range re.SubexpNames() {
+		if i == 0 {
+			// Skip the whole-match entry
+			continue
+		}
+		if subexp != "" {
+			parsedValues[subexp] = matches[i]
+		}
+	}
+	return parsedValues, nil
+}
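A quick usage sketch of this helper. The pattern below is a simplified, illustrative containerd-style regex, not the exact one the parser registers:

```go
package main

import (
	"fmt"
	"regexp"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
)

func main() {
	// Illustrative pattern: timestamp, stream, partial/full tag, then the log content.
	re := regexp.MustCompile(`^(?P<time>\S+) (?P<stream>stdout|stderr) (?P<logtag>[PF]) ?(?P<log>.*)$`)
	vals, err := helper.MatchValues("2021-06-22T10:27:25.813799277Z stdout F done", re)
	if err != nil {
		panic(err)
	}
	// Prints: 2021-06-22T10:27:25.813799277Z stdout F done
	fmt.Println(vals["time"], vals["stream"], vals["logtag"], vals["log"])
}
```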
diff --git a/pkg/stanza/operator/parser/container/config.go b/pkg/stanza/operator/parser/container/config.go
new file mode 100644
index 000000000000..fb6555708182
--- /dev/null
+++ b/pkg/stanza/operator/parser/container/config.go
@@ -0,0 +1,120 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package container // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/parser/container"
+
+import (
+	"fmt"
+	"sync"
+
+	jsoniter "github.com/json-iterator/go"
+	"go.opentelemetry.io/collector/component"
+
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/errors"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/transformer/recombine"
+)
+
+const operatorType = "container"
+
+func init() {
+	operator.Register(operatorType, func() operator.Builder { return NewConfig() })
+}
+
+// NewConfig creates a new container parser config with default values
+func NewConfig() *Config {
+	return NewConfigWithID(operatorType)
+}
+
+// NewConfigWithID creates a new container parser config with default values
+func NewConfigWithID(operatorID string) *Config {
+	return &Config{
+		ParserConfig:            helper.NewParserConfig(operatorID, operatorType),
+		Format:                  "",
+		AddMetadataFromFilePath: true,
+	}
+}
+
+// Config is the configuration of a Container parser operator.
+type Config struct {
+	helper.ParserConfig `mapstructure:",squash"`
+
+	Format                  string `mapstructure:"format"`
+	AddMetadataFromFilePath bool   `mapstructure:"add_metadata_from_filepath"`
+}
+
+// Build will build a Container parser operator.
+func (c Config) Build(set component.TelemetrySettings) (operator.Operator, error) {
+	parserOperator, err := c.ParserConfig.Build(set)
+	if err != nil {
+		return nil, err
+	}
+
+	cLogEmitter := helper.NewLogEmitter(set.Logger.Sugar())
+	recombineParser, err := createRecombine(set, cLogEmitter)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create internal recombine config: %w", err)
+	}
+
+	wg := sync.WaitGroup{}
+
+	if c.Format != "" {
+		switch c.Format {
+		case dockerFormat, crioFormat, containerdFormat:
+		default:
+			return &Parser{}, errors.NewError(
+				"operator config has an invalid `format` field.",
+				"ensure that the `format` field is set to one of `docker`, `crio`, `containerd`.",
+				"format", c.OnError,
+			)
+		}
+	}
+
+	p := &Parser{
+		ParserOperator:          parserOperator,
+		recombineParser:         recombineParser,
+		json:                    jsoniter.ConfigFastest,
+		format:                  c.Format,
+		addMetadataFromFilepath: c.AddMetadataFromFilePath,
+		crioLogEmitter:          cLogEmitter,
+		criConsumers:            &wg,
+	}
+	return p, nil
+}
+
+// createRecombine creates an internal recombine operator which outputs to an async helper.LogEmitter
+// the equivalent recombine config:
+//
+//	combine_field: body
+//	combine_with: ""
+//	is_last_entry: attributes.logtag == 'F'
+//	max_log_size: 102400
+//	source_identifier: attributes["log.file.path"]
+//	type: recombine
+func createRecombine(set component.TelemetrySettings, cLogEmitter *helper.LogEmitter) (operator.Operator, error) {
+	recombineParserCfg := createRecombineConfig()
+	recombineParser, err := recombineParserCfg.Build(set)
+	if err != nil {
+		return nil, fmt.Errorf("failed to resolve internal recombine config: %w", err)
+	}
+
+	// set the LogEmitter as the output of the recombine parser
+	recombineParser.SetOutputIDs([]string{cLogEmitter.OperatorID})
+	if err := recombineParser.SetOutputs([]operator.Operator{cLogEmitter}); err != nil {
+		return nil, fmt.Errorf("failed to set outputs of internal recombine: %w", err)
+	}
+
+	return recombineParser, nil
+}
+
+func createRecombineConfig() *recombine.Config {
+	recombineParserCfg := recombine.NewConfigWithID(recombineInternalID)
+	recombineParserCfg.IsLastEntry = "attributes.logtag == 'F'"
+	recombineParserCfg.CombineField = entry.NewBodyField()
+	recombineParserCfg.CombineWith = ""
+	recombineParserCfg.SourceIdentifier = entry.NewAttributeField("log.file.path")
+	recombineParserCfg.MaxLogSize = 102400
+	return recombineParserCfg
+}
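For comparison, the internal recombine operator built by `createRecombineConfig` corresponds roughly to this standalone operator block, which a user would otherwise have to configure by hand (taken directly from the code comment above, not new behavior):

```yaml
- type: recombine
  combine_field: body
  combine_with: ""
  is_last_entry: attributes.logtag == 'F'
  max_log_size: 102400
  source_identifier: attributes["log.file.path"]
```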
diff --git a/pkg/stanza/operator/parser/container/config_test.go b/pkg/stanza/operator/parser/container/config_test.go
new file mode 100644
index 000000000000..599c26c1b7fd
--- /dev/null
+++ b/pkg/stanza/operator/parser/container/config_test.go
@@ -0,0 +1,107 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package container
+
+import (
+	"path/filepath"
+	"testing"
+
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/operatortest"
+)
+
+func TestConfig(t *testing.T) {
+	operatortest.ConfigUnmarshalTests{
+		DefaultConfig: NewConfig(),
+		TestsFile:     filepath.Join(".", "testdata", "config.yaml"),
+		Tests: []operatortest.ConfigUnmarshalTest{
+			{
+				Name:   "default",
+				Expect: NewConfig(),
+			},
+			{
+				Name: "parse_from_simple",
+				Expect: func() *Config {
+					cfg := NewConfig()
+					cfg.ParseFrom = entry.NewBodyField("from")
+					return cfg
+				}(),
+			},
+			{
+				Name: "parse_to_simple",
+				Expect: func() *Config {
+					cfg := NewConfig()
+					cfg.ParseTo = entry.RootableField{Field: entry.NewBodyField("log")}
+					return cfg
+				}(),
+			},
+			{
+				Name: "on_error_drop",
+				Expect: func() *Config {
+					cfg := NewConfig()
+					cfg.OnError = "drop"
+					return cfg
+				}(),
+			},
+			{
+				Name: "severity",
+				Expect: func() *Config {
+					cfg := NewConfig()
+					parseField := entry.NewBodyField("severity_field")
+					severityField := helper.NewSeverityConfig()
+					severityField.ParseFrom = &parseField
+					mapping := map[string]any{
+						"critical": "5xx",
+						"error":    "4xx",
+						"info":     "3xx",
+						"debug":    "2xx",
+					}
+					severityField.Mapping = mapping
+					cfg.SeverityConfig = &severityField
+					return cfg
+				}(),
+			},
+			{
+				Name: "format",
+				Expect: func() *Config {
+					cfg := NewConfig()
+					cfg.Format = "docker"
+					return cfg
+				}(),
+			},
+			{
+				Name: "add_metadata_from_file_path",
+				Expect: func() *Config {
+					cfg := NewConfig()
+					cfg.AddMetadataFromFilePath = true
+					return cfg
+				}(),
+			},
+			{
+				Name: "parse_to_attributes",
+				Expect: func() *Config {
+					p := NewConfig()
+					p.ParseTo = entry.RootableField{Field: entry.NewAttributeField()}
+					return p
+				}(),
+			},
+			{
+				Name: "parse_to_body",
+				Expect: func() *Config {
+					p := NewConfig()
+					p.ParseTo = entry.RootableField{Field: entry.NewBodyField()}
+					return p
+				}(),
+			},
+			{
+				Name: "parse_to_resource",
+				Expect: func() *Config {
+					p := NewConfig()
+					p.ParseTo = entry.RootableField{Field: entry.NewResourceField()}
+					return p
+				}(),
+			},
+		},
+	}.Run(t)
+}
diff --git a/pkg/stanza/operator/parser/container/package_test.go b/pkg/stanza/operator/parser/container/package_test.go
new file mode 100644
index 000000000000..245776eec13d
--- /dev/null
+++ b/pkg/stanza/operator/parser/container/package_test.go
@@ -0,0 +1,14 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package container
+
+import (
+	"testing"
+
+	"go.uber.org/goleak"
+)
+
+func TestMain(m *testing.M) {
+	goleak.VerifyTestMain(m)
+}
diff --git a/pkg/stanza/operator/parser/container/parser.go b/pkg/stanza/operator/parser/container/parser.go
new file mode 100644
index 000000000000..d531925e9735
--- /dev/null
+++ b/pkg/stanza/operator/parser/container/parser.go
@@ -0,0 +1,357 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package container // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/parser/container"
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"regexp"
+	"strings"
+	"sync"
+	"time"
+
+	jsoniter "github.com/json-iterator/go"
+
+	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/timeutils"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
+)
+
+const dockerFormat = "docker"
+const crioFormat = "crio"
+const containerdFormat = "containerd"
+const recombineInternalID = "recombine_container_internal"
+const dockerPattern = "^\\{"
+const crioPattern = "^(?P