[exporter/loadbalancing] Add top level sending_queue, retry_on_failure and timeout settings #36094
base: main
@@ -0,0 +1,27 @@

```yaml
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: loadbalancingexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Adding sending_queue, retry_on_failure and timeout settings to loadbalancing exporter configuration

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [35378, 16826]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: OTLP sub-exporter queue will be automatically disabled if loadbalancing exporter queue is enabled to avoid data loss

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
```
@@ -55,7 +55,7 @@ The `loadbalancingexporter` will, irrespective of the chosen resolver (`static`,

## Configuration

Refer to [config.yaml](./testdata/config.yaml) for detailed examples on using the exporter.

* The `otlp` property configures the template used for building the OTLP exporter. Refer to the OTLP Exporter documentation for information on which options are available. Note that the `endpoint` property should not be set and will be overridden by this exporter with the backend endpoint.
* The `resolver` accepts a `static` node, a `dns`, a `k8s` service or `aws_cloud_map`. If all four are specified, an `errMultipleResolversProvided` error will be thrown.

@@ -90,6 +90,7 @@ Refer to [config.yaml](./testdata/config.yaml) for detailed examples on using th
* `traceID`: Routes spans based on their `traceID`. Invalid for metrics.
* `metric`: Routes metrics based on their metric name. Invalid for spans.
* `streamID`: Routes metrics based on their datapoint streamID. That's the unique hash of all its attributes, plus the attributes and identifying information of its resource, scope, and metric data.
* The loadbalancing exporter supports the standard set of [queuing, batching, retry and timeout settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md).

**Review comment:** This needs more documentation: what happens when a failure occurs and then the ring is changed? Will it be directed to a new backend (yes!)? This expected behavior should be explicitly documented to our users.
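The re-routing behavior the reviewer asks about can be illustrated with a toy sketch. This is not the exporter's actual consistent-hash ring implementation; `pickBackend` is a hypothetical stand-in that only shows why a retried batch can land on a different backend after the resolver changes the ring membership.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// pickBackend deterministically maps a routing key (e.g. a traceID) onto one
// of the currently resolved backends. Toy stand-in, illustration only.
func pickBackend(key string, backends []string) string {
	sort.Strings(backends) // stable order regardless of resolver output order
	h := fnv.New32a()
	h.Write([]byte(key))
	return backends[int(h.Sum32()%uint32(len(backends)))]
}

func main() {
	ring := []string{"backend-1:4317", "backend-2:4317", "backend-3:4317"}
	fmt.Println(pickBackend("trace-abc", ring))

	// After a failed export, the resolver may have changed the ring; the
	// retried batch is hashed against the *new* membership, so it can be
	// directed to a different backend than the original attempt.
	smaller := []string{"backend-1:4317", "backend-3:4317"}
	fmt.Println(pickBackend("trace-abc", smaller))
}
```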
Simple example

@@ -117,9 +118,9 @@ exporters:
          - backend-2:4317
          - backend-3:4317
          - backend-4:4317
      # Notice to config a headless service DNS in Kubernetes
      # dns:
      #   hostname: otelcol-headless.observability.svc.cluster.local

service:
  pipelines:

@@ -137,6 +138,75 @@ service:
        - loadbalancing
Persistent queue, retry and timeout usage example:

```yaml
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: localhost:4317

processors:

exporters:
  loadbalancing:
    timeout: 10s
    retry_on_failure:
      enabled: true
      initial_interval: 5s
      max_interval: 30s
      max_elapsed_time: 300s
    sending_queue:
      # please take a note that otlp.sending_queue will be
      # disabled automatically in this case to avoid data loss
      enabled: true
      num_consumers: 2
      queue_size: 1000
      storage: file_storage/otc
    routing_key: "service"
    protocol:
      otlp:
        # all options from the OTLP exporter are supported
        # except the endpoint
        timeout: 1s
        # doesn't take any effect because loadbalancing.sending_queue
        # is enabled
        sending_queue:
          enabled: true
    resolver:
      static:
        hostnames:
          - backend-1:4317
          - backend-2:4317
          - backend-3:4317
          - backend-4:4317
      # Notice to config a headless service DNS in Kubernetes
      # dns:
      #   hostname: otelcol-headless.observability.svc.cluster.local

extensions:
  file_storage/otc:
    directory: /var/lib/storage/otc
    timeout: 10s

service:
  extensions: [file_storage/otc]
  pipelines:
    traces:
      receivers:
        - otlp
      processors: []
      exporters:
        - loadbalancing
    logs:
      receivers:
        - otlp
      processors: []
      exporters:
        - loadbalancing
```
Kubernetes resolver example (For a more specific example: [example/k8s-resolver](./example/k8s-resolver/README.md))

@@ -334,7 +404,7 @@ service:

## Metrics

The following metrics are recorded by this exporter:

* `otelcol_loadbalancer_num_resolutions` represents the total number of resolutions performed by the resolver specified in the tag `resolver`, split by their outcome (`success=true|false`). For the static resolver, this should always be `1` with the tag `success=true`.
* `otelcol_loadbalancer_num_backends` informs how many backends are currently in use. It should always match the number of items specified in the configuration file in case the `static` resolver is used, and should eventually (within seconds) catch up with DNS changes. Note that DNS caches that might exist between the load balancer and the record authority will influence how long it takes for the load balancer to see the change.
@@ -7,14 +7,22 @@ package loadbalancingexporter // import "github.com/open-telemetry/opentelemetry

```go
import (
	"context"
	"fmt"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/config/configretry"
	"go.opentelemetry.io/collector/exporter"
	"go.opentelemetry.io/collector/exporter/exporterhelper"
	"go.opentelemetry.io/collector/exporter/otlpexporter"
	"go.uber.org/zap"

	"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter/internal/metadata"
)

const (
	zapEndpointKey = "endpoint"
)

// NewFactory creates a factory for the exporter.
func NewFactory() exporter.Factory {
	return exporter.NewFactory(
```

@@ -32,20 +40,98 @@ func createDefaultConfig() component.Config {

```go
	otlpDefaultCfg.Endpoint = "placeholder:4317"

	return &Config{
		TimeoutSettings: exporterhelper.NewDefaultTimeoutConfig(),
		QueueSettings:   exporterhelper.NewDefaultQueueConfig(),
		BackOffConfig:   configretry.NewDefaultBackOffConfig(),
		Protocol: Protocol{
			OTLP: *otlpDefaultCfg,
		},
	}
}
```

**Review comment** (on the new default settings): This behavior should be clearly documented. I think it would even make sense to log a warning in case users tried to use that option. That said, there ARE users relying on this feature at the moment. What should we do about them? I think we should copy their current values to the load-balancer level, which would give them roughly the same desired behavior they have today. So, here's how it could work:

I don't think we need a feature flag or deprecation plan for this.
```go
func buildExporterConfig(cfg *Config, endpoint string) otlpexporter.Config {
	oCfg := cfg.Protocol.OTLP
	oCfg.Endpoint = endpoint
	// If the top-level queue is enabled, the per-endpoint queue must be disabled.
	// This helps us avoid unexpected issues from mixing two levels of exporter queues.
	if cfg.QueueSettings.Enabled {
		oCfg.QueueConfig.Enabled = false
	}
	return oCfg
}
```

**Review comment:** Looking at the

```go
for exp, td := range exporterSegregatedTraces {
	start := time.Now()
	err := exp.ConsumeTraces(ctx, td)
	exp.consumeWG.Done()
	errs = multierr.Append(errs, err)
	...
}
```

Now that the
What might be necessary is an

**Review comment:** Switching to an export queue also means it includes the queue workers, which allow parallel processing and provide a layer of latency hiding. Including a new "queue" worker in this could be hazardous depending on how it is done: say, if you create a goroutine for each split batch, you could end up with a high amount of goroutine scheduling, which can cause a significant performance hit. Then, if you do more of an async send on channels, you have the original risk that this solves, since the queue moves from the current queue structure to a buffered channel that now needs to be shared again. For this iteration, I would suggest leaving it out of this change and keeping it as a backlog issue to "investigate and address if required".

**Review reply:** Missed that part (
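The rule in `buildExporterConfig` can be captured in a stand-alone sketch. The struct names below (`QueueConfig`, `OTLPConfig`, `LBConfig`, `applyQueueRule`) are simplified hypothetical stand-ins for the real collector types, used only to illustrate the queue-disable behavior:

```go
package main

import "fmt"

// Simplified stand-ins for the real collector config types (illustrative only).
type QueueConfig struct{ Enabled bool }

type OTLPConfig struct{ Queue QueueConfig }

type LBConfig struct {
	Queue QueueConfig // top-level loadbalancing sending_queue
	OTLP  OTLPConfig  // per-endpoint OTLP sub-exporter template
}

// applyQueueRule mirrors the documented behavior: when the top-level queue is
// enabled, the per-endpoint OTLP queue is forced off to avoid double-queueing
// and potential data loss.
func applyQueueRule(cfg LBConfig) LBConfig {
	if cfg.Queue.Enabled {
		cfg.OTLP.Queue.Enabled = false
	}
	return cfg
}

func main() {
	cfg := LBConfig{
		Queue: QueueConfig{Enabled: true},
		OTLP:  OTLPConfig{Queue: QueueConfig{Enabled: true}},
	}
	out := applyQueueRule(cfg)
	fmt.Println(out.OTLP.Queue.Enabled) // false: sub-exporter queue is disabled
}
```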
```go
func buildExporterSettings(params exporter.Settings, endpoint string) exporter.Settings {
	// Override the child exporter ID to segregate its metrics from the
	// top-level loadbalancing exporter.
	childName := endpoint
	if params.ID.Name() != "" {
		childName = fmt.Sprintf("%s_%s", params.ID.Name(), childName)
	}
	params.ID = component.NewIDWithName(params.ID.Type(), childName)
	// Add an "endpoint" attribute to the child exporter's logger to segregate
	// its logs from the top-level loadbalancing exporter as well.
	params.Logger = params.Logger.With(zap.String(zapEndpointKey, endpoint))

	return params
}
```
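The naming scheme above can be exercised in isolation. `childExporterName` is a hypothetical helper that reproduces just the string logic of `buildExporterSettings` (endpoint alone, or `parentName_endpoint` when the parent exporter has a name):

```go
package main

import "fmt"

// childExporterName mirrors the child-ID naming used in buildExporterSettings:
// the child exporter is named after its endpoint, prefixed with the parent
// exporter's name when one is set.
func childExporterName(parentName, endpoint string) string {
	if parentName == "" {
		return endpoint
	}
	return fmt.Sprintf("%s_%s", parentName, endpoint)
}

func main() {
	fmt.Println(childExporterName("", "backend-1:4317"))    // backend-1:4317
	fmt.Println(childExporterName("foo", "backend-1:4317")) // foo_backend-1:4317
}
```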
```go
func createTracesExporter(ctx context.Context, params exporter.Settings, cfg component.Config) (exporter.Traces, error) {
	c := cfg.(*Config)
	exporter, err := newTracesExporter(params, cfg)
	if err != nil {
		return nil, fmt.Errorf("cannot configure loadbalancing traces exporter: %w", err)
	}

	return exporterhelper.NewTraces(
		ctx,
		params,
		cfg,
		exporter.ConsumeTraces,
		exporterhelper.WithStart(exporter.Start),
		exporterhelper.WithShutdown(exporter.Shutdown),
		exporterhelper.WithCapabilities(exporter.Capabilities()),
		exporterhelper.WithTimeout(c.TimeoutSettings),
		exporterhelper.WithQueue(c.QueueSettings),
		exporterhelper.WithRetry(c.BackOffConfig),
	)
}
```
```go
func createLogsExporter(ctx context.Context, params exporter.Settings, cfg component.Config) (exporter.Logs, error) {
	c := cfg.(*Config)
	exporter, err := newLogsExporter(params, cfg)
	if err != nil {
		return nil, fmt.Errorf("cannot configure loadbalancing logs exporter: %w", err)
	}

	return exporterhelper.NewLogs(
		ctx,
		params,
		cfg,
		exporter.ConsumeLogs,
		exporterhelper.WithStart(exporter.Start),
		exporterhelper.WithShutdown(exporter.Shutdown),
		exporterhelper.WithCapabilities(exporter.Capabilities()),
		exporterhelper.WithTimeout(c.TimeoutSettings),
		exporterhelper.WithQueue(c.QueueSettings),
		exporterhelper.WithRetry(c.BackOffConfig),
	)
}
```
```go
func createMetricsExporter(ctx context.Context, params exporter.Settings, cfg component.Config) (exporter.Metrics, error) {
	c := cfg.(*Config)
	exporter, err := newMetricsExporter(params, cfg)
	if err != nil {
		return nil, fmt.Errorf("cannot configure loadbalancing metrics exporter: %w", err)
	}

	return exporterhelper.NewMetrics(
		ctx,
		params,
		cfg,
		exporter.ConsumeMetrics,
		exporterhelper.WithStart(exporter.Start),
		exporterhelper.WithShutdown(exporter.Shutdown),
		exporterhelper.WithCapabilities(exporter.Capabilities()),
		exporterhelper.WithTimeout(c.TimeoutSettings),
		exporterhelper.WithQueue(c.QueueSettings),
		exporterhelper.WithRetry(c.BackOffConfig),
	)
}
```
**Review comment:** Now that you've consolidated all exporter queues into one, their overall size is smaller. This could lead to dropped data in some deployments, since you are going from `n*queueSize` to `1*queueSize`. I don't think this is necessarily a breaking change, but it should be mentioned in the changelog so users can increase the queue size if they run into issues.

**Review reply:** I've updated the changelog to reflect this part.
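The reviewer's capacity point is simple arithmetic; a tiny sketch with hypothetical numbers (`equivalentQueueSize` is an illustrative helper, not collector code):

```go
package main

import "fmt"

// equivalentQueueSize computes the top-level queue_size needed to match the
// aggregate capacity previously provided by n independent per-endpoint queues.
func equivalentQueueSize(numEndpoints, perEndpointQueueSize int) int {
	return numEndpoints * perEndpointQueueSize
}

func main() {
	// Before this change: 4 backends, each OTLP sub-exporter with its own
	// queue of 1000 batches -> 4000 batches of total buffering.
	// After: one shared queue; raise queue_size toward this value for parity.
	fmt.Println(equivalentQueueSize(4, 1000)) // 4000
}
```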