From f3b21eba998a07b644af4152ac28fdf6e4f15609 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 7 Sep 2022 16:56:04 -0700 Subject: [PATCH] Proof-of-Concept for partial export err returns --- .../otlp/otlptrace/otlptracegrpc/client.go | 16 +- .../otlptrace/otlptracegrpc/client_test.go | 26 +-- sdk/trace/flow_span_processor.go | 171 ++++++++++++++++++ sdk/trace/span_exporter.go | 16 +- 4 files changed, 203 insertions(+), 26 deletions(-) create mode 100644 sdk/trace/flow_span_processor.go diff --git a/exporters/otlp/otlptrace/otlptracegrpc/client.go b/exporters/otlp/otlptrace/otlptracegrpc/client.go index 4a139fc696e..efe9683c65b 100644 --- a/exporters/otlp/otlptrace/otlptracegrpc/client.go +++ b/exporters/otlp/otlptrace/otlptracegrpc/client.go @@ -26,11 +26,10 @@ import ( "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/exporters/otlp" "go.opentelemetry.io/otel/exporters/otlp/internal/retry" "go.opentelemetry.io/otel/exporters/otlp/otlptrace" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/otlpconfig" + "go.opentelemetry.io/otel/sdk/trace" coltracepb "go.opentelemetry.io/proto/otlp/collector/trace/v1" tracepb "go.opentelemetry.io/proto/otlp/trace/v1" ) @@ -201,15 +200,14 @@ func (c *client) UploadTraces(ctx context.Context, protoSpans []*tracepb.Resourc resp, err := c.tsc.Export(iCtx, &coltracepb.ExportTraceServiceRequest{ ResourceSpans: protoSpans, }) - if resp != nil && resp.PartialSuccess != nil { - otel.Handle(otlp.PartialSuccessToError( - otlp.TracingPartialSuccess, - resp.PartialSuccess.RejectedSpans, - resp.PartialSuccess.ErrorMessage, - )) - } // nil is converted to OK. if status.Code(err) == codes.OK { + if resp != nil && resp.PartialSuccess != nil { + return &trace.PartialExportError{ + RejectedN: resp.PartialSuccess.RejectedSpans, + Err: errors.New(resp.PartialSuccess.ErrorMessage), + } + } // Success. return nil } diff --git a/exporters/otlp/otlptrace/otlptracegrpc/client_test.go b/exporters/otlp/otlptrace/otlptracegrpc/client_test.go index d11111ed126..952db2b1c96 100644 --- a/exporters/otlp/otlptrace/otlptracegrpc/client_test.go +++ b/exporters/otlp/otlptrace/otlptracegrpc/client_test.go @@ -30,11 +30,11 @@ import ( "google.golang.org/grpc/encoding/gzip" "google.golang.org/grpc/status" - "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/exporters/otlp/otlptrace" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/otlptracetest" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" + "go.opentelemetry.io/otel/sdk/trace" sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/sdk/trace/tracetest" coltracepb "go.opentelemetry.io/proto/otlp/collector/trace/v1" @@ -390,24 +390,18 @@ func TestEmptyData(t *testing.T) { } func TestPartialSuccess(t *testing.T) { - mc := runMockCollectorWithConfig(t, &mockConfig{ - partial: &coltracepb.ExportTracePartialSuccess{ - RejectedSpans: 2, - ErrorMessage: "partially successful", - }, - }) + resp := &coltracepb.ExportTracePartialSuccess{ + RejectedSpans: 2, + ErrorMessage: "invalid data format", + } + mc := runMockCollectorWithConfig(t, &mockConfig{partial: resp}) t.Cleanup(func() { require.NoError(t, mc.stop()) }) - errors := []error{} - otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) { - errors = append(errors, err) - })) ctx := context.Background() exp := newGRPCExporter(t, ctx, mc.endpoint) t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) }) - require.NoError(t, exp.ExportSpans(ctx, roSpans)) - - require.Equal(t, 1, len(errors)) - require.Contains(t, errors[0].Error(), "partially successful") - require.Contains(t, errors[0].Error(), "2 spans rejected") + var got *trace.PartialExportError + require.ErrorAs(t, exp.ExportSpans(ctx, roSpans), &got) + assert.Equal(t, resp.RejectedSpans, got.RejectedN) + assert.EqualError(t, got.Err, resp.ErrorMessage) } diff --git a/sdk/trace/flow_span_processor.go b/sdk/trace/flow_span_processor.go new file mode 100644 index 00000000000..1fbf1a2e78a --- /dev/null +++ b/sdk/trace/flow_span_processor.go @@ -0,0 +1,171 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package trace // import "go.opentelemetry.io/otel/sdk/trace" + +// Copied from https://github.com/MrAlias/flow for demo purposes. + +import ( + "context" + "errors" + "net/http" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/internal/global" +) + +const ( + startedState = "started" + endedState = "ended" + + // DefaultListenPort is the port the HTTP server listens on if not + // configured with the WithListenAddress option. + DefaultListenPort = 41820 + // DefaultListenAddress is the listen address of the HTTP server if not + // configured with the WithListenAddress option. + DefaultListenAddress = ":41820" +) + +type spanProcessor struct { + wrapped SpanExporter + + idleConnsClosed chan struct{} + server *http.Server + spanCounter *prometheus.CounterVec + exportErrCounter *prometheus.CounterVec +} + +// Wrap returns a wrapped version of the downstream SpanExporter with +// telemetry flow reporting. All calls to the returned SpanProcessor will +// introspected for telemetry data and then forwarded to downstream. +func Wrap(downstream SpanExporter, options ...Option) SpanProcessor { + mux := http.NewServeMux() + registry := prometheus.NewRegistry() + mux.Handle("/metrics", promhttp.InstrumentMetricHandler( + registry, + promhttp.HandlerFor(registry, promhttp.HandlerOpts{}), + )) + + c := newConfig(options) + sp := &spanProcessor{ + wrapped: downstream, + idleConnsClosed: make(chan struct{}), + server: &http.Server{Addr: c.address, Handler: mux}, + spanCounter: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "spans_total", + Help: "The total number of processed spans", + }, []string{"state"}), + exportErrCounter: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "failed_export_spans_total", + Help: "The total number of spans that failed to export", + }, []string{}), + } + registry.MustRegister(sp.spanCounter) + registry.MustRegister(sp.exportErrCounter) + + go func() { + switch err := sp.server.ListenAndServe(); err { + case nil, http.ErrServerClosed: + default: + otel.Handle(err) + } + close(sp.idleConnsClosed) + }() + + return sp +} + +// OnStart is called when a span is started. +func (sp *spanProcessor) OnStart(parent context.Context, s ReadWriteSpan) { + sp.spanCounter.WithLabelValues(startedState).Inc() +} + +// OnEnd is called when span is finished. +func (sp *spanProcessor) OnEnd(s ReadOnlySpan) { + sp.spanCounter.WithLabelValues(endedState).Inc() + spans := []ReadOnlySpan{s} + err := sp.wrapped.ExportSpans(context.TODO(), spans) + var errPart *PartialExportError + if errors.As(err, &errPart) { + sp.exportErrCounter.WithLabelValues().Add(float64(errPart.RejectedN)) + } else { + global.Error(err, "failed export", "span-count", len(spans)) + } +} + +// Shutdown is called when the SDK shuts down. The telemetry reporting process +// will be halted when this is called. +func (sp *spanProcessor) Shutdown(ctx context.Context) error { + errCh := make(chan error, 1) + go func() { + errCh <- sp.wrapped.Shutdown(ctx) + }() + + err := sp.server.Shutdown(ctx) + select { + case <-ctx.Done(): + // Abandon idle conns if context has expired. + if err == nil { + return ctx.Err() + } + return err + case <-sp.idleConnsClosed: + } + + // Downstream honors ctx timeout, no need to include in select above. + if e := <-errCh; e != nil { + // Prioritize downstream error over server shutdown error. + err = e + } + return err +} + +// ForceFlush dones nothing. +func (sp *spanProcessor) ForceFlush(ctx context.Context) error { return nil } + +type config struct { + // address is the listen address for the HTTP server. + address string +} + +func newConfig(options []Option) config { + c := config{ + address: DefaultListenAddress, + } + + for _, opt := range options { + c = opt.apply(c) + } + + return c +} + +// Option configures the flow SpanProcessor. +type Option interface { + apply(config) config +} + +type addressOpt string + +func (o addressOpt) apply(c config) config { + c.address = string(o) + return c +} + +// WithListenAddress sets the listen address of the HTTP server. +func WithListenAddress(addr string) Option { + return addressOpt(addr) +} diff --git a/sdk/trace/span_exporter.go b/sdk/trace/span_exporter.go index 9fb3d6eac3b..a72b4b7de00 100644 --- a/sdk/trace/span_exporter.go +++ b/sdk/trace/span_exporter.go @@ -14,7 +14,10 @@ package trace // import "go.opentelemetry.io/otel/sdk/trace" -import "context" +import ( + "context" + "fmt" +) // SpanExporter handles the delivery of spans to external receivers. This is // the final component in the trace export pipeline. @@ -45,3 +48,14 @@ type SpanExporter interface { // DO NOT CHANGE: any modification will not be backwards compatible and // must never be done outside of a new major release. } + +type PartialExportError struct { + RejectedN int64 + Err error +} + +func (e *PartialExportError) Error() string { + return fmt.Sprintf("%d spans not exported: %s", e.RejectedN, e.Err) +} + +func (e *PartialExportError) Unwrap() error { return e.Err }