From 9d9d3c4f34c628f0739cde37767ab28194d581ce Mon Sep 17 00:00:00 2001 From: Glenn Oppegard Date: Mon, 19 Jul 2021 15:20:39 -0600 Subject: [PATCH] [exporter/tanzuobservability] Support queued retries - Flush data once at end of pushTraceData/ConsumeTracesFunc instead of per-span Signed-off-by: Peter Stone --- exporter/tanzuobservabilityexporter/README.md | 50 +++++++++++++++++-- exporter/tanzuobservabilityexporter/config.go | 5 +- .../tanzuobservabilityexporter/exporter.go | 10 ++-- .../tanzuobservabilityexporter/factory.go | 6 +++ .../factory_test.go | 13 +++++ .../testdata/config.yaml | 9 ++++ 6 files changed, 83 insertions(+), 10 deletions(-) diff --git a/exporter/tanzuobservabilityexporter/README.md b/exporter/tanzuobservabilityexporter/README.md index e72a3d97fda2..5f33da606153 100644 --- a/exporter/tanzuobservabilityexporter/README.md +++ b/exporter/tanzuobservabilityexporter/README.md @@ -22,7 +22,48 @@ This exporter supports sending traces to [Tanzu Observability](https://tanzu.vmw - `application` is set to "defaultApp". - `service` is set to "defaultService". -## Example Configuration +## Configuration + +The only required configuration is a Wavefront proxy API endpoint to receive traces from the Tanzu Observability Exporter. + +Given a Wavefront proxy at `10.10.10.10`, configured with `customTracingListenerPorts` set to `30001`, the traces endpoint would be `http://10.10.10.10:30001`. + +### Example Configuration + +```yaml +receivers: + examplereceiver: + +processors: + batch: + timeout: 10s + +exporters: + tanzuobservability: + traces: + endpoint: "http://10.10.10.10:30001" + +service: + pipelines: + traces: + receivers: [examplereceiver] + processors: [batch] + exporters: [tanzuobservability] +``` + +### Advanced Configuration + +This exporter uses [queuing and retry helpers](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md) provided by the core OpenTelemetry Collector. The `retry_on_failure` and `sending_queue` features are enabled by default, but can be disabled using the options below. + +* `retry_on_failure` [Details and defaults here](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md#configuration). Enabled by default. + * `enabled` + * `initial_interval` + * `max_interval` + * `max_elapsed_time` +* `sending_queue` [Details and defaults here](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md#configuration). Enabled by default. + * `enabled` + * `num_consumers` + * `queue_size` ```yaml receivers: @@ -35,8 +76,11 @@ processors: exporters: tanzuobservability: traces: - # Hostname and `customTracingListenerPorts` of the Wavefront Proxy - endpoint: "http://localhost:30001" + endpoint: "http://10.10.10.10:30001" + retry_on_failure: + max_elapsed_time: 3m + sending_queue: + queue_size: 10000 service: pipelines: diff --git a/exporter/tanzuobservabilityexporter/config.go b/exporter/tanzuobservabilityexporter/config.go index c3bbb45bd5b8..18a28cf58de0 100644 --- a/exporter/tanzuobservabilityexporter/config.go +++ b/exporter/tanzuobservabilityexporter/config.go @@ -20,6 +20,7 @@ import ( "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/config/confighttp" + "go.opentelemetry.io/collector/exporter/exporterhelper" ) type TracesConfig struct { @@ -28,7 +29,9 @@ type TracesConfig struct { // Config defines configuration options for the exporter. type Config struct { - config.ExporterSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct. + config.ExporterSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct. + exporterhelper.QueueSettings `mapstructure:"sending_queue"` + exporterhelper.RetrySettings `mapstructure:"retry_on_failure"` // Traces defines the Traces exporter specific configuration Traces TracesConfig `mapstructure:"traces"` diff --git a/exporter/tanzuobservabilityexporter/exporter.go b/exporter/tanzuobservabilityexporter/exporter.go index 7efd3c507d23..2d31c2073f89 100644 --- a/exporter/tanzuobservabilityexporter/exporter.go +++ b/exporter/tanzuobservabilityexporter/exporter.go @@ -122,6 +122,9 @@ func (e *tracesExporter) pushTraceData(ctx context.Context, td pdata.Traces) err } } + if err := e.sender.Flush(); err != nil { + errs = append(errs, err) + } return consumererror.Combine(errs) } @@ -131,7 +134,7 @@ func (e *tracesExporter) recordSpan(span span) error { parents = []string{span.ParentSpanID.String()} } - err := e.sender.SendSpan( + return e.sender.SendSpan( span.Name, span.StartMillis, span.DurationMillis, @@ -143,11 +146,6 @@ func (e *tracesExporter) recordSpan(span span) error { mapToSpanTags(span.Tags), span.SpanLogs, ) - - if err != nil { - return err - } - return e.sender.Flush() } func (e *tracesExporter) shutdown(_ context.Context) error { diff --git a/exporter/tanzuobservabilityexporter/factory.go b/exporter/tanzuobservabilityexporter/factory.go index d6e0f08ca15f..51ffb28f77ba 100644 --- a/exporter/tanzuobservabilityexporter/factory.go +++ b/exporter/tanzuobservabilityexporter/factory.go @@ -40,6 +40,8 @@ func createDefaultConfig() config.Exporter { } return &Config{ ExporterSettings: config.NewExporterSettings(config.NewID(exporterType)), + QueueSettings: exporterhelper.DefaultQueueSettings(), + RetrySettings: exporterhelper.DefaultRetrySettings(), Traces: tracesCfg, } } @@ -56,10 +58,14 @@ func createTracesExporter( return nil, err } + tobsCfg := cfg.(*Config) + return exporterhelper.NewTracesExporter( cfg, set, exp.pushTraceData, + exporterhelper.WithQueue(tobsCfg.QueueSettings), + exporterhelper.WithRetry(tobsCfg.RetrySettings), exporterhelper.WithShutdown(exp.shutdown), ) } diff --git a/exporter/tanzuobservabilityexporter/factory_test.go b/exporter/tanzuobservabilityexporter/factory_test.go index ccdd53dc10f3..83d7c0601cf9 100644 --- a/exporter/tanzuobservabilityexporter/factory_test.go +++ b/exporter/tanzuobservabilityexporter/factory_test.go @@ -18,6 +18,7 @@ import ( "context" "path" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -26,6 +27,7 @@ import ( "go.opentelemetry.io/collector/config/configcheck" "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/config/configtest" + "go.opentelemetry.io/collector/exporter/exporterhelper" ) func TestCreateDefaultConfig(t *testing.T) { @@ -56,6 +58,17 @@ func TestLoadConfig(t *testing.T) { Traces: TracesConfig{ HTTPClientSettings: confighttp.HTTPClientSettings{Endpoint: "http://localhost:40001"}, }, + QueueSettings: exporterhelper.QueueSettings{ + Enabled: true, + NumConsumers: 2, + QueueSize: 10, + }, + RetrySettings: exporterhelper.RetrySettings{ + Enabled: true, + InitialInterval: 10 * time.Second, + MaxInterval: 60 * time.Second, + MaxElapsedTime: 10 * time.Minute, + }, } assert.Equal(t, expected, actual) } diff --git a/exporter/tanzuobservabilityexporter/testdata/config.yaml b/exporter/tanzuobservabilityexporter/testdata/config.yaml index 749fbccea016..ffa175e71ef4 100644 --- a/exporter/tanzuobservabilityexporter/testdata/config.yaml +++ b/exporter/tanzuobservabilityexporter/testdata/config.yaml @@ -8,6 +8,15 @@ exporters: tanzuobservability: traces: endpoint: "http://localhost:40001" + retry_on_failure: + enabled: true + initial_interval: 10s + max_interval: 60s + max_elapsed_time: 10m + sending_queue: + enabled: true + num_consumers: 2 + queue_size: 10 service: pipelines: