From 5787b3951498b040e2cf99888c98fd02a7a3c7bd Mon Sep 17 00:00:00 2001 From: Fiery-Fenix Date: Wed, 30 Oct 2024 17:45:37 +0200 Subject: [PATCH 1/6] [exporter/loadbalancing] Add top level sending_queue, retry_on_failure and timeout settings --- .../feat_loadbalancing-exporter-queue.yaml | 27 +++++ exporter/loadbalancingexporter/README.md | 78 +++++++++++++- exporter/loadbalancingexporter/config.go | 6 ++ exporter/loadbalancingexporter/factory.go | 98 +++++++++++++++-- .../loadbalancingexporter/factory_test.go | 100 ++++++++++++++++++ .../loadbalancingexporter/log_exporter.go | 4 +- .../loadbalancingexporter/metrics_exporter.go | 4 +- .../metrics_exporter_test.go | 32 ------ .../testdata/config.yaml | 11 +- .../loadbalancingexporter/trace_exporter.go | 10 +- .../trace_exporter_test.go | 34 ------ 11 files changed, 318 insertions(+), 86 deletions(-) create mode 100644 .chloggen/feat_loadbalancing-exporter-queue.yaml diff --git a/.chloggen/feat_loadbalancing-exporter-queue.yaml b/.chloggen/feat_loadbalancing-exporter-queue.yaml new file mode 100644 index 000000000000..829492bf25d7 --- /dev/null +++ b/.chloggen/feat_loadbalancing-exporter-queue.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: loadbalancingexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Adding sending_queue, retry_on_failure and timeout settings to loadbalancing exporter configuration + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [35378,16826] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: OTLP sub-exporter queue will be automatically disabled if loadbalancing exporter queue is enabled to avoid data loss + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/exporter/loadbalancingexporter/README.md b/exporter/loadbalancingexporter/README.md index b50485b742d4..f847cd18532d 100644 --- a/exporter/loadbalancingexporter/README.md +++ b/exporter/loadbalancingexporter/README.md @@ -55,7 +55,7 @@ The `loadbalancingexporter` will, irrespective of the chosen resolver (`static`, ## Configuration -Refer to [config.yaml](./testdata/config.yaml) for detailed examples on using the processor. +Refer to [config.yaml](./testdata/config.yaml) for detailed examples on using the exporter. * The `otlp` property configures the template used for building the OTLP exporter. Refer to the OTLP Exporter documentation for information on which options are available. Note that the `endpoint` property should not be set and will be overridden by this exporter with the backend endpoint. * The `resolver` accepts a `static` node, a `dns`, a `k8s` service or `aws_cloud_map`. If all four are specified, an `errMultipleResolversProvided` error will be thrown. @@ -90,6 +90,7 @@ Refer to [config.yaml](./testdata/config.yaml) for detailed examples on using th * `traceID`: Routes spans based on their `traceID`. Invalid for metrics. * `metric`: Routes metrics based on their metric name. Invalid for spans. * `streamID`: Routes metrics based on their datapoint streamID. That's the unique hash of all it's attributes, plus the attributes and identifying information of its resource, scope, and metric data +* loadbalancing exporter supports set of standard [queuing, batching, retry and timeout settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md) Simple example @@ -117,9 +118,9 @@ exporters: - backend-2:4317 - backend-3:4317 - backend-4:4317 - # Notice to config a headless service DNS in Kubernetes + # Notice to config a headless service DNS in Kubernetes # dns: - # hostname: otelcol-headless.observability.svc.cluster.local + # hostname: otelcol-headless.observability.svc.cluster.local service: pipelines: @@ -137,6 +138,75 @@ service: - loadbalancing ``` +Persistent queue, retry and timeout usage example: + +```yaml +receivers: + otlp: + protocols: + grpc: + endpoint: localhost:4317 + +processors: + +exporters: + loadbalancing: + timeout: 10s + retry_on_failure: + enabled: true + initial_interval: 5s + max_interval: 30s + max_elapsed_time: 300s + sending_queue: + # please take a note that otlp.sending_queue will be + # disabled automatically in this case to avoid data loss + enabled: true + num_consumers: 2 + queue_size: 1000 + storage: file_storage/otc + routing_key: "service" + protocol: + otlp: + # all options from the OTLP exporter are supported + # except the endpoint + timeout: 1s + # doesn't take any effect because loadbalancing.sending_queue + # is enabled + sending_queue: + enabled: true + resolver: + static: + hostnames: + - backend-1:4317 + - backend-2:4317 + - backend-3:4317 + - backend-4:4317 + # Notice to config a headless service DNS in Kubernetes + # dns: + # hostname: otelcol-headless.observability.svc.cluster.local + +extensions: + file_storage/otc: + directory: /var/lib/storage/otc + timeout: 10s + +service: + extensions: [file_storage] + pipelines: + traces: + receivers: + - otlp + processors: [] + exporters: + - loadbalancing + logs: + receivers: + - otlp + processors: [] + exporters: + - loadbalancing +``` + Kubernetes resolver example (For a more specific example: [example/k8s-resolver](./example/k8s-resolver/README.md)) ```yaml @@ -334,7 +404,7 @@ service: ## Metrics -The following metrics are recorded by this processor: +The following metrics are recorded by this exporter: * `otelcol_loadbalancer_num_resolutions` represents the total number of resolutions performed by the resolver specified in the tag `resolver`, split by their outcome (`success=true|false`). For the static resolver, this should always be `1` with the tag `success=true`. * `otelcol_loadbalancer_num_backends` informs how many backends are currently in use. It should always match the number of items specified in the configuration file in case the `static` resolver is used, and should eventually (seconds) catch up with the DNS changes. Note that DNS caches that might exist between the load balancer and the record authority will influence how long it takes for the load balancer to see the change. diff --git a/exporter/loadbalancingexporter/config.go b/exporter/loadbalancingexporter/config.go index 8496268de7ed..b9682df16892 100644 --- a/exporter/loadbalancingexporter/config.go +++ b/exporter/loadbalancingexporter/config.go @@ -7,6 +7,8 @@ import ( "time" "github.com/aws/aws-sdk-go-v2/service/servicediscovery/types" + "go.opentelemetry.io/collector/config/configretry" + "go.opentelemetry.io/collector/exporter/exporterhelper" "go.opentelemetry.io/collector/exporter/otlpexporter" ) @@ -30,6 +32,10 @@ const ( // Config defines configuration for the exporter. type Config struct { + TimeoutSettings exporterhelper.TimeoutConfig `mapstructure:",squash"` + configretry.BackOffConfig `mapstructure:"retry_on_failure"` + QueueSettings exporterhelper.QueueConfig `mapstructure:"sending_queue"` + Protocol Protocol `mapstructure:"protocol"` Resolver ResolverSettings `mapstructure:"resolver"` RoutingKey string `mapstructure:"routing_key"` diff --git a/exporter/loadbalancingexporter/factory.go b/exporter/loadbalancingexporter/factory.go index f1c37e151757..2dcea51a6af6 100644 --- a/exporter/loadbalancingexporter/factory.go +++ b/exporter/loadbalancingexporter/factory.go @@ -7,14 +7,22 @@ package loadbalancingexporter // import "github.com/open-telemetry/opentelemetry import ( "context" + "fmt" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/exporterhelper" "go.opentelemetry.io/collector/exporter/otlpexporter" + "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter/internal/metadata" ) +const ( + zapEndpointKey = "endpoint" +) + // NewFactory creates a factory for the exporter. func NewFactory() exporter.Factory { return exporter.NewFactory( @@ -32,20 +40,98 @@ func createDefaultConfig() component.Config { otlpDefaultCfg.Endpoint = "placeholder:4317" return &Config{ + TimeoutSettings: exporterhelper.NewDefaultTimeoutConfig(), + QueueSettings: exporterhelper.NewDefaultQueueConfig(), + BackOffConfig: configretry.NewDefaultBackOffConfig(), Protocol: Protocol{ OTLP: *otlpDefaultCfg, }, } } -func createTracesExporter(_ context.Context, params exporter.Settings, cfg component.Config) (exporter.Traces, error) { - return newTracesExporter(params, cfg) +func buildExporterConfig(cfg *Config, endpoint string) otlpexporter.Config { + oCfg := cfg.Protocol.OTLP + oCfg.Endpoint = endpoint + // If top level queue is enabled - per-endpoint queue must be disable + // This helps us to avoid unexpected issues with mixing 2 level of exporter queues + if cfg.QueueSettings.Enabled { + oCfg.QueueConfig.Enabled = false + } + return oCfg +} + +func buildExporterSettings(params exporter.Settings, endpoint string) exporter.Settings { + // Override child exporter ID to segregate metrics from loadbalancing top level + childName := endpoint + if params.ID.Name() != "" { + childName = fmt.Sprintf("%s_%s", params.ID.Name(), childName) + } + params.ID = component.NewIDWithName(params.ID.Type(), childName) + // Add "endpoint" attribute to child exporter logger to segregate logs from loadbalancing top level + params.Logger = params.Logger.With(zap.String(zapEndpointKey, endpoint)) + + return params +} + +func createTracesExporter(ctx context.Context, params exporter.Settings, cfg component.Config) (exporter.Traces, error) { + c := cfg.(*Config) + exporter, err := newTracesExporter(params, cfg) + if err != nil { + return nil, fmt.Errorf("cannot configure loadbalancing traces exporter: %w", err) + } + + return exporterhelper.NewTraces( + ctx, + params, + cfg, + exporter.ConsumeTraces, + exporterhelper.WithStart(exporter.Start), + exporterhelper.WithShutdown(exporter.Shutdown), + exporterhelper.WithCapabilities(exporter.Capabilities()), + exporterhelper.WithTimeout(c.TimeoutSettings), + exporterhelper.WithQueue(c.QueueSettings), + exporterhelper.WithRetry(c.BackOffConfig), + ) + } + +func createLogsExporter(ctx context.Context, params exporter.Settings, cfg component.Config) (exporter.Logs, error) { + c := cfg.(*Config) + exporter, err := newLogsExporter(params, cfg) + if err != nil { + return nil, fmt.Errorf("cannot configure loadbalancing logs exporter: %w", err) +} + + return exporterhelper.NewLogs( + ctx, + params, + cfg, + exporter.ConsumeLogs, + exporterhelper.WithStart(exporter.Start), + exporterhelper.WithShutdown(exporter.Shutdown), + exporterhelper.WithCapabilities(exporter.Capabilities()), + exporterhelper.WithTimeout(c.TimeoutSettings), + exporterhelper.WithQueue(c.QueueSettings), + exporterhelper.WithRetry(c.BackOffConfig), + ) } -func createLogsExporter(_ context.Context, params exporter.Settings, cfg component.Config) (exporter.Logs, error) { - return newLogsExporter(params, cfg) +func createMetricsExporter(ctx context.Context, params exporter.Settings, cfg component.Config) (exporter.Metrics, error) { + c := cfg.(*Config) + exporter, err := newMetricsExporter(params, cfg) + if err != nil { + return nil, fmt.Errorf("cannot configure loadbalancing metrics exporter: %w", err) } -func createMetricsExporter(_ context.Context, params exporter.Settings, cfg component.Config) (exporter.Metrics, error) { - return newMetricsExporter(params, cfg) + return exporterhelper.NewMetrics( + ctx, + params, + cfg, + exporter.ConsumeMetrics, + exporterhelper.WithStart(exporter.Start), + exporterhelper.WithShutdown(exporter.Shutdown), + exporterhelper.WithCapabilities(exporter.Capabilities()), + exporterhelper.WithTimeout(c.TimeoutSettings), + exporterhelper.WithQueue(c.QueueSettings), + exporterhelper.WithRetry(c.BackOffConfig), + ) } diff --git a/exporter/loadbalancingexporter/factory_test.go b/exporter/loadbalancingexporter/factory_test.go index 974b13d04bdb..744b58a5d6df 100644 --- a/exporter/loadbalancingexporter/factory_test.go +++ b/exporter/loadbalancingexporter/factory_test.go @@ -5,10 +5,20 @@ package loadbalancingexporter import ( "context" + "fmt" + "path/filepath" "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/exporter/exportertest" + "go.opentelemetry.io/collector/exporter/otlpexporter" + "go.opentelemetry.io/collector/otelcol/otelcoltest" + "go.uber.org/zap" + "go.uber.org/zap/zaptest/observer" + + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter/internal/metadata" ) func TestTracesExporterGetsCreatedWithValidConfiguration(t *testing.T) { @@ -58,3 +68,93 @@ func TestOTLPConfigIsValid(t *testing.T) { // verify assert.NoError(t, otlpCfg.Validate()) } + +func TestBuildExporterConfig(t *testing.T) { + // prepare + factories, err := otelcoltest.NopFactories() + require.NoError(t, err) + + factories.Exporters[metadata.Type] = NewFactory() + // https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/33594 + // nolint:staticcheck + cfg, err := otelcoltest.LoadConfigAndValidate(filepath.Join("testdata", "test-build-exporter-config.yaml"), factories) + require.NoError(t, err) + require.NotNil(t, cfg) + + c := cfg.Exporters[component.NewID(metadata.Type)] + require.NotNil(t, c) + + // test + defaultCfg := otlpexporter.NewFactory().CreateDefaultConfig().(*otlpexporter.Config) + exporterCfg := buildExporterConfig(c.(*Config), "the-endpoint") + + // verify + grpcSettings := defaultCfg.ClientConfig + grpcSettings.Endpoint = "the-endpoint" + if c.(*Config).QueueSettings.Enabled { + defaultCfg.QueueConfig.Enabled = false + } + assert.Equal(t, grpcSettings, exporterCfg.ClientConfig) + + assert.Equal(t, defaultCfg.TimeoutConfig, exporterCfg.TimeoutConfig) + assert.Equal(t, defaultCfg.QueueConfig, exporterCfg.QueueConfig) + assert.Equal(t, defaultCfg.RetryConfig, exporterCfg.RetryConfig) +} + +func TestBuildExporterConfigUnknown(t *testing.T) { + // prepare + factories, err := otelcoltest.NopFactories() + require.NoError(t, err) + + factories.Exporters[metadata.Type] = NewFactory() + // https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/33594 + // nolint:staticcheck + cfg, err := otelcoltest.LoadConfigAndValidate(filepath.Join("testdata", "test-build-exporter-config.yaml"), factories) + require.NoError(t, err) + require.NotNil(t, cfg) + + c := cfg.Exporters[component.NewID(metadata.Type)] + require.NotNil(t, c) + + // test + defaultCfg := otlpexporter.NewFactory().CreateDefaultConfig().(*otlpexporter.Config) + exporterCfg := buildExporterConfig(c.(*Config), "the-endpoint") + + // verify + grpcSettings := defaultCfg.ClientConfig + grpcSettings.Endpoint = "the-endpoint" + if c.(*Config).QueueSettings.Enabled { + defaultCfg.QueueConfig.Enabled = false + } + assert.Equal(t, grpcSettings, exporterCfg.ClientConfig) + + assert.Equal(t, defaultCfg.TimeoutConfig, exporterCfg.TimeoutConfig) + assert.Equal(t, defaultCfg.QueueConfig, exporterCfg.QueueConfig) + assert.Equal(t, defaultCfg.RetryConfig, exporterCfg.RetryConfig) +} + +func TestBuildExporterSettings(t *testing.T) { + // prepare + creationParams := exportertest.NewNopSettings() + testEndpoint := "the-endpoint" + observedZapCore, observedLogs := observer.New(zap.InfoLevel) + creationParams.Logger = zap.New(observedZapCore) + + // test + exporterParams := buildExporterSettings(creationParams, testEndpoint) + exporterParams.Logger.Info("test") + + // verify + expectedID := component.NewIDWithName( + creationParams.ID.Type(), + fmt.Sprintf("%s_%s", creationParams.ID.Name(), testEndpoint), + ) + assert.Equal(t, expectedID, exporterParams.ID) + + allLogs := observedLogs.All() + require.Equal(t, 1, observedLogs.Len()) + assert.Contains(t, + allLogs[0].Context, + zap.String(zapEndpointKey, testEndpoint), + ) +} diff --git a/exporter/loadbalancingexporter/log_exporter.go b/exporter/loadbalancingexporter/log_exporter.go index 8d4e3cf56b37..2d1385b76e34 100644 --- a/exporter/loadbalancingexporter/log_exporter.go +++ b/exporter/loadbalancingexporter/log_exporter.go @@ -41,7 +41,9 @@ func newLogsExporter(params exporter.Settings, cfg component.Config) (*logExport exporterFactory := otlpexporter.NewFactory() cfFunc := func(ctx context.Context, endpoint string) (component.Component, error) { oCfg := buildExporterConfig(cfg.(*Config), endpoint) - return exporterFactory.CreateLogs(ctx, params, &oCfg) + oParams := buildExporterSettings(params, endpoint) + + return exporterFactory.CreateLogs(ctx, oParams, &oCfg) } lb, err := newLoadBalancer(params.Logger, cfg, cfFunc, telemetry) diff --git a/exporter/loadbalancingexporter/metrics_exporter.go b/exporter/loadbalancingexporter/metrics_exporter.go index 9fd765a5416d..aa15bd74f504 100644 --- a/exporter/loadbalancingexporter/metrics_exporter.go +++ b/exporter/loadbalancingexporter/metrics_exporter.go @@ -43,7 +43,9 @@ func newMetricsExporter(params exporter.Settings, cfg component.Config) (*metric exporterFactory := otlpexporter.NewFactory() cfFunc := func(ctx context.Context, endpoint string) (component.Component, error) { oCfg := buildExporterConfig(cfg.(*Config), endpoint) - return exporterFactory.CreateMetrics(ctx, params, &oCfg) + oParams := buildExporterSettings(params, endpoint) + + return exporterFactory.CreateMetrics(ctx, oParams, &oCfg) } lb, err := newLoadBalancer(params.Logger, cfg, cfFunc, telemetry) diff --git a/exporter/loadbalancingexporter/metrics_exporter_test.go b/exporter/loadbalancingexporter/metrics_exporter_test.go index 9e4ab653dfb4..a59cba273478 100644 --- a/exporter/loadbalancingexporter/metrics_exporter_test.go +++ b/exporter/loadbalancingexporter/metrics_exporter_test.go @@ -24,14 +24,11 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/exporter/otlpexporter" - "go.opentelemetry.io/collector/otelcol/otelcoltest" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" conventions "go.opentelemetry.io/collector/semconv/v1.27.0" "gopkg.in/yaml.v2" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter/internal/metadata" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest" ) @@ -673,35 +670,6 @@ func TestConsumeMetricsUnexpectedExporterType(t *testing.T) { assert.EqualError(t, res, fmt.Sprintf("unable to export metrics, unexpected exporter type: expected exporter.Metrics but got %T", newNopMockExporter())) } -func TestBuildExporterConfigUnknown(t *testing.T) { - // prepare - factories, err := otelcoltest.NopFactories() - require.NoError(t, err) - - factories.Exporters[metadata.Type] = NewFactory() - // https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/33594 - // nolint:staticcheck - cfg, err := otelcoltest.LoadConfigAndValidate(filepath.Join("testdata", "test-build-exporter-config.yaml"), factories) - require.NoError(t, err) - require.NotNil(t, cfg) - - c := cfg.Exporters[component.NewID(metadata.Type)] - require.NotNil(t, c) - - // test - defaultCfg := otlpexporter.NewFactory().CreateDefaultConfig().(*otlpexporter.Config) - exporterCfg := buildExporterConfig(c.(*Config), "the-endpoint") - - // verify - grpcSettings := defaultCfg.ClientConfig - grpcSettings.Endpoint = "the-endpoint" - assert.Equal(t, grpcSettings, exporterCfg.ClientConfig) - - assert.Equal(t, defaultCfg.TimeoutConfig, exporterCfg.TimeoutConfig) - assert.Equal(t, defaultCfg.QueueConfig, exporterCfg.QueueConfig) - assert.Equal(t, defaultCfg.RetryConfig, exporterCfg.RetryConfig) -} - func TestBatchWithTwoMetrics(t *testing.T) { ts, tb := getTelemetryAssets(t) sink := new(consumertest.MetricsSink) diff --git a/exporter/loadbalancingexporter/testdata/config.yaml b/exporter/loadbalancingexporter/testdata/config.yaml index da1d51818e59..64a0271338b3 100644 --- a/exporter/loadbalancingexporter/testdata/config.yaml +++ b/exporter/loadbalancingexporter/testdata/config.yaml @@ -1,6 +1,6 @@ loadbalancing: protocol: - # the OTLP exporter configuration. "endpoint" values will be ignored + # the OTLP exporter configuration "endpoint" values will be ignored otlp: timeout: 1s @@ -38,3 +38,12 @@ loadbalancing/4: namespace: cloudmap-1 service_name: service-1 port: 4319 + +loadbalancing/5: + # the OTLP exporter configuration "sending_queue" values will be ignored + sending_queue: + enabled: true + protocol: + otlp: + sending_queue: + enabled: false diff --git a/exporter/loadbalancingexporter/trace_exporter.go b/exporter/loadbalancingexporter/trace_exporter.go index 3e335e8a9e14..e6fb9647d977 100644 --- a/exporter/loadbalancingexporter/trace_exporter.go +++ b/exporter/loadbalancingexporter/trace_exporter.go @@ -45,7 +45,9 @@ func newTracesExporter(params exporter.Settings, cfg component.Config) (*traceEx exporterFactory := otlpexporter.NewFactory() cfFunc := func(ctx context.Context, endpoint string) (component.Component, error) { oCfg := buildExporterConfig(cfg.(*Config), endpoint) - return exporterFactory.CreateTraces(ctx, params, &oCfg) + oParams := buildExporterSettings(params, endpoint) + + return exporterFactory.CreateTraces(ctx, oParams, &oCfg) } lb, err := newLoadBalancer(params.Logger, cfg, cfFunc, telemetry) @@ -69,12 +71,6 @@ func newTracesExporter(params exporter.Settings, cfg component.Config) (*traceEx return &traceExporter, nil } -func buildExporterConfig(cfg *Config, endpoint string) otlpexporter.Config { - oCfg := cfg.Protocol.OTLP - oCfg.Endpoint = endpoint - return oCfg -} - func (e *traceExporterImp) Capabilities() consumer.Capabilities { return consumer.Capabilities{MutatesData: false} } diff --git a/exporter/loadbalancingexporter/trace_exporter_test.go b/exporter/loadbalancingexporter/trace_exporter_test.go index bb6544e3222e..2e7239cd6a0a 100644 --- a/exporter/loadbalancingexporter/trace_exporter_test.go +++ b/exporter/loadbalancingexporter/trace_exporter_test.go @@ -9,7 +9,6 @@ import ( "fmt" "math/rand" "net" - "path/filepath" "sync" "sync/atomic" "testing" @@ -23,13 +22,9 @@ import ( "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exportertest" - "go.opentelemetry.io/collector/exporter/otlpexporter" - "go.opentelemetry.io/collector/otelcol/otelcoltest" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" conventions "go.opentelemetry.io/collector/semconv/v1.27.0" - - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter/internal/metadata" ) func TestNewTracesExporter(t *testing.T) { @@ -349,35 +344,6 @@ func TestConsumeTracesUnexpectedExporterType(t *testing.T) { assert.EqualError(t, res, fmt.Sprintf("unable to export traces, unexpected exporter type: expected exporter.Traces but got %T", newNopMockExporter())) } -func TestBuildExporterConfig(t *testing.T) { - // prepare - factories, err := otelcoltest.NopFactories() - require.NoError(t, err) - - factories.Exporters[metadata.Type] = NewFactory() - // https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/33594 - // nolint:staticcheck - cfg, err := otelcoltest.LoadConfigAndValidate(filepath.Join("testdata", "test-build-exporter-config.yaml"), factories) - require.NoError(t, err) - require.NotNil(t, cfg) - - c := cfg.Exporters[component.NewID(metadata.Type)] - require.NotNil(t, c) - - // test - defaultCfg := otlpexporter.NewFactory().CreateDefaultConfig().(*otlpexporter.Config) - exporterCfg := buildExporterConfig(c.(*Config), "the-endpoint") - - // verify - grpcSettings := defaultCfg.ClientConfig - grpcSettings.Endpoint = "the-endpoint" - assert.Equal(t, grpcSettings, exporterCfg.ClientConfig) - - assert.Equal(t, defaultCfg.TimeoutConfig, exporterCfg.TimeoutConfig) - assert.Equal(t, defaultCfg.QueueConfig, exporterCfg.QueueConfig) - assert.Equal(t, defaultCfg.RetryConfig, exporterCfg.RetryConfig) -} - func TestBatchWithTwoTraces(t *testing.T) { ts, tb := getTelemetryAssets(t) sink := new(consumertest.TracesSink) From e7acefa016009ff2ade15ce5a11aad94afc1c70c Mon Sep 17 00:00:00 2001 From: Fiery-Fenix Date: Thu, 31 Oct 2024 13:15:25 +0200 Subject: [PATCH 2/6] Remove duplicate test --- .../loadbalancingexporter/factory_test.go | 32 ------------------- 1 file changed, 32 deletions(-) diff --git a/exporter/loadbalancingexporter/factory_test.go b/exporter/loadbalancingexporter/factory_test.go index 744b58a5d6df..d27a22874e24 100644 --- a/exporter/loadbalancingexporter/factory_test.go +++ b/exporter/loadbalancingexporter/factory_test.go @@ -101,38 +101,6 @@ func TestBuildExporterConfig(t *testing.T) { assert.Equal(t, defaultCfg.RetryConfig, exporterCfg.RetryConfig) } -func TestBuildExporterConfigUnknown(t *testing.T) { - // prepare - factories, err := otelcoltest.NopFactories() - require.NoError(t, err) - - factories.Exporters[metadata.Type] = NewFactory() - // https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/33594 - // nolint:staticcheck - cfg, err := otelcoltest.LoadConfigAndValidate(filepath.Join("testdata", "test-build-exporter-config.yaml"), factories) - require.NoError(t, err) - require.NotNil(t, cfg) - - c := cfg.Exporters[component.NewID(metadata.Type)] - require.NotNil(t, c) - - // test - defaultCfg := otlpexporter.NewFactory().CreateDefaultConfig().(*otlpexporter.Config) - exporterCfg := buildExporterConfig(c.(*Config), "the-endpoint") - - // verify - grpcSettings := defaultCfg.ClientConfig - grpcSettings.Endpoint = "the-endpoint" - if c.(*Config).QueueSettings.Enabled { - defaultCfg.QueueConfig.Enabled = false - } - assert.Equal(t, grpcSettings, exporterCfg.ClientConfig) - - assert.Equal(t, defaultCfg.TimeoutConfig, exporterCfg.TimeoutConfig) - assert.Equal(t, defaultCfg.QueueConfig, exporterCfg.QueueConfig) - assert.Equal(t, defaultCfg.RetryConfig, exporterCfg.RetryConfig) -} - func TestBuildExporterSettings(t *testing.T) { // prepare creationParams := exportertest.NewNopSettings() From 8560979082ce98414260d9dc0f1fb967c5593749 Mon Sep 17 00:00:00 2001 From: Fiery-Fenix Date: Mon, 4 Nov 2024 12:47:41 +0200 Subject: [PATCH 3/6] Apply make fmt --- exporter/loadbalancingexporter/factory.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/exporter/loadbalancingexporter/factory.go b/exporter/loadbalancingexporter/factory.go index 2dcea51a6af6..32dd08123094 100644 --- a/exporter/loadbalancingexporter/factory.go +++ b/exporter/loadbalancingexporter/factory.go @@ -92,14 +92,14 @@ func createTracesExporter(ctx context.Context, params exporter.Settings, cfg com exporterhelper.WithQueue(c.QueueSettings), exporterhelper.WithRetry(c.BackOffConfig), ) - } +} func createLogsExporter(ctx context.Context, params exporter.Settings, cfg component.Config) (exporter.Logs, error) { c := cfg.(*Config) exporter, err := newLogsExporter(params, cfg) if err != nil { return nil, fmt.Errorf("cannot configure loadbalancing logs exporter: %w", err) -} + } return exporterhelper.NewLogs( ctx, @@ -120,7 +120,7 @@ func createMetricsExporter(ctx context.Context, params exporter.Settings, cfg co exporter, err := newMetricsExporter(params, cfg) if err != nil { return nil, fmt.Errorf("cannot configure loadbalancing metrics exporter: %w", err) -} + } return exporterhelper.NewMetrics( ctx, From de6a9c610a91570578f31602d0206d3926001895 Mon Sep 17 00:00:00 2001 From: Fiery-Fenix Date: Mon, 4 Nov 2024 12:49:05 +0200 Subject: [PATCH 4/6] Remove sub-exporter queue disabling As it may produce blocked exporter --- exporter/loadbalancingexporter/factory.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/exporter/loadbalancingexporter/factory.go b/exporter/loadbalancingexporter/factory.go index 32dd08123094..3f4dc357413c 100644 --- a/exporter/loadbalancingexporter/factory.go +++ b/exporter/loadbalancingexporter/factory.go @@ -52,11 +52,7 @@ func createDefaultConfig() component.Config { func buildExporterConfig(cfg *Config, endpoint string) otlpexporter.Config { oCfg := cfg.Protocol.OTLP oCfg.Endpoint = endpoint - // If top level queue is enabled - per-endpoint queue must be disable - // This helps us to avoid unexpected issues with mixing 2 level of exporter queues - if cfg.QueueSettings.Enabled { - oCfg.QueueConfig.Enabled = false - } + return oCfg } From 65b6d7392acdd3416337c97135b40cf319ea8436 Mon Sep 17 00:00:00 2001 From: Fiery-Fenix Date: Mon, 4 Nov 2024 13:00:45 +0200 Subject: [PATCH 5/6] Update changelog item with queue size recommendation --- .chloggen/feat_loadbalancing-exporter-queue.yaml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.chloggen/feat_loadbalancing-exporter-queue.yaml b/.chloggen/feat_loadbalancing-exporter-queue.yaml index 829492bf25d7..d65a0e1d8d32 100644 --- a/.chloggen/feat_loadbalancing-exporter-queue.yaml +++ b/.chloggen/feat_loadbalancing-exporter-queue.yaml @@ -15,7 +15,9 @@ issues: [35378,16826] # (Optional) One or more lines of additional information to render under the primary note. # These lines will be padded with 2 spaces and then inserted directly into the document. # Use pipe (|) for multiline entries. -subtext: OTLP sub-exporter queue will be automatically disabled if loadbalancing exporter queue is enabled to avoid data loss +subtext: | + When switching to top-level sending_queue configuration - users should carefully review queue size + In some rare cases setting top-level queue size to n*queueSize might be not enough to prevent data loss # If your change doesn't affect end users or the exported elements of any package, # you should instead start your pull request title with [chore] or use the "Skip Changelog" label. From 3a856cbcf70502b58e63a17c5760d4b8d6344a4a Mon Sep 17 00:00:00 2001 From: Fiery-Fenix Date: Mon, 4 Nov 2024 15:10:24 +0200 Subject: [PATCH 6/6] Fix failing test Fix go.mod --- exporter/loadbalancingexporter/factory_test.go | 3 --- exporter/loadbalancingexporter/go.mod | 2 +- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/exporter/loadbalancingexporter/factory_test.go b/exporter/loadbalancingexporter/factory_test.go index d27a22874e24..dac18b981362 100644 --- a/exporter/loadbalancingexporter/factory_test.go +++ b/exporter/loadbalancingexporter/factory_test.go @@ -91,9 +91,6 @@ func TestBuildExporterConfig(t *testing.T) { // verify grpcSettings := defaultCfg.ClientConfig grpcSettings.Endpoint = "the-endpoint" - if c.(*Config).QueueSettings.Enabled { - defaultCfg.QueueConfig.Enabled = false - } assert.Equal(t, grpcSettings, exporterCfg.ClientConfig) assert.Equal(t, defaultCfg.TimeoutConfig, exporterCfg.TimeoutConfig) diff --git a/exporter/loadbalancingexporter/go.mod b/exporter/loadbalancingexporter/go.mod index 819e820d82b4..765737c0644e 100644 --- a/exporter/loadbalancingexporter/go.mod +++ b/exporter/loadbalancingexporter/go.mod @@ -13,6 +13,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.112.0 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector/component v0.112.0 + go.opentelemetry.io/collector/config/configretry v1.18.0 go.opentelemetry.io/collector/config/configtelemetry v0.112.0 go.opentelemetry.io/collector/confmap v1.18.0 go.opentelemetry.io/collector/consumer v0.112.0 @@ -113,7 +114,6 @@ require ( go.opentelemetry.io/collector/config/configgrpc v0.112.0 // indirect go.opentelemetry.io/collector/config/confignet v1.18.0 // indirect go.opentelemetry.io/collector/config/configopaque v1.18.0 // indirect - go.opentelemetry.io/collector/config/configretry v1.18.0 // indirect go.opentelemetry.io/collector/config/configtls v1.18.0 // indirect go.opentelemetry.io/collector/config/internal v0.112.0 // indirect go.opentelemetry.io/collector/confmap/provider/envprovider v1.18.0 // indirect