diff --git a/CHANGELOG.md b/CHANGELOG.md index b11a1e30860..465dcd7a48c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,11 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Remove the deprecated `go.opentelemetry.io/otel/exporters/otlp/otlpmetric` module. (#4707) - Remove the deprecated `go.opentelemetry.io/otel/example/view` module. (#4708) +### Fixed + +- Do not parse non-protobuf responses in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp`. (#4719) +- Do not parse non-protobuf responses in `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp`. (#4719) + ## [1.20.0/0.43.0] 2023-11-10 This release brings a breaking change for custom trace API implementations. Some interfaces (`TracerProvider`, `Tracer`, `Span`) now embed the `go.opentelemetry.io/otel/trace/embedded` types. Implementors need to update their implementations based on what they want the default behavior to be. See the "API Implementations" section of the [trace API] package documentation for more information about how to accomplish this. diff --git a/exporters/otlp/otlpmetric/otlpmetricgrpc/internal/otest/collector.go b/exporters/otlp/otlpmetric/otlpmetricgrpc/internal/otest/collector.go index c96ca1fda6e..f08fbd5c5f7 100644 --- a/exporters/otlp/otlpmetric/otlpmetricgrpc/internal/otest/collector.go +++ b/exporters/otlp/otlpmetric/otlpmetricgrpc/internal/otest/collector.go @@ -195,6 +195,8 @@ func (e *HTTPResponseError) Unwrap() error { return e.Err } // HTTPCollector is an OTLP HTTP server that collects all requests it receives. type HTTPCollector struct { + plainTextResponse bool + headersMu sync.Mutex headers http.Header storage *Storage @@ -217,7 +219,7 @@ type HTTPCollector struct { // If errCh is not nil, the collector will respond to HTTP requests with errors // sent on that channel. This means that if errCh is not nil Export calls will // block until an error is received. -func NewHTTPCollector(endpoint string, resultCh <-chan ExportResult) (*HTTPCollector, error) { +func NewHTTPCollector(endpoint string, resultCh <-chan ExportResult, opts ...func(*HTTPCollector)) (*HTTPCollector, error) { u, err := url.Parse(endpoint) if err != nil { return nil, err @@ -234,6 +236,9 @@ func NewHTTPCollector(endpoint string, resultCh <-chan ExportResult) (*HTTPColle storage: NewStorage(), resultCh: resultCh, } + for _, opt := range opts { + opt(c) + } c.listener, err = net.Listen("tcp", u.Host) if err != nil { @@ -262,6 +267,14 @@ func NewHTTPCollector(endpoint string, resultCh <-chan ExportResult) (*HTTPColle return c, nil } +// WithHTTPCollectorRespondingPlainText makes the HTTPCollector return +// a plaintext, instead of protobuf, response. +func WithHTTPCollectorRespondingPlainText() func(*HTTPCollector) { + return func(s *HTTPCollector) { + s.plainTextResponse = true + } +} + // Shutdown shuts down the HTTP server closing all open connections and // listeners. func (c *HTTPCollector) Shutdown(ctx context.Context) error { @@ -382,6 +395,13 @@ func (c *HTTPCollector) respond(w http.ResponseWriter, resp ExportResult) { return } + if c.plainTextResponse { + w.Header().Set("Content-Type", "text/plain; charset=utf-8") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("OK")) + return + } + w.Header().Set("Content-Type", "application/x-protobuf") w.WriteHeader(http.StatusOK) if resp.Response == nil { diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/client.go b/exporters/otlp/otlpmetric/otlpmetrichttp/client.go index 7cc6f6ae7bb..73463c91d5f 100644 --- a/exporters/otlp/otlpmetric/otlpmetrichttp/client.go +++ b/exporters/otlp/otlpmetric/otlpmetrichttp/client.go @@ -166,8 +166,11 @@ func (c *client) UploadMetrics(ctx context.Context, protoMetrics *metricpb.Resou if _, err := io.Copy(&respData, resp.Body); err != nil { return err } + if respData.Len() == 0 { + return nil + } - if respData.Len() != 0 { + if resp.Header.Get("Content-Type") == "application/x-protobuf" { var respProto colmetricpb.ExportMetricsServiceResponse if err := proto.Unmarshal(respData.Bytes(), &respProto); err != nil { return err diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/client_test.go b/exporters/otlp/otlpmetric/otlpmetrichttp/client_test.go index 2f48f472748..a4ead01c1f1 100644 --- a/exporters/otlp/otlpmetric/otlpmetrichttp/client_test.go +++ b/exporters/otlp/otlpmetric/otlpmetrichttp/client_test.go @@ -31,6 +31,7 @@ import ( "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal/otest" "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/metricdata" + mpb "go.opentelemetry.io/proto/otlp/metrics/v1" ) type clientShim struct { @@ -65,6 +66,23 @@ func TestClient(t *testing.T) { t.Run("Integration", otest.RunClientTests(factory)) } +func TestClientWithHTTPCollectorRespondingPlainText(t *testing.T) { + ctx := context.Background() + coll, err := otest.NewHTTPCollector("", nil, otest.WithHTTPCollectorRespondingPlainText()) + require.NoError(t, err) + + addr := coll.Addr().String() + opts := []Option{WithEndpoint(addr), WithInsecure()} + cfg := oconf.NewHTTPConfig(asHTTPOptions(opts)...) + client, err := newClient(cfg) + require.NoError(t, err) + + require.NoError(t, client.UploadMetrics(ctx, &mpb.ResourceMetrics{})) + require.NoError(t, client.Shutdown(ctx)) + got := coll.Collect().Dump() + require.Len(t, got, 1, "upload of one ResourceMetrics") +} + func TestNewWithInvalidEndpoint(t *testing.T) { ctx := context.Background() exp, err := New(ctx, WithEndpoint("host:invalid-port")) diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/internal/otest/collector.go b/exporters/otlp/otlpmetric/otlpmetrichttp/internal/otest/collector.go index 503eba65bea..6398f8ba5ba 100644 --- a/exporters/otlp/otlpmetric/otlpmetrichttp/internal/otest/collector.go +++ b/exporters/otlp/otlpmetric/otlpmetrichttp/internal/otest/collector.go @@ -195,6 +195,8 @@ func (e *HTTPResponseError) Unwrap() error { return e.Err } // HTTPCollector is an OTLP HTTP server that collects all requests it receives. type HTTPCollector struct { + plainTextResponse bool + headersMu sync.Mutex headers http.Header storage *Storage @@ -217,7 +219,7 @@ type HTTPCollector struct { // If errCh is not nil, the collector will respond to HTTP requests with errors // sent on that channel. This means that if errCh is not nil Export calls will // block until an error is received. -func NewHTTPCollector(endpoint string, resultCh <-chan ExportResult) (*HTTPCollector, error) { +func NewHTTPCollector(endpoint string, resultCh <-chan ExportResult, opts ...func(*HTTPCollector)) (*HTTPCollector, error) { u, err := url.Parse(endpoint) if err != nil { return nil, err @@ -234,6 +236,9 @@ func NewHTTPCollector(endpoint string, resultCh <-chan ExportResult) (*HTTPColle storage: NewStorage(), resultCh: resultCh, } + for _, opt := range opts { + opt(c) + } c.listener, err = net.Listen("tcp", u.Host) if err != nil { @@ -262,6 +267,14 @@ func NewHTTPCollector(endpoint string, resultCh <-chan ExportResult) (*HTTPColle return c, nil } +// WithHTTPCollectorRespondingPlainText makes the HTTPCollector return +// a plaintext, instead of protobuf, response. +func WithHTTPCollectorRespondingPlainText() func(*HTTPCollector) { + return func(s *HTTPCollector) { + s.plainTextResponse = true + } +} + // Shutdown shuts down the HTTP server closing all open connections and // listeners. func (c *HTTPCollector) Shutdown(ctx context.Context) error { @@ -382,6 +395,13 @@ func (c *HTTPCollector) respond(w http.ResponseWriter, resp ExportResult) { return } + if c.plainTextResponse { + w.Header().Set("Content-Type", "text/plain; charset=utf-8") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("OK")) + return + } + w.Header().Set("Content-Type", "application/x-protobuf") w.WriteHeader(http.StatusOK) if resp.Response == nil { diff --git a/exporters/otlp/otlptrace/otlptracehttp/client.go b/exporters/otlp/otlptrace/otlptracehttp/client.go index 068aef300ee..3b5f3839f27 100644 --- a/exporters/otlp/otlptrace/otlptracehttp/client.go +++ b/exporters/otlp/otlptrace/otlptracehttp/client.go @@ -177,8 +177,11 @@ func (d *client) UploadTraces(ctx context.Context, protoSpans []*tracepb.Resourc if _, err := io.Copy(&respData, resp.Body); err != nil { return err } + if respData.Len() == 0 { + return nil + } - if respData.Len() != 0 { + if resp.Header.Get("Content-Type") == "application/x-protobuf" { var respProto coltracepb.ExportTraceServiceResponse if err := proto.Unmarshal(respData.Bytes(), &respProto); err != nil { return err diff --git a/exporters/otlp/otlptrace/otlptracehttp/client_test.go b/exporters/otlp/otlptrace/otlptracehttp/client_test.go index 21838695c5e..63a4cd4f207 100644 --- a/exporters/otlp/otlptrace/otlptracehttp/client_test.go +++ b/exporters/otlp/otlptrace/otlptracehttp/client_test.go @@ -430,3 +430,24 @@ func TestOtherHTTPSuccess(t *testing.T) { }) } } + +func TestCollectorRespondingNonProtobufContent(t *testing.T) { + mcCfg := mockCollectorConfig{ + InjectContentType: "application/octet-stream", + } + mc := runMockCollector(t, mcCfg) + defer mc.MustStop(t) + driver := otlptracehttp.NewClient( + otlptracehttp.WithEndpoint(mc.Endpoint()), + otlptracehttp.WithInsecure(), + ) + ctx := context.Background() + exporter, err := otlptrace.New(ctx, driver) + require.NoError(t, err) + defer func() { + assert.NoError(t, exporter.Shutdown(context.Background())) + }() + err = exporter.ExportSpans(ctx, otlptracetest.SingleReadOnlySpan()) + assert.NoError(t, err) + assert.Len(t, mc.GetSpans(), 1) +} diff --git a/internal/shared/otlp/otlpmetric/otest/collector.go.tmpl b/internal/shared/otlp/otlpmetric/otest/collector.go.tmpl index 1adf55807a5..fba237e68fc 100644 --- a/internal/shared/otlp/otlpmetric/otest/collector.go.tmpl +++ b/internal/shared/otlp/otlpmetric/otest/collector.go.tmpl @@ -195,6 +195,8 @@ func (e *HTTPResponseError) Unwrap() error { return e.Err } // HTTPCollector is an OTLP HTTP server that collects all requests it receives. type HTTPCollector struct { + plainTextResponse bool + headersMu sync.Mutex headers http.Header storage *Storage @@ -217,7 +219,7 @@ type HTTPCollector struct { // If errCh is not nil, the collector will respond to HTTP requests with errors // sent on that channel. This means that if errCh is not nil Export calls will // block until an error is received. -func NewHTTPCollector(endpoint string, resultCh <-chan ExportResult) (*HTTPCollector, error) { +func NewHTTPCollector(endpoint string, resultCh <-chan ExportResult, opts ...func(*HTTPCollector)) (*HTTPCollector, error) { u, err := url.Parse(endpoint) if err != nil { return nil, err @@ -234,6 +236,9 @@ func NewHTTPCollector(endpoint string, resultCh <-chan ExportResult) (*HTTPColle storage: NewStorage(), resultCh: resultCh, } + for _, opt := range opts { + opt(c) + } c.listener, err = net.Listen("tcp", u.Host) if err != nil { @@ -262,6 +267,14 @@ func NewHTTPCollector(endpoint string, resultCh <-chan ExportResult) (*HTTPColle return c, nil } +// WithHTTPCollectorRespondingPlainText makes the HTTPCollector return +// a plaintext, instead of protobuf, response. +func WithHTTPCollectorRespondingPlainText() func(*HTTPCollector) { + return func(s *HTTPCollector) { + s.plainTextResponse = true + } +} + // Shutdown shuts down the HTTP server closing all open connections and // listeners. func (c *HTTPCollector) Shutdown(ctx context.Context) error { @@ -382,6 +395,13 @@ func (c *HTTPCollector) respond(w http.ResponseWriter, resp ExportResult) { return } + if c.plainTextResponse { + w.Header().Set("Content-Type", "text/plain; charset=utf-8") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("OK")) + return + } + w.Header().Set("Content-Type", "application/x-protobuf") w.WriteHeader(http.StatusOK) if resp.Response == nil {