From 3d7dde071bf01a6fb7edf6dadd8248d30460e597 Mon Sep 17 00:00:00 2001 From: Nathan Slaughter <28688390+nslaughter@users.noreply.github.com> Date: Wed, 10 Apr 2024 11:20:21 -0500 Subject: [PATCH 01/20] Always put decouple processor first in pipeline --- collector/go.mod | 1 + .../decouplefirstconverter/converter.go | 95 ++++++++++++ .../decouplefirstconverter/converter_test.go | 138 ++++++++++++++++++ collector/lambdacomponents/go.mod | 4 +- collector/lambdacomponents/go.sum | 10 +- .../receiver/telemetryapireceiver/go.mod | 6 +- .../receiver/telemetryapireceiver/go.sum | 12 +- 7 files changed, 250 insertions(+), 16 deletions(-) create mode 100644 collector/internal/confmap/converter/decouplefirstconverter/converter.go create mode 100644 collector/internal/confmap/converter/decouplefirstconverter/converter_test.go diff --git a/collector/go.mod b/collector/go.mod index 9eee66b3c6..b8cb755934 100644 --- a/collector/go.mod +++ b/collector/go.mod @@ -69,6 +69,7 @@ require ( github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/golang/snappy v0.0.4 // indirect + github.com/google/go-cmp v0.6.0 // indirect github.com/google/uuid v1.5.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.1 // indirect github.com/hashicorp/go-version v1.6.0 // indirect diff --git a/collector/internal/confmap/converter/decouplefirstconverter/converter.go b/collector/internal/confmap/converter/decouplefirstconverter/converter.go new file mode 100644 index 0000000000..6a90d611e0 --- /dev/null +++ b/collector/internal/confmap/converter/decouplefirstconverter/converter.go @@ -0,0 +1,95 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package decouplefirstconverter // import "github.com/open-telemetry/opentelemetry-lambda/collector/internal/confmap/converter/decouplefirstconverter" + +import ( + "context" + "fmt" + "strings" + + "go.opentelemetry.io/collector/confmap" + "go.uber.org/zap" +) + +const ( + pipelinesKey = "service.pipelines" + processorsKey = "processors" +) + +type converter struct{} + +// New returns a confmap.Converter that ensures the decoupleprocessor is placed first in the pipeline. +func New() confmap.Converter { + return &converter{} +} + + +func (c converter) Convert(_ context.Context, conf *confmap.Conf) error { + pipelines, err := conf.Sub(pipelinesKey) + if err != nil { + return fmt.Errorf("invalid service.pipelines configuration: %w", err) + } + + // Iterate pipelines over telemetry types: traces, metrics, logs + for telemetryType, pipelineVal := range pipelines.ToStringMap() { + // pipeline, ok := pipelineVal.(map[string]interface{}) + // if !ok { + // return fmt.Errorf("invalid pipeline configuration for telemetry type %s", telemetryType) + // } + + // Get the processors for the pipeline + processorsKey := fmt.Sprintf("%s.processors", telemetryType) + processorsSub, err := conf.Sub(processorsKey) + if err != nil { + return fmt.Errorf("invalid processors configuration for telemetry type %s: %w", telemetryType, err) + } + var processors []interface{} + if err := processorsSub.Unmarshal(&processors); err != nil { + return fmt.Errorf("invalid processors configuration for telemetry type %s: %w", telemetryType, err) + continue + } + + // If there are processors, check if the first processor is "decouple" + // and prepend it if not + if len(processors) > 0 { + firstProcessor, ok := processors[0].(string) + if !ok { + return fmt.Errorf("invalid processor configuration for telemetry type %s", telemetryType) + } + + if firstProcessor != "decouple" { + zap.L().Warn("Did not find decoupleprocessor as the first processor in the pipeline. Prepending a decoupleprocessor.") + // Prepend the decouple processor to the processors if it is not already the first processor + processors = append([]interface{}{"decouple"}, processors...) + } + + // Drop all "decouple" processors after the first + for i, v := range processors[1:] { + if pstr, ok := v.(string); ok && strings.HasPrefix(pstr, "decouple/") { + processors = append(processors[:i+1], processors[i+2:]...) + zap.L().Warn("Decouple processor out of first position. Dropped " + fmt.Sprintf("%d", i)) + } + } + + // Update the processors configuration + // pipeline["processors"] = processors + if err := processorsSub.Marshal(processors); err != nil { + return fmt.Errorf("failed to update processors configuration for telemetry type %s: %w", telemetryType, err) + } + } + } + + return nil +} diff --git a/collector/internal/confmap/converter/decouplefirstconverter/converter_test.go b/collector/internal/confmap/converter/decouplefirstconverter/converter_test.go new file mode 100644 index 0000000000..54bf3b95df --- /dev/null +++ b/collector/internal/confmap/converter/decouplefirstconverter/converter_test.go @@ -0,0 +1,138 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package decouplefirstconverter + +import ( + "context" + "testing" + + "go.opentelemetry.io/collector/confmap" + + "github.com/google/go-cmp/cmp" +) + +func TestConvert(t *testing.T) { + for _, tc := range []struct { + name string + conf *confmap.Conf + expected *confmap.Conf + err error + }{ + { + name: "no pipelines", + conf: confmap.New(), + expected: confmap.New(), + err: nil, + }, + { + name: "no processors in pipeline", + conf: confmap.NewFromStringMap(map[string]interface{}{ + "service": map[string]interface{}{ + "pipelines": map[string]interface{}{ + "traces": map[string]interface{}{}, + }, + }, + }), + expected: confmap.NewFromStringMap(map[string]interface{}{ + "service": map[string]interface{}{ + "pipelines": map[string]interface{}{ + "traces": map[string]interface{}{}, + }, + }, + }), + err: nil, + }, + { + name: "decouple processor first", + conf: confmap.NewFromStringMap(map[string]interface{}{ + "service": map[string]interface{}{ + "pipelines": map[string]interface{}{ + "traces": map[string]interface{}{ + "processors": []interface{}{"decouple", "processor1"}, + }, + }, + }, + }), + expected: confmap.NewFromStringMap(map[string]interface{}{ + "service": map[string]interface{}{ + "pipelines": map[string]interface{}{ + "traces": map[string]interface{}{ + "processors": []interface{}{"decouple", "processor1"}, + }, + }, + }, + }), + err: nil, + }, + { + name: "decouple processor not first", + conf: confmap.NewFromStringMap(map[string]interface{}{ + "service": map[string]interface{}{ + "pipelines": map[string]interface{}{ + "traces": map[string]interface{}{ + "processors": []interface{}{"processor1", "decouple", "processor2"}, + }, + }, + }, + }), + expected: confmap.NewFromStringMap(map[string]interface{}{ + "service": map[string]interface{}{ + "pipelines": map[string]interface{}{ + "traces": map[string]interface{}{ + "processors": []interface{}{"decouple", "processor1", "processor2"}, + }, + }, + }, + }), + err: nil, + }, + { + name: "multiple decouple processors", + conf: confmap.NewFromStringMap(map[string]interface{}{ + "service": map[string]interface{}{ + "pipelines": map[string]interface{}{ + "traces": map[string]interface{}{ + "processors": []interface{}{"processor1", "decouple", "decouple/instance1", "processor2", "decouple/instance2"}, + }, + }, + }, + }), + expected: confmap.NewFromStringMap(map[string]interface{}{ + "service": map[string]interface{}{ + "pipelines": map[string]interface{}{ + "traces": map[string]interface{}{ + "processors": []interface{}{"decouple", "processor1", "processor2"}, + }, + }, + }, + }), + err: nil, + }, + } { + t.Run(tc.name, func(t *testing.T) { + c := New() + if err := c.Convert(context.Background(), tc.conf); err != nil { + t.Errorf("unexpected error converting: %v", err) + } + + // check that tc.conf is equal to tc.expected, but for something + // that must work on maps which are unordered + // assert.NoError(t, err) + if diff := cmp.Diff(tc.expected.ToStringMap(), tc.conf.ToStringMap()); diff != "" { + t.Errorf("Convert() mismatch: (-want +got):\n%s", diff) + } + }) + } +} diff --git a/collector/lambdacomponents/go.mod b/collector/lambdacomponents/go.mod index e2ba5ebc40..4576f1b4ec 100644 --- a/collector/lambdacomponents/go.mod +++ b/collector/lambdacomponents/go.mod @@ -148,7 +148,7 @@ require ( go.opentelemetry.io/otel/sdk/metric v1.21.0 // indirect go.opentelemetry.io/otel/trace v1.21.0 // indirect go.opentelemetry.io/proto/otlp v1.0.0 // indirect - go.uber.org/zap v1.26.0 // indirect + go.uber.org/zap v1.27.0 // indirect golang.org/x/exp v0.0.0-20240103183307-be819d1f06fc // indirect golang.org/x/net v0.20.0 // indirect golang.org/x/sys v0.16.0 // indirect @@ -157,7 +157,7 @@ require ( google.golang.org/genproto/googleapis/api v0.0.0-20231106174013-bbf56f31fb17 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20231127180814-3a041ad873d4 // indirect google.golang.org/grpc v1.60.1 // indirect - google.golang.org/protobuf v1.32.0 // indirect + google.golang.org/protobuf v1.33.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/collector/lambdacomponents/go.sum b/collector/lambdacomponents/go.sum index 0729cce43c..ea61c2fde1 100644 --- a/collector/lambdacomponents/go.sum +++ b/collector/lambdacomponents/go.sum @@ -555,8 +555,8 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stvp/go-udp-testing v0.0.0-20201019212854-469649b16807/go.mod h1:7jxmlfBCDBXRzr0eAQJ48XC1hBu1np4CS5+cHEYfwpc= github.com/tidwall/gjson v1.10.2 h1:APbLGOM0rrEkd8WBw9C24nllro4ajFuJu0Sc9hRz8Bo= github.com/tidwall/gjson v1.10.2/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= @@ -687,8 +687,8 @@ go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v8 go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= -go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo= -go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -969,8 +969,8 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= -google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I= -google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= +google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/collector/receiver/telemetryapireceiver/go.mod b/collector/receiver/telemetryapireceiver/go.mod index 70504157c5..ce240a9a73 100644 --- a/collector/receiver/telemetryapireceiver/go.mod +++ b/collector/receiver/telemetryapireceiver/go.mod @@ -7,13 +7,13 @@ replace github.com/open-telemetry/opentelemetry-lambda/collector => ../../ require ( github.com/golang-collections/go-datastructures v0.0.0-20150211160725-59788d5eb259 github.com/open-telemetry/opentelemetry-lambda/collector v0.91.0 - github.com/stretchr/testify v1.8.4 + github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector/component v0.92.0 go.opentelemetry.io/collector/consumer v0.92.0 go.opentelemetry.io/collector/pdata v1.0.1 go.opentelemetry.io/collector/receiver v0.92.0 go.opentelemetry.io/collector/semconv v0.92.0 - go.uber.org/zap v1.26.0 + go.uber.org/zap v1.27.0 ) require ( @@ -43,6 +43,6 @@ require ( golang.org/x/text v0.14.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20231127180814-3a041ad873d4 // indirect google.golang.org/grpc v1.60.1 // indirect - google.golang.org/protobuf v1.32.0 // indirect + google.golang.org/protobuf v1.33.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/collector/receiver/telemetryapireceiver/go.sum b/collector/receiver/telemetryapireceiver/go.sum index 4bfd206451..d27514a7d7 100644 --- a/collector/receiver/telemetryapireceiver/go.sum +++ b/collector/receiver/telemetryapireceiver/go.sum @@ -45,8 +45,8 @@ github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= -github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= -github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= go.opentelemetry.io/collector v0.92.0 h1:XiC0ptaT1EmOkK2RI0gt3n2tkzLAkNQGf0E7hrGdyeA= @@ -75,8 +75,8 @@ go.opentelemetry.io/otel/trace v1.21.0/go.mod h1:LGbsEB0f9LGjN+OZaQQ26sohbOmiMR+ go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= -go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo= -go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= @@ -114,8 +114,8 @@ google.golang.org/grpc v1.60.1 h1:26+wFr+cNqSGFcOXcabYC0lUVJVRa2Sb2ortSK7VrEU= google.golang.org/grpc v1.60.1/go.mod h1:OlCHIeLYqSSsLi6i49B5QGdzaMZK9+M7LXN2FKz4eGM= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I= -google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= +google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= From 728e6818b03f51e9a729b35f1ae51efee35f6233 Mon Sep 17 00:00:00 2001 From: Nathan Slaughter <28688390+nslaughter@users.noreply.github.com> Date: Wed, 10 Apr 2024 17:24:46 -0500 Subject: [PATCH 02/20] Add converter to derive processors from a base --- .../decouplefirstconverter/converter.go | 140 ++++++++++++------ .../decouplefirstconverter/converter_test.go | 36 ++--- 2 files changed, 100 insertions(+), 76 deletions(-) diff --git a/collector/internal/confmap/converter/decouplefirstconverter/converter.go b/collector/internal/confmap/converter/decouplefirstconverter/converter.go index 6a90d611e0..d1660c9696 100644 --- a/collector/internal/confmap/converter/decouplefirstconverter/converter.go +++ b/collector/internal/confmap/converter/decouplefirstconverter/converter.go @@ -20,11 +20,10 @@ import ( "strings" "go.opentelemetry.io/collector/confmap" - "go.uber.org/zap" ) const ( - pipelinesKey = "service.pipelines" + pipelinesKey = "service::pipelines" processorsKey = "processors" ) @@ -35,61 +34,104 @@ func New() confmap.Converter { return &converter{} } +type arr []interface{} + +func (a arr) moveLast(x int) error { + if x < 0 || x >= len(a) { + return fmt.Errorf("index out of bounds: %d", x) + } + return nil +} + +func (a arr) move2ndLast(x int) error { + // if index out of bounds then return error + return nil +} + +func (a arr) drop(x int) error { + return nil +} + +func (c converter) convertProcessors(processors []interface{}) []interface{} { + // Drop occurrences of "batch" and "decouple". + // This ignores edge cases and if user has configured processors earlier in the pipeline then + // implicit handling is that the config is gone. This only effects batch. + if len(processors) == 0 { + return []any{ + string("batch"), + string("decouple"), + } + } + result := make([]interface{}, 0, len(processors)) + result = addBatchAndDecouple( + dropBatchAndDecouple( + processors, + ), + ) + + return result +} func (c converter) Convert(_ context.Context, conf *confmap.Conf) error { - pipelines, err := conf.Sub(pipelinesKey) - if err != nil { - return fmt.Errorf("invalid service.pipelines configuration: %w", err) - } + pipelines, err := conf.Sub("service::pipelines") // conf.Sub("services::pipelines") + if err != nil { + return fmt.Errorf("invalid service.pipelines configuration: %w", err) + } // Iterate pipelines over telemetry types: traces, metrics, logs - for telemetryType, pipelineVal := range pipelines.ToStringMap() { - // pipeline, ok := pipelineVal.(map[string]interface{}) - // if !ok { - // return fmt.Errorf("invalid pipeline configuration for telemetry type %s", telemetryType) - // } - + for telemetryType, pipelineVal := range pipelines.ToStringMap() { + println("telemetryType: %s", telemetryType) // Get the processors for the pipeline - processorsKey := fmt.Sprintf("%s.processors", telemetryType) - processorsSub, err := conf.Sub(processorsKey) - if err != nil { - return fmt.Errorf("invalid processors configuration for telemetry type %s: %w", telemetryType, err) + // processorsKey := fmt.Sprintf("%s::processors", telemetryType) + // extract the processors from the pipeline + pipeline, ok := pipelineVal.(map[string]interface{}) + if !ok { + return fmt.Errorf("invalid processors configuration for telemetry type %s", telemetryType) } - var processors []interface{} - if err := processorsSub.Unmarshal(&processors); err != nil { - return fmt.Errorf("invalid processors configuration for telemetry type %s: %w", telemetryType, err) - continue - } - - // If there are processors, check if the first processor is "decouple" - // and prepend it if not - if len(processors) > 0 { - firstProcessor, ok := processors[0].(string) - if !ok { - return fmt.Errorf("invalid processor configuration for telemetry type %s", telemetryType) - } - - if firstProcessor != "decouple" { - zap.L().Warn("Did not find decoupleprocessor as the first processor in the pipeline. Prepending a decoupleprocessor.") - // Prepend the decouple processor to the processors if it is not already the first processor - processors = append([]interface{}{"decouple"}, processors...) - } + processors, ok := pipeline["processors"].([]interface{}) + if !ok { + return fmt.Errorf("invalid processors configuration for telemetry type %s", telemetryType) + } + resultProcessors := c.convertProcessors(processors) + println("extracted ", len(processors), " processors, from key: ", telemetryType) + println("converted to ", len(resultProcessors), " processors") + conf.Merge(confmap.NewFromStringMap(map[string]interface{}{ + "service::pipelines::" + telemetryType + "::processors": resultProcessors, + })) + } - // Drop all "decouple" processors after the first - for i, v := range processors[1:] { - if pstr, ok := v.(string); ok && strings.HasPrefix(pstr, "decouple/") { - processors = append(processors[:i+1], processors[i+2:]...) - zap.L().Warn("Decouple processor out of first position. Dropped " + fmt.Sprintf("%d", i)) - } - } + return nil +} - // Update the processors configuration - // pipeline["processors"] = processors - if err := processorsSub.Marshal(processors); err != nil { - return fmt.Errorf("failed to update processors configuration for telemetry type %s: %w", telemetryType, err) +// Drop all occurrences of "batch" and "decouple" processors from the pipeline. +func dropBatchAndDecouple(processors []interface{}) []interface{} { + // Drop occurrences of "batch" and "decouple". + // This ignores edge cases and if user has configured processors earlier in the pipeline then + // implicit handling is that the config is gone. This only effects batch. + if len(processors) == 0 { + return make([]interface{}, 0) + } + for i, v := range processors { + if pstr, ok := v.(string); ok && (strings.HasPrefix(pstr, "batch") || strings.HasPrefix(pstr, "decouple")) { + if i < len(processors) { + processors = append(processors[:i], processors[i+1:]...) } - } - } + } + } + return processors +} + +// Add the "batch" and "decouple" processors to the pipeline. This is a simplistic implementation +// that assumes the default processors aren't already in the pipeline, which is valid when +// the pipeline filter to drop the processors is applied first. +func addBatchAndDecouple(processors []interface{}) []interface{} { + suffix := []interface{}{"batch", "decouple"} + if processors == nil || len(processors) == 0 { + return suffix + } + result := make([]interface{}, 0, len(processors)+len(suffix)) + result = append(result, processors...) + result = append(result, suffix...) - return nil + return result } diff --git a/collector/internal/confmap/converter/decouplefirstconverter/converter_test.go b/collector/internal/confmap/converter/decouplefirstconverter/converter_test.go index 54bf3b95df..5b373abe30 100644 --- a/collector/internal/confmap/converter/decouplefirstconverter/converter_test.go +++ b/collector/internal/confmap/converter/decouplefirstconverter/converter_test.go @@ -23,6 +23,8 @@ import ( "github.com/google/go-cmp/cmp" ) + + func TestConvert(t *testing.T) { for _, tc := range []struct { name string @@ -45,31 +47,11 @@ func TestConvert(t *testing.T) { }, }, }), - expected: confmap.NewFromStringMap(map[string]interface{}{ - "service": map[string]interface{}{ - "pipelines": map[string]interface{}{ - "traces": map[string]interface{}{}, - }, - }, - }), - err: nil, - }, - { - name: "decouple processor first", - conf: confmap.NewFromStringMap(map[string]interface{}{ - "service": map[string]interface{}{ - "pipelines": map[string]interface{}{ - "traces": map[string]interface{}{ - "processors": []interface{}{"decouple", "processor1"}, - }, - }, - }, - }), expected: confmap.NewFromStringMap(map[string]interface{}{ "service": map[string]interface{}{ "pipelines": map[string]interface{}{ "traces": map[string]interface{}{ - "processors": []interface{}{"decouple", "processor1"}, + "processors": []interface{}{"batch", "decouple"}, }, }, }, @@ -77,12 +59,12 @@ func TestConvert(t *testing.T) { err: nil, }, { - name: "decouple processor not first", + name: "processors in pipeline", conf: confmap.NewFromStringMap(map[string]interface{}{ "service": map[string]interface{}{ "pipelines": map[string]interface{}{ "traces": map[string]interface{}{ - "processors": []interface{}{"processor1", "decouple", "processor2"}, + "processors": []interface{}{"processor1", "processor2"}, }, }, }, @@ -91,7 +73,7 @@ func TestConvert(t *testing.T) { "service": map[string]interface{}{ "pipelines": map[string]interface{}{ "traces": map[string]interface{}{ - "processors": []interface{}{"decouple", "processor1", "processor2"}, + "processors": []interface{}{"processor1", "processor2", "batch", "decouple"}, }, }, }, @@ -99,12 +81,12 @@ func TestConvert(t *testing.T) { err: nil, }, { - name: "multiple decouple processors", + name: "batch and decouple processors already present", conf: confmap.NewFromStringMap(map[string]interface{}{ "service": map[string]interface{}{ "pipelines": map[string]interface{}{ "traces": map[string]interface{}{ - "processors": []interface{}{"processor1", "decouple", "decouple/instance1", "processor2", "decouple/instance2"}, + "processors": []interface{}{"processor1", "batch", "processor2", "decouple"}, }, }, }, @@ -113,7 +95,7 @@ func TestConvert(t *testing.T) { "service": map[string]interface{}{ "pipelines": map[string]interface{}{ "traces": map[string]interface{}{ - "processors": []interface{}{"decouple", "processor1", "processor2"}, + "processors": []interface{}{"processor1", "processor2", "batch", "decouple"}, }, }, }, From f4b76d3b099e58acbade1c90925a800125e39c2f Mon Sep 17 00:00:00 2001 From: Nathan Slaughter <28688390+nslaughter@users.noreply.github.com> Date: Wed, 10 Apr 2024 17:27:37 -0500 Subject: [PATCH 03/20] Remove scratchpad code --- collector/internal/collector/collector.go | 2 +- .../decouplefirstconverter/converter.go | 18 ------------------ 2 files changed, 1 insertion(+), 19 deletions(-) diff --git a/collector/internal/collector/collector.go b/collector/internal/collector/collector.go index a594038499..58aa1de7fe 100644 --- a/collector/internal/collector/collector.go +++ b/collector/internal/collector/collector.go @@ -68,7 +68,7 @@ func NewCollector(logger *zap.Logger, factories otelcol.Factories, version strin ResolverSettings: confmap.ResolverSettings{ URIs: []string{getConfig(l)}, Providers: mapProvider, - Converters: []confmap.Converter{expandconverter.New(), disablequeuedretryconverter.New()}, + Converters: []confmap.Converter{expandconverter.New(), disablequeuedretryconverter.New(), decouplefirstconverter.New()}, }, } cfgProvider, err := otelcol.NewConfigProvider(cfgSet) diff --git a/collector/internal/confmap/converter/decouplefirstconverter/converter.go b/collector/internal/confmap/converter/decouplefirstconverter/converter.go index d1660c9696..f2713d105f 100644 --- a/collector/internal/confmap/converter/decouplefirstconverter/converter.go +++ b/collector/internal/confmap/converter/decouplefirstconverter/converter.go @@ -34,24 +34,6 @@ func New() confmap.Converter { return &converter{} } -type arr []interface{} - -func (a arr) moveLast(x int) error { - if x < 0 || x >= len(a) { - return fmt.Errorf("index out of bounds: %d", x) - } - return nil -} - -func (a arr) move2ndLast(x int) error { - // if index out of bounds then return error - return nil -} - -func (a arr) drop(x int) error { - return nil -} - func (c converter) convertProcessors(processors []interface{}) []interface{} { // Drop occurrences of "batch" and "decouple". // This ignores edge cases and if user has configured processors earlier in the pipeline then From ffab5969f1676209110a4375f801bc9d6dd61078 Mon Sep 17 00:00:00 2001 From: Nathan Slaughter <28688390+nslaughter@users.noreply.github.com> Date: Fri, 12 Apr 2024 15:46:02 -0500 Subject: [PATCH 04/20] implement rules and test --- collector/internal/collector/collector.go | 3 +- .../decoupleafterbatchconverter/converter.go | 119 ++++++++++++++++++ .../converter_test.go | 45 +++++-- .../decouplefirstconverter/converter.go | 119 ------------------ .../disablequeuedretryconverter/converter.go | 3 +- 5 files changed, 158 insertions(+), 131 deletions(-) create mode 100644 collector/internal/confmap/converter/decoupleafterbatchconverter/converter.go rename collector/internal/confmap/converter/{decouplefirstconverter => decoupleafterbatchconverter}/converter_test.go (73%) delete mode 100644 collector/internal/confmap/converter/decouplefirstconverter/converter.go diff --git a/collector/internal/collector/collector.go b/collector/internal/collector/collector.go index 58aa1de7fe..bd1c2fe637 100644 --- a/collector/internal/collector/collector.go +++ b/collector/internal/collector/collector.go @@ -32,6 +32,7 @@ import ( "go.uber.org/zap/zapcore" "github.com/open-telemetry/opentelemetry-lambda/collector/internal/confmap/converter/disablequeuedretryconverter" + "github.com/open-telemetry/opentelemetry-lambda/collector/internal/confmap/converter/decoupleafterbatchconverter" ) // Collector runs a single otelcol as a go routine within the @@ -68,7 +69,7 @@ func NewCollector(logger *zap.Logger, factories otelcol.Factories, version strin ResolverSettings: confmap.ResolverSettings{ URIs: []string{getConfig(l)}, Providers: mapProvider, - Converters: []confmap.Converter{expandconverter.New(), disablequeuedretryconverter.New(), decouplefirstconverter.New()}, + Converters: []confmap.Converter{expandconverter.New(), disablequeuedretryconverter.New(), decoupleafterbatchconverter.New()}, }, } cfgProvider, err := otelcol.NewConfigProvider(cfgSet) diff --git a/collector/internal/confmap/converter/decoupleafterbatchconverter/converter.go b/collector/internal/confmap/converter/decoupleafterbatchconverter/converter.go new file mode 100644 index 0000000000..0ec5eac719 --- /dev/null +++ b/collector/internal/confmap/converter/decoupleafterbatchconverter/converter.go @@ -0,0 +1,119 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// The decoupleafterbatchconverter implements the Converter for mutating Collector +// configurations to ensure the decouple processor is placed after the batch processor. +// This is logically implemented by appending the decouple processor to the end of +// processor chains where a batch processor is found unless another decouple processor +// was seen. +package decoupleafterbatchconverter + +import ( + "context" + "fmt" + "strings" + + "go.opentelemetry.io/collector/confmap" +) + +const ( + serviceKey = "service" + pipelinesKey = "pipelines" + processorsKey = "processors" + batchProcessor = "batch" + decoupleProcessor = "decouple" +) + +type converter struct{} + +// New returns a confmap.Converter that ensures the decoupleprocessor is placed first in the pipeline. +func New() confmap.Converter { + return &converter{} +} + +func (c converter) Convert(_ context.Context, conf *confmap.Conf) error { + serviceVal := conf.Get(serviceKey) + service, ok := serviceVal.(map[string]interface{}) + if !ok { + return nil + } + + pipelinesVal, ok := service[pipelinesKey] + if !ok { + return nil + } + + pipelines, ok := pipelinesVal.(map[string]interface{}) + if !ok { + return nil + } + + // accumulates updates over the pipelines and applies them + // once all pipeline configs are processed + updates := make(map[string]interface{}) + for telemetryType, pipelineVal := range pipelines { + pipeline, ok := pipelineVal.(map[string]interface{}) + if !ok { + continue + } + + processorsVal, ok := pipeline[processorsKey] + if !ok { + continue + } + + processors, ok := processorsVal.([]interface{}) + if !ok { + continue + } + + // accumulate config updates + if appendDecouple(processors) { + processors = append(processors, decoupleProcessor) + updates[fmt.Sprintf("%s::%s::%s::%s", serviceKey, pipelinesKey, telemetryType, processorsKey)] = processors + break + } + + } + + // apply all updates + if len(updates) > 0 { + if err := conf.Merge(confmap.NewFromStringMap(updates)); err != nil { + return err + } + } + + return nil +} + +// The appendDecouple predicate encodes most of the logic for mutating processor pipelines +// in the Convert function. It tells whether there was a decouple processor after the last +// batch processor and if so it appends decouple to the end. +func appendDecouple(processors []interface{}) bool { + var appendDecouple bool + for _, processorVal := range processors { + processor, ok := processorVal.(string) + if !ok { + continue + } + appendDecouple = false + processorBaseName := strings.Split(processor, "/")[0] + if processorBaseName == batchProcessor { + appendDecouple = true + } else if processorBaseName == decoupleProcessor { + appendDecouple = false + } + } + return appendDecouple +} diff --git a/collector/internal/confmap/converter/decouplefirstconverter/converter_test.go b/collector/internal/confmap/converter/decoupleafterbatchconverter/converter_test.go similarity index 73% rename from collector/internal/confmap/converter/decouplefirstconverter/converter_test.go rename to collector/internal/confmap/converter/decoupleafterbatchconverter/converter_test.go index 5b373abe30..82a95b6b00 100644 --- a/collector/internal/confmap/converter/decouplefirstconverter/converter_test.go +++ b/collector/internal/confmap/converter/decoupleafterbatchconverter/converter_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package decouplefirstconverter +package decoupleafterbatchconverter import ( "context" @@ -23,8 +23,6 @@ import ( "github.com/google/go-cmp/cmp" ) - - func TestConvert(t *testing.T) { for _, tc := range []struct { name string @@ -33,11 +31,21 @@ func TestConvert(t *testing.T) { err error }{ { - name: "no pipelines", + name: "no service", conf: confmap.New(), expected: confmap.New(), err: nil, }, + { + name: "no pipelines", + conf: confmap.NewFromStringMap(map[string]interface{}{ + "service": map[string]interface{}{}, + }), + expected: confmap.NewFromStringMap(map[string]interface{}{ + "service": map[string]interface{}{}, + }), + err: nil, + }, { name: "no processors in pipeline", conf: confmap.NewFromStringMap(map[string]interface{}{ @@ -47,6 +55,26 @@ func TestConvert(t *testing.T) { }, }, }), + expected: confmap.NewFromStringMap(map[string]interface{}{ + "service": map[string]interface{}{ + "pipelines": map[string]interface{}{ + "traces": map[string]interface{}{}, + }, + }, + }), + err: nil, + }, + { + name: "batch processor present", + conf: confmap.NewFromStringMap(map[string]interface{}{ + "service": map[string]interface{}{ + "pipelines": map[string]interface{}{ + "traces": map[string]interface{}{ + "processors": []interface{}{"batch"}, + }, + }, + }, + }), expected: confmap.NewFromStringMap(map[string]interface{}{ "service": map[string]interface{}{ "pipelines": map[string]interface{}{ @@ -59,7 +87,7 @@ func TestConvert(t *testing.T) { err: nil, }, { - name: "processors in pipeline", + name: "batch processor not present", conf: confmap.NewFromStringMap(map[string]interface{}{ "service": map[string]interface{}{ "pipelines": map[string]interface{}{ @@ -73,7 +101,7 @@ func TestConvert(t *testing.T) { "service": map[string]interface{}{ "pipelines": map[string]interface{}{ "traces": map[string]interface{}{ - "processors": []interface{}{"processor1", "processor2", "batch", "decouple"}, + "processors": []interface{}{"processor1", "processor2"}, }, }, }, @@ -95,7 +123,7 @@ func TestConvert(t *testing.T) { "service": map[string]interface{}{ "pipelines": map[string]interface{}{ "traces": map[string]interface{}{ - "processors": []interface{}{"processor1", "processor2", "batch", "decouple"}, + "processors": []interface{}{"processor1", "batch", "processor2", "decouple"}, }, }, }, @@ -109,9 +137,6 @@ func TestConvert(t *testing.T) { t.Errorf("unexpected error converting: %v", err) } - // check that tc.conf is equal to tc.expected, but for something - // that must work on maps which are unordered - // assert.NoError(t, err) if diff := cmp.Diff(tc.expected.ToStringMap(), tc.conf.ToStringMap()); diff != "" { t.Errorf("Convert() mismatch: (-want +got):\n%s", diff) } diff --git a/collector/internal/confmap/converter/decouplefirstconverter/converter.go b/collector/internal/confmap/converter/decouplefirstconverter/converter.go deleted file mode 100644 index f2713d105f..0000000000 --- a/collector/internal/confmap/converter/decouplefirstconverter/converter.go +++ /dev/null @@ -1,119 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package decouplefirstconverter // import "github.com/open-telemetry/opentelemetry-lambda/collector/internal/confmap/converter/decouplefirstconverter" - -import ( - "context" - "fmt" - "strings" - - "go.opentelemetry.io/collector/confmap" -) - -const ( - pipelinesKey = "service::pipelines" - processorsKey = "processors" -) - -type converter struct{} - -// New returns a confmap.Converter that ensures the decoupleprocessor is placed first in the pipeline. -func New() confmap.Converter { - return &converter{} -} - -func (c converter) convertProcessors(processors []interface{}) []interface{} { - // Drop occurrences of "batch" and "decouple". - // This ignores edge cases and if user has configured processors earlier in the pipeline then - // implicit handling is that the config is gone. This only effects batch. - if len(processors) == 0 { - return []any{ - string("batch"), - string("decouple"), - } - } - result := make([]interface{}, 0, len(processors)) - result = addBatchAndDecouple( - dropBatchAndDecouple( - processors, - ), - ) - - return result -} - -func (c converter) Convert(_ context.Context, conf *confmap.Conf) error { - pipelines, err := conf.Sub("service::pipelines") // conf.Sub("services::pipelines") - if err != nil { - return fmt.Errorf("invalid service.pipelines configuration: %w", err) - } - - // Iterate pipelines over telemetry types: traces, metrics, logs - for telemetryType, pipelineVal := range pipelines.ToStringMap() { - println("telemetryType: %s", telemetryType) - // Get the processors for the pipeline - // processorsKey := fmt.Sprintf("%s::processors", telemetryType) - // extract the processors from the pipeline - pipeline, ok := pipelineVal.(map[string]interface{}) - if !ok { - return fmt.Errorf("invalid processors configuration for telemetry type %s", telemetryType) - } - processors, ok := pipeline["processors"].([]interface{}) - if !ok { - return fmt.Errorf("invalid processors configuration for telemetry type %s", telemetryType) - } - resultProcessors := c.convertProcessors(processors) - println("extracted ", len(processors), " processors, from key: ", telemetryType) - println("converted to ", len(resultProcessors), " processors") - conf.Merge(confmap.NewFromStringMap(map[string]interface{}{ - "service::pipelines::" + telemetryType + "::processors": resultProcessors, - })) - } - - return nil -} - -// Drop all occurrences of "batch" and "decouple" processors from the pipeline. -func dropBatchAndDecouple(processors []interface{}) []interface{} { - // Drop occurrences of "batch" and "decouple". - // This ignores edge cases and if user has configured processors earlier in the pipeline then - // implicit handling is that the config is gone. This only effects batch. - if len(processors) == 0 { - return make([]interface{}, 0) - } - for i, v := range processors { - if pstr, ok := v.(string); ok && (strings.HasPrefix(pstr, "batch") || strings.HasPrefix(pstr, "decouple")) { - if i < len(processors) { - processors = append(processors[:i], processors[i+1:]...) - } - } - } - return processors -} - -// Add the "batch" and "decouple" processors to the pipeline. This is a simplistic implementation -// that assumes the default processors aren't already in the pipeline, which is valid when -// the pipeline filter to drop the processors is applied first. -func addBatchAndDecouple(processors []interface{}) []interface{} { - suffix := []interface{}{"batch", "decouple"} - if processors == nil || len(processors) == 0 { - return suffix - } - result := make([]interface{}, 0, len(processors)+len(suffix)) - result = append(result, processors...) - result = append(result, suffix...) - - return result -} diff --git a/collector/internal/confmap/converter/disablequeuedretryconverter/converter.go b/collector/internal/confmap/converter/disablequeuedretryconverter/converter.go index 365b660080..981cb4ca38 100644 --- a/collector/internal/confmap/converter/disablequeuedretryconverter/converter.go +++ b/collector/internal/confmap/converter/disablequeuedretryconverter/converter.go @@ -11,7 +11,6 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. - package disablequeuedretryconverter // import "github.com/open-telemetry/opentelemetry-lambda/collector/internal/confmap/converter/disablequeuedretryconverter" import ( @@ -58,6 +57,8 @@ var exporters = map[string]struct{}{ type converter struct { } + + // New returns a confmap.Converter, that ensures queued retry is disabled for all configured exporters. func New() confmap.Converter { return &converter{} From 25e209214d462abb81e26185941f49895e3cc801 Mon Sep 17 00:00:00 2001 From: Nathan Slaughter <28688390+nslaughter@users.noreply.github.com> Date: Sun, 14 Apr 2024 11:43:54 -0500 Subject: [PATCH 05/20] update tests --- .../converter_test.go | 184 +++++++----------- 1 file changed, 66 insertions(+), 118 deletions(-) diff --git a/collector/internal/confmap/converter/decoupleafterbatchconverter/converter_test.go b/collector/internal/confmap/converter/decoupleafterbatchconverter/converter_test.go index 82a95b6b00..b8a4b6d1e4 100644 --- a/collector/internal/confmap/converter/decoupleafterbatchconverter/converter_test.go +++ b/collector/internal/confmap/converter/decoupleafterbatchconverter/converter_test.go @@ -11,7 +11,6 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. - package decoupleafterbatchconverter import ( @@ -24,122 +23,71 @@ import ( ) func TestConvert(t *testing.T) { - for _, tc := range []struct { - name string - conf *confmap.Conf - expected *confmap.Conf - err error - }{ - { - name: "no service", - conf: confmap.New(), - expected: confmap.New(), - err: nil, - }, - { - name: "no pipelines", - conf: confmap.NewFromStringMap(map[string]interface{}{ - "service": map[string]interface{}{}, - }), - expected: confmap.NewFromStringMap(map[string]interface{}{ - "service": map[string]interface{}{}, - }), - err: nil, - }, - { - name: "no processors in pipeline", - conf: confmap.NewFromStringMap(map[string]interface{}{ - "service": map[string]interface{}{ - "pipelines": map[string]interface{}{ - "traces": map[string]interface{}{}, - }, - }, - }), - expected: confmap.NewFromStringMap(map[string]interface{}{ - "service": map[string]interface{}{ - "pipelines": map[string]interface{}{ - "traces": map[string]interface{}{}, - }, - }, - }), - err: nil, - }, - { - name: "batch processor present", - conf: confmap.NewFromStringMap(map[string]interface{}{ - "service": map[string]interface{}{ - "pipelines": map[string]interface{}{ - "traces": map[string]interface{}{ - "processors": []interface{}{"batch"}, - }, - }, - }, - }), - expected: confmap.NewFromStringMap(map[string]interface{}{ - "service": map[string]interface{}{ - "pipelines": map[string]interface{}{ - "traces": map[string]interface{}{ - "processors": []interface{}{"batch", "decouple"}, - }, - }, - }, - }), - err: nil, - }, - { - name: "batch processor not present", - conf: confmap.NewFromStringMap(map[string]interface{}{ - "service": map[string]interface{}{ - "pipelines": map[string]interface{}{ - "traces": map[string]interface{}{ - "processors": []interface{}{"processor1", "processor2"}, - }, - }, - }, - }), - expected: confmap.NewFromStringMap(map[string]interface{}{ - "service": map[string]interface{}{ - "pipelines": map[string]interface{}{ - "traces": map[string]interface{}{ - "processors": []interface{}{"processor1", "processor2"}, - }, - }, - }, - }), - err: nil, - }, - { - name: "batch and decouple processors already present", - conf: confmap.NewFromStringMap(map[string]interface{}{ - "service": map[string]interface{}{ - "pipelines": map[string]interface{}{ - "traces": map[string]interface{}{ - "processors": []interface{}{"processor1", "batch", "processor2", "decouple"}, - }, - }, - }, - }), - expected: confmap.NewFromStringMap(map[string]interface{}{ - "service": map[string]interface{}{ - "pipelines": map[string]interface{}{ - "traces": map[string]interface{}{ - "processors": []interface{}{"processor1", "batch", "processor2", "decouple"}, - }, - }, - }, - }), - err: nil, - }, - } { - t.Run(tc.name, func(t *testing.T) { - c := New() - if err := c.Convert(context.Background(), tc.conf); err != nil { - t.Errorf("unexpected error converting: %v", err) - } + // Since this really tests differences in processors, it's easier to read cases + // without the repeated definition of other fields in the config. + baseConf := func(processors []interface{}) *confmap.Conf { + return confmap.NewFromStringMap(map[string]interface{}{ + "service": map[string]interface{}{ + "pipelines": map[string]interface{}{ + "traces": map[string]interface{}{ + "processors": processors, + }, + }, + }, + }) + } + + testCases := []struct { + name string + processors []interface{} + expectedProcessors []interface{} + err error + }{ + { + name: "no service", + processors: nil, + expectedProcessors: nil, + }, + { + name: "no pipelines", + processors: nil, + expectedProcessors: nil, + }, + { + name: "no processors in pipeline", + processors: nil, + expectedProcessors: nil, + }, + { + name: "batch processor present", + processors: []interface{}{"batch"}, + expectedProcessors: []interface{}{"batch", "decouple"}, + }, + { + name: "batch processor not present", + processors: []interface{}{"processor1", "processor2"}, + expectedProcessors: []interface{}{"processor1", "processor2"}, + }, + { + name: "batch and decouple processors already present", + processors: []interface{}{"processor1", "batch", "processor2", "decouple"}, + expectedProcessors: []interface{}{"processor1", "batch", "processor2", "decouple"}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + conf := baseConf(tc.processors) + expected := baseConf(tc.expectedProcessors) - if diff := cmp.Diff(tc.expected.ToStringMap(), tc.conf.ToStringMap()); diff != "" { - t.Errorf("Convert() mismatch: (-want +got):\n%s", diff) - } - }) - } + c := New() + err := c.Convert(context.Background(), conf) + if err != tc.err { + t.Errorf("unexpected error converting: %v", err) + } + if diff := cmp.Diff(expected.ToStringMap(), conf.ToStringMap()); diff != "" { + t.Errorf("Convert() mismatch: (-want +got):\n%s", diff) + } + }) + } } From d543b2cb4a7bb029bdb3e9a6d73dad2f7e294501 Mon Sep 17 00:00:00 2001 From: Nathan Slaughter <28688390+nslaughter@users.noreply.github.com> Date: Sun, 14 Apr 2024 11:58:38 -0500 Subject: [PATCH 06/20] improve tests for reviewers --- .../converter_test.go | 27 ++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/collector/internal/confmap/converter/decoupleafterbatchconverter/converter_test.go b/collector/internal/confmap/converter/decoupleafterbatchconverter/converter_test.go index b8a4b6d1e4..caff38545a 100644 --- a/collector/internal/confmap/converter/decoupleafterbatchconverter/converter_test.go +++ b/collector/internal/confmap/converter/decoupleafterbatchconverter/converter_test.go @@ -43,6 +43,15 @@ func TestConvert(t *testing.T) { expectedProcessors []interface{} err error }{ + // This test is first, because it illustrates the difference in making the rule that when + // batch is present the converter appends decouple processor to the end of chain versus + // the approach of this code which is to do this only when the last instance of batch + // is not followed by decouple processor. + { + name: "batch then decouple in middle of chain", + processors: []interface{}{"processor1", "batch", "decouple", "processor2"}, + expectedProcessors: []interface{}{"processor1", "batch", "decouple", "processor2"}, + }, { name: "no service", processors: nil, @@ -54,22 +63,34 @@ func TestConvert(t *testing.T) { expectedProcessors: nil, }, { - name: "no processors in pipeline", + name: "no processors in chain", processors: nil, expectedProcessors: nil, }, { - name: "batch processor present", + name: "batch processor in singleton chain", processors: []interface{}{"batch"}, expectedProcessors: []interface{}{"batch", "decouple"}, }, + { + name: "batch processor present twice", + processors: []interface{}{"batch", "processor1", "batch"}, + expectedProcessors: []interface{}{"batch", "processor1", "batch", "decouple"}, + }, + { name: "batch processor not present", processors: []interface{}{"processor1", "processor2"}, expectedProcessors: []interface{}{"processor1", "processor2"}, }, { - name: "batch and decouple processors already present", + name: "batch sandwiched between processors no decouple", + processors: []interface{}{"processor1", "batch", "processor2"}, + expectedProcessors: []interface{}{"processor1", "batch", "processor2", "decouple"}, + }, + + { + name: "batch and decouple processors already present in correct position", processors: []interface{}{"processor1", "batch", "processor2", "decouple"}, expectedProcessors: []interface{}{"processor1", "batch", "processor2", "decouple"}, }, From 4c5101895cec258088405b73c35df202b9ea0840 Mon Sep 17 00:00:00 2001 From: Nathan Slaughter <28688390+nslaughter@users.noreply.github.com> Date: Tue, 16 Apr 2024 15:47:54 -0500 Subject: [PATCH 07/20] fix toggle for append predicate --- .../internal/collector/collector_test.go | 120 ++++++++++++++++++ .../decoupleafterbatchconverter/converter.go | 1 - 2 files changed, 120 insertions(+), 1 deletion(-) create mode 100644 collector/internal/collector/collector_test.go diff --git a/collector/internal/collector/collector_test.go b/collector/internal/collector/collector_test.go new file mode 100644 index 0000000000..578bc8d107 --- /dev/null +++ b/collector/internal/collector/collector_test.go @@ -0,0 +1,120 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package collector + +import ( + "context" + "testing" + + "go.opentelemetry.io/collector/confmap" + "go.opentelemetry.io/collector/otelcol" + "go.uber.org/zap" + + "github.com/google/go-cmp/cmp" +) + +func TestNewCollector(t *testing.T) { + // Since this really tests differences in processors, it's easier to read cases + // without the repeated definition of other fields in the config. + // baseConf := func(processors []interface{}) *confmap.Conf { + // return confmap.NewFromStringMap(map[string]interface{}{ + // "service": map[string]interface{}{ + // "pipelines": map[string]interface{}{ + // "traces": map[string]interface{}{ + // "processors": processors, + // }, + // }, + // }, + // }) + // } + + testCases := []struct { + name string + processors []interface{} + expectedProcessors []interface{} + err error + }{ + // This test is first, because it illustrates the difference in making the rule that when + // batch is present the converter appends decouple processor to the end of chain versus + // the approach of this code which is to do this only when the last instance of batch + // is not followed by decouple processor. + { + name: "batch then decouple in middle of chain", + processors: []interface{}{"processor1", "batch", "decouple", "processor2"}, + expectedProcessors: []interface{}{"processor1", "batch", "decouple", "processor2"}, + }, + { + name: "no service", + processors: nil, + expectedProcessors: nil, + }, + { + name: "no pipelines", + processors: nil, + expectedProcessors: nil, + }, + { + name: "no processors in chain", + processors: nil, + expectedProcessors: nil, + }, + { + name: "batch processor in singleton chain", + processors: []interface{}{"batch"}, + expectedProcessors: []interface{}{"batch", "decouple"}, + }, + { + name: "batch processor present twice", + processors: []interface{}{"batch", "processor1", "batch"}, + expectedProcessors: []interface{}{"batch", "processor1", "batch", "decouple"}, + }, + + { + name: "batch processor not present", + processors: []interface{}{"processor1", "processor2"}, + expectedProcessors: []interface{}{"processor1", "processor2"}, + }, + { + name: "batch sandwiched between processors no decouple", + processors: []interface{}{"processor1", "batch", "processor2"}, + expectedProcessors: []interface{}{"processor1", "batch", "processor2", "decouple"}, + }, + + { + name: "batch and decouple processors already present in correct position", + processors: []interface{}{"processor1", "batch", "processor2", "decouple"}, + expectedProcessors: []interface{}{"processor1", "batch", "processor2", "decouple"}, + }, + } + + factories := otelcol.Factories{} + c := NewCollector(&zap.Logger{}, factories, "version") + + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + conf := baseConf(tc.processors) + expected := baseConf(tc.expectedProcessors) + + c := New() + err := c.Convert(context.Background(), conf) + if err != tc.err { + t.Errorf("unexpected error converting: %v", err) + } + if diff := cmp.Diff(expected.ToStringMap(), conf.ToStringMap()); diff != "" { + t.Errorf("Convert() mismatch: (-want +got):\n%s", diff) + } + }) + } +} diff --git a/collector/internal/confmap/converter/decoupleafterbatchconverter/converter.go b/collector/internal/confmap/converter/decoupleafterbatchconverter/converter.go index 0ec5eac719..c1485d1164 100644 --- a/collector/internal/confmap/converter/decoupleafterbatchconverter/converter.go +++ b/collector/internal/confmap/converter/decoupleafterbatchconverter/converter.go @@ -107,7 +107,6 @@ func appendDecouple(processors []interface{}) bool { if !ok { continue } - appendDecouple = false processorBaseName := strings.Split(processor, "/")[0] if processorBaseName == batchProcessor { appendDecouple = true From 38e8f1379ae1780873e0d2959ba16b42cadf983e Mon Sep 17 00:00:00 2001 From: Nathan Slaughter <28688390+nslaughter@users.noreply.github.com> Date: Tue, 16 Apr 2024 15:52:49 -0500 Subject: [PATCH 08/20] Fix typo in function comment --- .../converter/decoupleafterbatchconverter/converter.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/collector/internal/confmap/converter/decoupleafterbatchconverter/converter.go b/collector/internal/confmap/converter/decoupleafterbatchconverter/converter.go index c1485d1164..5549669bf4 100644 --- a/collector/internal/confmap/converter/decoupleafterbatchconverter/converter.go +++ b/collector/internal/confmap/converter/decoupleafterbatchconverter/converter.go @@ -97,9 +97,9 @@ func (c converter) Convert(_ context.Context, conf *confmap.Conf) error { return nil } -// The appendDecouple predicate encodes most of the logic for mutating processor pipelines -// in the Convert function. It tells whether there was a decouple processor after the last -// batch processor and if so it appends decouple to the end. +// The appendDecouple is the filter predicate for the Convert function action. It tells whether +// (bool) there was a decouple processor after the last +// batch processor, which Convert uses to decide whether to append the decouple processor. func appendDecouple(processors []interface{}) bool { var appendDecouple bool for _, processorVal := range processors { From e35b3444f3dd51183fa63cfc07b5675c8725166e Mon Sep 17 00:00:00 2001 From: Nathan Slaughter <28688390+nslaughter@users.noreply.github.com> Date: Tue, 16 Apr 2024 17:00:41 -0500 Subject: [PATCH 09/20] Document converter and auto-configuration --- .../decoupleafterbatchconverter/README.md | 13 ++++++++++++ .../processor/decoupleprocessor/README.md | 21 ++++++++++++++----- 2 files changed, 29 insertions(+), 5 deletions(-) create mode 100644 collector/internal/confmap/converter/decoupleafterbatchconverter/README.md diff --git a/collector/internal/confmap/converter/decoupleafterbatchconverter/README.md b/collector/internal/confmap/converter/decoupleafterbatchconverter/README.md new file mode 100644 index 0000000000..d14f378637 --- /dev/null +++ b/collector/internal/confmap/converter/decoupleafterbatchconverter/README.md @@ -0,0 +1,13 @@ +# DecoupleAfterBatch Converter + +The `DecoupleAfterBatch` converter automatically modifies the collector's configuration for the Lambda distribution. Its purpose is to ensure that a decouple processor is always present after a batch processor in a pipeline, in order to prevent potential data loss due to the Lambda environment being frozen. + +## Behavior + +The converter scans the collector's configuration and makes the following adjustments: + +1. If a pipeline contains a batch processor with no decouple processor defined after it, the converter will automatically add a decouple processor to the pipeline immediately after the batch processor. + +2. If a pipeline contains a batch processor with a decouple processor already defined after it or there is no batch processor defined, the converter will not make any changes to the pipeline configuration. + +The automatically added decouple processor will have a default queue size of 2000. This value was chosen to be high enough to accommodate the default batch processor timeout of 200ms. diff --git a/collector/processor/decoupleprocessor/README.md b/collector/processor/decoupleprocessor/README.md index dd0cc1bd72..51a15ae5aa 100644 --- a/collector/processor/decoupleprocessor/README.md +++ b/collector/processor/decoupleprocessor/README.md @@ -7,22 +7,33 @@ | Distributions | [extension] | This processor decouples the receiver and exporter ends of the pipeline. This allows the lambda function to finish before traces/metrics/logs have been exported by the collector. The processor is aware of the Lambda [lifecycle] and will prevent the environment from being frozen or shutdown until any pending traces/metrics/logs have been exported. -In this way the response times of the Lambda function is not impacted by the need to export data, however the billed duration will include the time taken to export data as well as runtime of the lambda function. +In this way the response times of the Lambda function is not impacted by the need to export data, however the billed duration will include the time taken to export data as well as runtime of the lambda function. The decouple processor should always be the last processor in the list to ensure that there are no issues with data being sent while the environment is about to be frozen, which could result in lost data. -When combined with the batch processor, the number of exports required can be significantly reduced and therefore the cost of running the lambda. This is with the trade-off that the data will not be available at your chosen endpoint until some time after the invocation, up to a maximum of 5 minutes (the timeout that the environment is shutdown when no further invocations are received). +When combined with the batch processor, the number of exports required can be significantly reduced and therefore the cost of running the lambda. This is with the trade-off that the data will not be available at your chosen endpoint until some time after the invocation, up to a maximum of 5 minutes (the timeout that the environment is shutdown when no further invocations are received). + +## Auto-Configuration + +Due to the significance of performance improvement with this approach, the OpenTelemetry Lambda Layer automatically configures the decouple processor when the batch processor is used. This provides the best performance by default. + +When running the Collector for the Lambda Layer, the configuration is converted by automatically adding the decouple processor to all pipelines if the following conditions are met: + +1. The pipeline contains a batch processor. +2. There is no decouple processor already defined after the batch processor. + +This automatic configuration helps prevent the data loss scenarios that can occur when the Lambda environment is frozen as the batch processor continues aggregating data. The decouple processor allows the Lambda function invocation to complete while the collector continues exporting the data asynchronously. ## Processor Configuration ```yaml processors: decouple: - # max_queue_size allows you to control how many spans etc. are accepted before the pipeline blocks + # max_queue_size allows you to control how many spans etc. are accepted before the pipeline blocks # until an export has been completed. Default value is 200. - max_queue_size: 20 + max_queue_size: 20 ``` [in development]: https://github.com/open-telemetry/opentelemetry-collector#development [extension]: https://github.com/open-telemetry/opentelemetry-lambda/collector -[lifecycle]: https://docs.aws.amazon.com/lambda/latest/dg/runtimes-extensions-api.html#runtimes-extensions-api-lifecycle \ No newline at end of file +[lifecycle]: https://docs.aws.amazon.com/lambda/latest/dg/runtimes-extensions-api.html#runtimes-extensions-api-lifecycle From b5360469d913af38e5308343b33b6d4aa5599dd0 Mon Sep 17 00:00:00 2001 From: Nathan Slaughter <28688390+nslaughter@users.noreply.github.com> Date: Tue, 16 Apr 2024 17:00:41 -0500 Subject: [PATCH 10/20] Document converter and auto-configuration --- .../confmap/converter/decoupleafterbatchconverter/README.md | 2 -- 1 file changed, 2 deletions(-) diff --git a/collector/internal/confmap/converter/decoupleafterbatchconverter/README.md b/collector/internal/confmap/converter/decoupleafterbatchconverter/README.md index d14f378637..f4a5ccb2a9 100644 --- a/collector/internal/confmap/converter/decoupleafterbatchconverter/README.md +++ b/collector/internal/confmap/converter/decoupleafterbatchconverter/README.md @@ -9,5 +9,3 @@ The converter scans the collector's configuration and makes the following adjust 1. If a pipeline contains a batch processor with no decouple processor defined after it, the converter will automatically add a decouple processor to the pipeline immediately after the batch processor. 2. If a pipeline contains a batch processor with a decouple processor already defined after it or there is no batch processor defined, the converter will not make any changes to the pipeline configuration. - -The automatically added decouple processor will have a default queue size of 2000. This value was chosen to be high enough to accommodate the default batch processor timeout of 200ms. From a10b9e1a6b39e815c3a737c808e407f8cfe7670d Mon Sep 17 00:00:00 2001 From: Nathan Slaughter <28688390+nslaughter@users.noreply.github.com> Date: Tue, 16 Apr 2024 17:11:38 -0500 Subject: [PATCH 11/20] rm errant test --- .../internal/collector/collector_test.go | 120 ------------------ 1 file changed, 120 deletions(-) delete mode 100644 collector/internal/collector/collector_test.go diff --git a/collector/internal/collector/collector_test.go b/collector/internal/collector/collector_test.go deleted file mode 100644 index 578bc8d107..0000000000 --- a/collector/internal/collector/collector_test.go +++ /dev/null @@ -1,120 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package collector - -import ( - "context" - "testing" - - "go.opentelemetry.io/collector/confmap" - "go.opentelemetry.io/collector/otelcol" - "go.uber.org/zap" - - "github.com/google/go-cmp/cmp" -) - -func TestNewCollector(t *testing.T) { - // Since this really tests differences in processors, it's easier to read cases - // without the repeated definition of other fields in the config. - // baseConf := func(processors []interface{}) *confmap.Conf { - // return confmap.NewFromStringMap(map[string]interface{}{ - // "service": map[string]interface{}{ - // "pipelines": map[string]interface{}{ - // "traces": map[string]interface{}{ - // "processors": processors, - // }, - // }, - // }, - // }) - // } - - testCases := []struct { - name string - processors []interface{} - expectedProcessors []interface{} - err error - }{ - // This test is first, because it illustrates the difference in making the rule that when - // batch is present the converter appends decouple processor to the end of chain versus - // the approach of this code which is to do this only when the last instance of batch - // is not followed by decouple processor. - { - name: "batch then decouple in middle of chain", - processors: []interface{}{"processor1", "batch", "decouple", "processor2"}, - expectedProcessors: []interface{}{"processor1", "batch", "decouple", "processor2"}, - }, - { - name: "no service", - processors: nil, - expectedProcessors: nil, - }, - { - name: "no pipelines", - processors: nil, - expectedProcessors: nil, - }, - { - name: "no processors in chain", - processors: nil, - expectedProcessors: nil, - }, - { - name: "batch processor in singleton chain", - processors: []interface{}{"batch"}, - expectedProcessors: []interface{}{"batch", "decouple"}, - }, - { - name: "batch processor present twice", - processors: []interface{}{"batch", "processor1", "batch"}, - expectedProcessors: []interface{}{"batch", "processor1", "batch", "decouple"}, - }, - - { - name: "batch processor not present", - processors: []interface{}{"processor1", "processor2"}, - expectedProcessors: []interface{}{"processor1", "processor2"}, - }, - { - name: "batch sandwiched between processors no decouple", - processors: []interface{}{"processor1", "batch", "processor2"}, - expectedProcessors: []interface{}{"processor1", "batch", "processor2", "decouple"}, - }, - - { - name: "batch and decouple processors already present in correct position", - processors: []interface{}{"processor1", "batch", "processor2", "decouple"}, - expectedProcessors: []interface{}{"processor1", "batch", "processor2", "decouple"}, - }, - } - - factories := otelcol.Factories{} - c := NewCollector(&zap.Logger{}, factories, "version") - - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - conf := baseConf(tc.processors) - expected := baseConf(tc.expectedProcessors) - - c := New() - err := c.Convert(context.Background(), conf) - if err != tc.err { - t.Errorf("unexpected error converting: %v", err) - } - if diff := cmp.Diff(expected.ToStringMap(), conf.ToStringMap()); diff != "" { - t.Errorf("Convert() mismatch: (-want +got):\n%s", diff) - } - }) - } -} From 65ce10fe02452ec162a9579e0cdf5382c739c4e9 Mon Sep 17 00:00:00 2001 From: Nathan Slaughter <28688390+nslaughter@users.noreply.github.com> Date: Wed, 17 Apr 2024 12:49:36 -0500 Subject: [PATCH 12/20] Add tests to clarify decouple->batch ill-formed chain --- .../decoupleafterbatchconverter/converter_test.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/collector/internal/confmap/converter/decoupleafterbatchconverter/converter_test.go b/collector/internal/confmap/converter/decoupleafterbatchconverter/converter_test.go index caff38545a..642b8d4275 100644 --- a/collector/internal/confmap/converter/decoupleafterbatchconverter/converter_test.go +++ b/collector/internal/confmap/converter/decoupleafterbatchconverter/converter_test.go @@ -94,6 +94,16 @@ func TestConvert(t *testing.T) { processors: []interface{}{"processor1", "batch", "processor2", "decouple"}, expectedProcessors: []interface{}{"processor1", "batch", "processor2", "decouple"}, }, + { + name: "decouple and batch", + processors: []interface{}{"decouple", "batch"}, + expectedProcessors: []interface{}{"decouple", "batch", "decouple"}, + }, + { + name: "decouple then match mixed with others in the pipelinefirst then batch somewhere", + processors: []interface{}{"processor1", "decouple", "processor2", "batch", "processor3"}, + expectedProcessors: []interface{}{"processor1", "decouple", "processor2", "batch", "processor3", "decouple"}, + }, } for _, tc := range testCases { From ec45851f61cc7e7ed1dd66ca384406e82393a26e Mon Sep 17 00:00:00 2001 From: Nathan Slaughter <28688390+nslaughter@users.noreply.github.com> Date: Wed, 17 Apr 2024 12:58:41 -0500 Subject: [PATCH 13/20] Fix typo in test case description --- .../converter/decoupleafterbatchconverter/converter_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/collector/internal/confmap/converter/decoupleafterbatchconverter/converter_test.go b/collector/internal/confmap/converter/decoupleafterbatchconverter/converter_test.go index 642b8d4275..27a699808c 100644 --- a/collector/internal/confmap/converter/decoupleafterbatchconverter/converter_test.go +++ b/collector/internal/confmap/converter/decoupleafterbatchconverter/converter_test.go @@ -100,7 +100,7 @@ func TestConvert(t *testing.T) { expectedProcessors: []interface{}{"decouple", "batch", "decouple"}, }, { - name: "decouple then match mixed with others in the pipelinefirst then batch somewhere", + name: "decouple then batch mixed with others in the pipelinefirst then batch somewhere", processors: []interface{}{"processor1", "decouple", "processor2", "batch", "processor3"}, expectedProcessors: []interface{}{"processor1", "decouple", "processor2", "batch", "processor3", "decouple"}, }, From 7b236f318a6362b279ef14904315380f5b6030cb Mon Sep 17 00:00:00 2001 From: Nathan Slaughter <28688390+nslaughter@users.noreply.github.com> Date: Wed, 17 Apr 2024 13:19:12 -0500 Subject: [PATCH 14/20] Improve name of predicate/helper --- .../decoupleafterbatchconverter/converter.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/collector/internal/confmap/converter/decoupleafterbatchconverter/converter.go b/collector/internal/confmap/converter/decoupleafterbatchconverter/converter.go index 5549669bf4..4079ff994e 100644 --- a/collector/internal/confmap/converter/decoupleafterbatchconverter/converter.go +++ b/collector/internal/confmap/converter/decoupleafterbatchconverter/converter.go @@ -79,7 +79,7 @@ func (c converter) Convert(_ context.Context, conf *confmap.Conf) error { } // accumulate config updates - if appendDecouple(processors) { + if shouldAppendDecouple(processors) { processors = append(processors, decoupleProcessor) updates[fmt.Sprintf("%s::%s::%s::%s", serviceKey, pipelinesKey, telemetryType, processorsKey)] = processors break @@ -97,11 +97,11 @@ func (c converter) Convert(_ context.Context, conf *confmap.Conf) error { return nil } -// The appendDecouple is the filter predicate for the Convert function action. It tells whether +// The shouldAppendDecouple is the filter predicate for the Convert function action. It tells whether // (bool) there was a decouple processor after the last // batch processor, which Convert uses to decide whether to append the decouple processor. -func appendDecouple(processors []interface{}) bool { - var appendDecouple bool +func shouldAppendDecouple(processors []interface{}) bool { + var shouldAppendDecouple bool for _, processorVal := range processors { processor, ok := processorVal.(string) if !ok { @@ -109,10 +109,10 @@ func appendDecouple(processors []interface{}) bool { } processorBaseName := strings.Split(processor, "/")[0] if processorBaseName == batchProcessor { - appendDecouple = true + shouldAppendDecouple = true } else if processorBaseName == decoupleProcessor { - appendDecouple = false + shouldAppendDecouple = false } } - return appendDecouple + return shouldAppendDecouple } From 1b21b0b333cea8e637de3015770812aaef9017bb Mon Sep 17 00:00:00 2001 From: Nathan Slaughter <28688390+nslaughter@users.noreply.github.com> Date: Wed, 17 Apr 2024 13:22:26 -0500 Subject: [PATCH 15/20] Update collector/processor/decoupleprocessor/README.md Co-authored-by: Adam Charrett <73886859+adcharre@users.noreply.github.com> --- collector/processor/decoupleprocessor/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/collector/processor/decoupleprocessor/README.md b/collector/processor/decoupleprocessor/README.md index 51a15ae5aa..0c94586f4c 100644 --- a/collector/processor/decoupleprocessor/README.md +++ b/collector/processor/decoupleprocessor/README.md @@ -15,7 +15,7 @@ When combined with the batch processor, the number of exports required can be si ## Auto-Configuration -Due to the significance of performance improvement with this approach, the OpenTelemetry Lambda Layer automatically configures the decouple processor when the batch processor is used. This provides the best performance by default. +Due to the significant performance improvements with this approach, the OpenTelemetry Lambda Layer automatically configures the decouple processor when the batch processor is used. This ensures the best performance by default. When running the Collector for the Lambda Layer, the configuration is converted by automatically adding the decouple processor to all pipelines if the following conditions are met: From a8181793dc488d3fbc0fe0e2566efd6c9011721f Mon Sep 17 00:00:00 2001 From: Nathan Slaughter <28688390+nslaughter@users.noreply.github.com> Date: Wed, 17 Apr 2024 13:26:24 -0500 Subject: [PATCH 16/20] gofmt -s -w . --- .../converter_test.go | 170 +++++++++--------- 1 file changed, 85 insertions(+), 85 deletions(-) diff --git a/collector/internal/confmap/converter/decoupleafterbatchconverter/converter_test.go b/collector/internal/confmap/converter/decoupleafterbatchconverter/converter_test.go index 27a699808c..a8a16def1d 100644 --- a/collector/internal/confmap/converter/decoupleafterbatchconverter/converter_test.go +++ b/collector/internal/confmap/converter/decoupleafterbatchconverter/converter_test.go @@ -4,7 +4,7 @@ // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // -// http://www.apache.org/licenses/LICENSE-2.0 +// http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, @@ -25,100 +25,100 @@ import ( func TestConvert(t *testing.T) { // Since this really tests differences in processors, it's easier to read cases // without the repeated definition of other fields in the config. - baseConf := func(processors []interface{}) *confmap.Conf { - return confmap.NewFromStringMap(map[string]interface{}{ - "service": map[string]interface{}{ - "pipelines": map[string]interface{}{ - "traces": map[string]interface{}{ - "processors": processors, - }, - }, - }, - }) - } + baseConf := func(processors []interface{}) *confmap.Conf { + return confmap.NewFromStringMap(map[string]interface{}{ + "service": map[string]interface{}{ + "pipelines": map[string]interface{}{ + "traces": map[string]interface{}{ + "processors": processors, + }, + }, + }, + }) + } - testCases := []struct { - name string - processors []interface{} - expectedProcessors []interface{} - err error - }{ + testCases := []struct { + name string + processors []interface{} + expectedProcessors []interface{} + err error + }{ // This test is first, because it illustrates the difference in making the rule that when // batch is present the converter appends decouple processor to the end of chain versus // the approach of this code which is to do this only when the last instance of batch // is not followed by decouple processor. { - name: "batch then decouple in middle of chain", - processors: []interface{}{"processor1", "batch", "decouple", "processor2"}, + name: "batch then decouple in middle of chain", + processors: []interface{}{"processor1", "batch", "decouple", "processor2"}, expectedProcessors: []interface{}{"processor1", "batch", "decouple", "processor2"}, }, - { - name: "no service", - processors: nil, - expectedProcessors: nil, - }, - { - name: "no pipelines", - processors: nil, - expectedProcessors: nil, - }, - { - name: "no processors in chain", - processors: nil, - expectedProcessors: nil, - }, - { - name: "batch processor in singleton chain", - processors: []interface{}{"batch"}, - expectedProcessors: []interface{}{"batch", "decouple"}, - }, - { - name: "batch processor present twice", - processors: []interface{}{"batch", "processor1", "batch"}, - expectedProcessors: []interface{}{"batch", "processor1", "batch", "decouple"}, - }, + { + name: "no service", + processors: nil, + expectedProcessors: nil, + }, + { + name: "no pipelines", + processors: nil, + expectedProcessors: nil, + }, + { + name: "no processors in chain", + processors: nil, + expectedProcessors: nil, + }, + { + name: "batch processor in singleton chain", + processors: []interface{}{"batch"}, + expectedProcessors: []interface{}{"batch", "decouple"}, + }, + { + name: "batch processor present twice", + processors: []interface{}{"batch", "processor1", "batch"}, + expectedProcessors: []interface{}{"batch", "processor1", "batch", "decouple"}, + }, - { - name: "batch processor not present", - processors: []interface{}{"processor1", "processor2"}, - expectedProcessors: []interface{}{"processor1", "processor2"}, - }, - { - name: "batch sandwiched between processors no decouple", - processors: []interface{}{"processor1", "batch", "processor2"}, - expectedProcessors: []interface{}{"processor1", "batch", "processor2", "decouple"}, - }, + { + name: "batch processor not present", + processors: []interface{}{"processor1", "processor2"}, + expectedProcessors: []interface{}{"processor1", "processor2"}, + }, + { + name: "batch sandwiched between processors no decouple", + processors: []interface{}{"processor1", "batch", "processor2"}, + expectedProcessors: []interface{}{"processor1", "batch", "processor2", "decouple"}, + }, - { - name: "batch and decouple processors already present in correct position", - processors: []interface{}{"processor1", "batch", "processor2", "decouple"}, - expectedProcessors: []interface{}{"processor1", "batch", "processor2", "decouple"}, - }, - { - name: "decouple and batch", - processors: []interface{}{"decouple", "batch"}, - expectedProcessors: []interface{}{"decouple", "batch", "decouple"}, - }, - { - name: "decouple then batch mixed with others in the pipelinefirst then batch somewhere", - processors: []interface{}{"processor1", "decouple", "processor2", "batch", "processor3"}, - expectedProcessors: []interface{}{"processor1", "decouple", "processor2", "batch", "processor3", "decouple"}, - }, - } + { + name: "batch and decouple processors already present in correct position", + processors: []interface{}{"processor1", "batch", "processor2", "decouple"}, + expectedProcessors: []interface{}{"processor1", "batch", "processor2", "decouple"}, + }, + { + name: "decouple and batch", + processors: []interface{}{"decouple", "batch"}, + expectedProcessors: []interface{}{"decouple", "batch", "decouple"}, + }, + { + name: "decouple then batch mixed with others in the pipelinefirst then batch somewhere", + processors: []interface{}{"processor1", "decouple", "processor2", "batch", "processor3"}, + expectedProcessors: []interface{}{"processor1", "decouple", "processor2", "batch", "processor3", "decouple"}, + }, + } - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - conf := baseConf(tc.processors) - expected := baseConf(tc.expectedProcessors) + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + conf := baseConf(tc.processors) + expected := baseConf(tc.expectedProcessors) - c := New() - err := c.Convert(context.Background(), conf) - if err != tc.err { - t.Errorf("unexpected error converting: %v", err) - } - if diff := cmp.Diff(expected.ToStringMap(), conf.ToStringMap()); diff != "" { - t.Errorf("Convert() mismatch: (-want +got):\n%s", diff) - } - }) - } + c := New() + err := c.Convert(context.Background(), conf) + if err != tc.err { + t.Errorf("unexpected error converting: %v", err) + } + if diff := cmp.Diff(expected.ToStringMap(), conf.ToStringMap()); diff != "" { + t.Errorf("Convert() mismatch: (-want +got):\n%s", diff) + } + }) + } } From 9b72b9e1f6a8419debfb3187ab9e8e47c0922cf4 Mon Sep 17 00:00:00 2001 From: Nathan Slaughter <28688390+nslaughter@users.noreply.github.com> Date: Wed, 17 Apr 2024 14:16:21 -0500 Subject: [PATCH 17/20] restructure tests to extend coverage --- .../converter_test.go | 113 +++++++++++------- 1 file changed, 71 insertions(+), 42 deletions(-) diff --git a/collector/internal/confmap/converter/decoupleafterbatchconverter/converter_test.go b/collector/internal/confmap/converter/decoupleafterbatchconverter/converter_test.go index a8a16def1d..26ccd2c277 100644 --- a/collector/internal/confmap/converter/decoupleafterbatchconverter/converter_test.go +++ b/collector/internal/confmap/converter/decoupleafterbatchconverter/converter_test.go @@ -23,14 +23,14 @@ import ( ) func TestConvert(t *testing.T) { - // Since this really tests differences in processors, it's easier to read cases + // Since this really tests differences in input, it's easier to read cases // without the repeated definition of other fields in the config. - baseConf := func(processors []interface{}) *confmap.Conf { + baseConf := func(input []interface{}) *confmap.Conf { return confmap.NewFromStringMap(map[string]interface{}{ "service": map[string]interface{}{ "pipelines": map[string]interface{}{ "traces": map[string]interface{}{ - "processors": processors, + "processors": input, }, }, }, @@ -38,78 +38,107 @@ func TestConvert(t *testing.T) { } testCases := []struct { - name string - processors []interface{} - expectedProcessors []interface{} - err error + name string + input *confmap.Conf + expected *confmap.Conf + err error }{ // This test is first, because it illustrates the difference in making the rule that when // batch is present the converter appends decouple processor to the end of chain versus // the approach of this code which is to do this only when the last instance of batch // is not followed by decouple processor. { - name: "batch then decouple in middle of chain", - processors: []interface{}{"processor1", "batch", "decouple", "processor2"}, - expectedProcessors: []interface{}{"processor1", "batch", "decouple", "processor2"}, + name: "batch then decouple in middle of chain", + input: baseConf([]interface{}{"processor1", "batch", "decouple", "processor2"}), + expected: baseConf([]interface{}{"processor1", "batch", "decouple", "processor2"}), }, { - name: "no service", - processors: nil, - expectedProcessors: nil, + name: "no service", + input: confmap.New(), + expected: confmap.New(), }, { - name: "no pipelines", - processors: nil, - expectedProcessors: nil, + name: "no pipelines", + input: confmap.NewFromStringMap( + map[string]interface{}{ + "service": map[string]interface{}{ + "extensions": map[string]interface{}{}, + }, + }, + ), + expected: confmap.NewFromStringMap( + map[string]interface{}{ + "service": map[string]interface{}{ + "extensions": map[string]interface{}{}, + }, + }, + ), }, { - name: "no processors in chain", - processors: nil, - expectedProcessors: nil, + name: "no processors in chain", + input: confmap.NewFromStringMap( + map[string]interface{}{ + "service": map[string]interface{}{ + "extensions": map[string]interface{}{}, + "pipelines": map[string]interface{}{ + "traces": map[string]interface{}{}, + }, + }, + }, + ), + expected: confmap.NewFromStringMap(map[string]interface{}{ + "service": map[string]interface{}{ + "extensions": map[string]interface{}{}, + "pipelines": map[string]interface{}{ + "traces": map[string]interface{}{}, + }, + }, + }, + ), }, { - name: "batch processor in singleton chain", - processors: []interface{}{"batch"}, - expectedProcessors: []interface{}{"batch", "decouple"}, + name: "batch processor in singleton chain", + input: baseConf([]interface{}{"batch"}), + expected: baseConf([]interface{}{"batch", "decouple"}), }, { - name: "batch processor present twice", - processors: []interface{}{"batch", "processor1", "batch"}, - expectedProcessors: []interface{}{"batch", "processor1", "batch", "decouple"}, + name: "batch processor present twice", + input: baseConf([]interface{}{"batch", "processor1", "batch"}), + expected: baseConf([]interface{}{"batch", "processor1", "batch", "decouple"}), }, { - name: "batch processor not present", - processors: []interface{}{"processor1", "processor2"}, - expectedProcessors: []interface{}{"processor1", "processor2"}, + name: "batch processor not present", + input: baseConf([]interface{}{"processor1", "processor2"}), + expected: baseConf([]interface{}{"processor1", "processor2"}), }, { - name: "batch sandwiched between processors no decouple", - processors: []interface{}{"processor1", "batch", "processor2"}, - expectedProcessors: []interface{}{"processor1", "batch", "processor2", "decouple"}, + name: "batch sandwiched between input no decouple", + input: baseConf([]interface{}{"processor1", "batch", "processor2"}), + expected: baseConf([]interface{}{"processor1", "batch", "processor2", "decouple"}), }, { - name: "batch and decouple processors already present in correct position", - processors: []interface{}{"processor1", "batch", "processor2", "decouple"}, - expectedProcessors: []interface{}{"processor1", "batch", "processor2", "decouple"}, + name: "batch and decouple input already present in correct position", + input: baseConf([]interface{}{"processor1", "batch", "processor2", "decouple"}), + expected: baseConf([]interface{}{"processor1", "batch", "processor2", "decouple"}), }, { - name: "decouple and batch", - processors: []interface{}{"decouple", "batch"}, - expectedProcessors: []interface{}{"decouple", "batch", "decouple"}, + name: "decouple and batch", + input: baseConf([]interface{}{"decouple", "batch"}), + expected: baseConf([]interface{}{"decouple", "batch", "decouple"}), }, { - name: "decouple then batch mixed with others in the pipelinefirst then batch somewhere", - processors: []interface{}{"processor1", "decouple", "processor2", "batch", "processor3"}, - expectedProcessors: []interface{}{"processor1", "decouple", "processor2", "batch", "processor3", "decouple"}, + name: "decouple then batch mixed with others in the pipelinefirst then batch somewhere", + input: baseConf([]interface{}{"processor1", "decouple", "processor2", "batch", "processor3"}), + expected: baseConf([]interface{}{"processor1", "decouple", "processor2", "batch", "processor3", "decouple"}), }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - conf := baseConf(tc.processors) - expected := baseConf(tc.expectedProcessors) + conf := tc.input + expected := tc.expected c := New() err := c.Convert(context.Background(), conf) From d88a4e2984bb45b2ce43146f48272e1d8fdefb8c Mon Sep 17 00:00:00 2001 From: Nathan Slaughter <28688390+nslaughter@users.noreply.github.com> Date: Wed, 17 Apr 2024 14:22:21 -0500 Subject: [PATCH 18/20] go mod tidy --- collector/go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/collector/go.mod b/collector/go.mod index b8cb755934..9531e2419b 100644 --- a/collector/go.mod +++ b/collector/go.mod @@ -20,6 +20,7 @@ replace cloud.google.com/go => cloud.google.com/go v0.107.0 require ( github.com/golang-collections/go-datastructures v0.0.0-20150211160725-59788d5eb259 + github.com/google/go-cmp v0.6.0 github.com/open-telemetry/opentelemetry-collector-contrib/confmap/provider/s3provider v0.92.0 github.com/open-telemetry/opentelemetry-lambda/collector/lambdacomponents v0.91.0 github.com/open-telemetry/opentelemetry-lambda/collector/lambdalifecycle v0.0.0-00010101000000-000000000000 @@ -69,7 +70,6 @@ require ( github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/golang/snappy v0.0.4 // indirect - github.com/google/go-cmp v0.6.0 // indirect github.com/google/uuid v1.5.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.1 // indirect github.com/hashicorp/go-version v1.6.0 // indirect From d8d5062d80e5b6735c648b18af87a1425b978546 Mon Sep 17 00:00:00 2001 From: Nathan Slaughter <28688390+nslaughter@users.noreply.github.com> Date: Wed, 17 Apr 2024 17:48:42 -0500 Subject: [PATCH 19/20] Update collector/internal/confmap/converter/decoupleafterbatchconverter/README.md Co-authored-by: Adam Charrett <73886859+adcharre@users.noreply.github.com> --- .../confmap/converter/decoupleafterbatchconverter/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/collector/internal/confmap/converter/decoupleafterbatchconverter/README.md b/collector/internal/confmap/converter/decoupleafterbatchconverter/README.md index f4a5ccb2a9..cd02d4817f 100644 --- a/collector/internal/confmap/converter/decoupleafterbatchconverter/README.md +++ b/collector/internal/confmap/converter/decoupleafterbatchconverter/README.md @@ -6,6 +6,6 @@ The `DecoupleAfterBatch` converter automatically modifies the collector's config The converter scans the collector's configuration and makes the following adjustments: -1. If a pipeline contains a batch processor with no decouple processor defined after it, the converter will automatically add a decouple processor to the pipeline immediately after the batch processor. +1. If a pipeline contains a batch processor with no decouple processor defined after it, the converter will automatically add a decouple processor to the end of the pipeline. 2. If a pipeline contains a batch processor with a decouple processor already defined after it or there is no batch processor defined, the converter will not make any changes to the pipeline configuration. From 50388093c93a24abb6de3bfc4aec058c1442e364 Mon Sep 17 00:00:00 2001 From: Nathan Slaughter <28688390+nslaughter@users.noreply.github.com> Date: Mon, 22 Apr 2024 12:26:35 -0500 Subject: [PATCH 20/20] Add auto-config explaination to Collector --- collector/README.md | 66 +++++++++------------------------------------ 1 file changed, 12 insertions(+), 54 deletions(-) diff --git a/collector/README.md b/collector/README.md index 6b503de1b8..51a37b2512 100644 --- a/collector/README.md +++ b/collector/README.md @@ -90,68 +90,26 @@ from an S3 object using a CloudFormation template: Loading configuration from S3 will require that the IAM role attached to your function includes read access to the relevant bucket. +## Auto-Configuration + +Configuring the Lambda Collector without the decouple processor and batch processor can lead to performance issues. So the OpenTelemetry Lambda Layer automatically adds the decouple processor to the end of the chain if the batch processor is used and the decouple processor is not. + # Improving Lambda responses times At the end of a lambda function's execution, the OpenTelemetry client libraries will flush any pending spans/metrics/logs -to the collector before returning control to the Lambda environment. The collector's pipelines are synchronous and this -means that the response of the lambda function is delayed until the data has been exported. +to the collector before returning control to the Lambda environment. The collector's pipelines are synchronous and this +means that the response of the lambda function is delayed until the data has been exported. This delay can potentially be for hundreds of milliseconds. -To overcome this problem the [decouple](./processor/decoupleprocessor/README.md) processor can be used to separate the -two ends of the collectors pipeline and allow the lambda function to complete while ensuring that any data is exported +To overcome this problem the [decouple](./processor/decoupleprocessor/README.md) processor can be used to separate the +two ends of the collectors pipeline and allow the lambda function to complete while ensuring that any data is exported before the Lambda environment is frozen. -Below is a sample configuration that uses the decouple processor: -```yaml -receivers: - otlp: - protocols: - grpc: - -exporters: - logging: - loglevel: debug - otlp: - endpoint: { backend endpoint } - -processors: - decouple: - -service: - pipelines: - traces: - receivers: [otlp] - processors: [decouple] - exporters: [logging, otlp] -``` +See the section regarding auto-configuration above. You don't need to manually add the decouple processor to your configuration. ## Reducing Lambda runtime -If your lambda function is invoked frequently it is also possible to pair the decouple processor with the batch -processor to reduce total lambda execution time at the expense of delaying the export of OpenTelemetry data. +If your lambda function is invoked frequently it is also possible to pair the decouple processor with the batch +processor to reduce total lambda execution time at the expense of delaying the export of OpenTelemetry data. When used with the batch processor the decouple processor must be the last processor in the pipeline to ensure that data is successfully exported before the lambda environment is frozen. -An example use of the batch and decouple processors: -```yaml -receivers: - otlp: - protocols: - grpc: - -exporters: - logging: - loglevel: debug - otlp: - endpoint: { backend endpoint } - -processors: - decouple: - batch: - timeout: 5m - -service: - pipelines: - traces: - receivers: [otlp] - processors: [batch, decouple] - exporters: [logging, otlp] -``` \ No newline at end of file +As stated previously in the auto-configuration section, the OpenTelemetry Lambda Layer will automatically add the decouple processor to the end of the processors if the batch is used and the decouple processor is not. The result will be the same whether you configure it manually or not.