From 965f750ed0e9cccba9b7e682782a9b59cc5c0b6b Mon Sep 17 00:00:00 2001 From: Tigran Najaryan Date: Tue, 16 Feb 2021 20:12:33 -0500 Subject: [PATCH] Add "compression" option to otlphttp exporter All exporters that use HTTPClientSettings will now have a choice to have a config option "compression". The config option is an opt-in and is only active for exporters that set HTTPClientSettings.EnableCompression=true. Right now this is only otlphttpexporter. The default value of "compression" option is empty and results in no compression, which is the same as the old behavior. You can also specify "compression: gzip" which will result in gzip encoding of outgoing http requests. --- config/configgrpc/README.md | 2 +- config/confighttp/README.md | 1 + config/confighttp/confighttp.go | 24 ++++++++++ exporter/otlphttpexporter/README.md | 2 + exporter/otlphttpexporter/config_test.go | 1 + exporter/otlphttpexporter/factory.go | 7 +-- exporter/otlphttpexporter/otlp_test.go | 55 +++++++++++++++++++++ internal/middleware/compression.go | 59 +++++++++++++++++++++++ internal/middleware/compression_test.go | 61 ++++++++++++++++++++++++ 9 files changed, 208 insertions(+), 4 deletions(-) diff --git a/config/configgrpc/README.md b/config/configgrpc/README.md index ef5fe4c0f4b..d66408d75d2 100644 --- a/config/configgrpc/README.md +++ b/config/configgrpc/README.md @@ -16,7 +16,7 @@ configuration. For more information, see [configtls README](../configtls/README.md). - [`balancer_name`](https://github.com/grpc/grpc-go/blob/master/examples/features/load_balancing/README.md) -- `compression` (default = gzip): Compression type to use (only gzip is supported today) +- `compression` (default = none): Compression type to use (only gzip is supported today) - `endpoint`: Valid value syntax available [here](https://github.com/grpc/grpc/blob/master/doc/naming.md) - `headers`: name/value pairs added to the request - [`keepalive`](https://godoc.org/google.golang.org/grpc/keepalive#ClientParameters) diff --git a/config/confighttp/README.md b/config/confighttp/README.md index 8e1c8b5cf75..8f6e05f4986 100644 --- a/config/confighttp/README.md +++ b/config/confighttp/README.md @@ -15,6 +15,7 @@ configuration. For more information, see [configtls README](../configtls/README.md). - `endpoint`: address:port +- `compression` (default = none): Compression type to use (only gzip is supported today) - `headers`: name/value pairs added to the HTTP request headers - [`read_buffer_size`](https://golang.org/pkg/net/http/#Transport) - [`timeout`](https://golang.org/pkg/net/http/#Client) diff --git a/config/confighttp/confighttp.go b/config/confighttp/confighttp.go index 56936c74da8..0f16cc38ff8 100644 --- a/config/confighttp/confighttp.go +++ b/config/confighttp/confighttp.go @@ -16,12 +16,15 @@ package confighttp import ( "crypto/tls" + "fmt" "net" "net/http" + "strings" "time" "github.com/rs/cors" + "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/internal/middleware" ) @@ -46,6 +49,15 @@ type HTTPClientSettings struct { // Existing header values are overwritten if collision happens. Headers map[string]string `mapstructure:"headers,omitempty"` + // The compression key for supported compression types within + // collector. Currently the only supported mode is `gzip`. + Compression string `mapstructure:"compression"` + + // EnableCompression must be set to true for Compression field to have effect. + // Set it to true in your CreateDefaultConfig if you want your exporter to + // have the "compression" setting supported. + EnableCompression bool `mapstructure:"-"` + // Custom Round Tripper to allow for individual components to intercept HTTP requests CustomRoundTripper func(next http.RoundTripper) (http.RoundTripper, error) } @@ -74,6 +86,18 @@ func (hcs *HTTPClientSettings) ToClient() (*http.Client, error) { } } + if hcs.Compression != "" { + if !hcs.EnableCompression { + return nil, fmt.Errorf("unsupported option \"compression\"") + } + + if strings.ToLower(hcs.Compression) == configgrpc.CompressionGzip { + clientTransport = middleware.NewCompressRoundTripper(clientTransport) + } else { + return nil, fmt.Errorf("unsupported compression type %q", hcs.Compression) + } + } + if hcs.CustomRoundTripper != nil { clientTransport, err = hcs.CustomRoundTripper(clientTransport) if err != nil { diff --git a/exporter/otlphttpexporter/README.md b/exporter/otlphttpexporter/README.md index 8c42bca107b..7dd10b47f07 100644 --- a/exporter/otlphttpexporter/README.md +++ b/exporter/otlphttpexporter/README.md @@ -33,6 +33,8 @@ The following settings can be optionally configured: - `key_file` path to the TLS key to use for TLS required connections. Should only be used if `insecure` is set to false. +- `compression` (default = none): Compression type to use (only gzip is supported today) + - `timeout` (default = 30s): HTTP request time limit. For details see https://golang.org/pkg/net/http/#Client - `read_buffer_size` (default = 0): ReadBufferSize for HTTP client. - `write_buffer_size` (default = 512 * 1024): WriteBufferSize for HTTP client. diff --git a/exporter/otlphttpexporter/config_test.go b/exporter/otlphttpexporter/config_test.go index a0d3aa83a3b..c9b8fea19d1 100644 --- a/exporter/otlphttpexporter/config_test.go +++ b/exporter/otlphttpexporter/config_test.go @@ -63,6 +63,7 @@ func TestLoadConfig(t *testing.T) { QueueSize: 10, }, HTTPClientSettings: confighttp.HTTPClientSettings{ + EnableCompression: true, Headers: map[string]string{ "can you have a . here?": "F0000000-0000-0000-0000-000000000000", "header1": "234", diff --git a/exporter/otlphttpexporter/factory.go b/exporter/otlphttpexporter/factory.go index 592f0e016ab..4eb1eee141d 100644 --- a/exporter/otlphttpexporter/factory.go +++ b/exporter/otlphttpexporter/factory.go @@ -50,9 +50,10 @@ func createDefaultConfig() configmodels.Exporter { RetrySettings: exporterhelper.DefaultRetrySettings(), QueueSettings: exporterhelper.DefaultQueueSettings(), HTTPClientSettings: confighttp.HTTPClientSettings{ - Endpoint: "", - Timeout: 30 * time.Second, - Headers: map[string]string{}, + EnableCompression: true, + Endpoint: "", + Timeout: 30 * time.Second, + Headers: map[string]string{}, // We almost read 0 bytes, so no need to tune ReadBufferSize. WriteBufferSize: 512 * 1024, }, diff --git a/exporter/otlphttpexporter/otlp_test.go b/exporter/otlphttpexporter/otlp_test.go index f9bc3ce98df..c3a2a37a11e 100644 --- a/exporter/otlphttpexporter/otlp_test.go +++ b/exporter/otlphttpexporter/otlp_test.go @@ -132,6 +132,61 @@ func TestTraceRoundTrip(t *testing.T) { } } +func TestCompressionOptions(t *testing.T) { + addr := testutil.GetAvailableLocalAddress(t) + + tests := []struct { + name string + baseURL string + compression string + err bool + }{ + { + name: "no compression", + baseURL: fmt.Sprintf("http://%s", addr), + compression: "", + }, + { + name: "gzip", + baseURL: fmt.Sprintf("http://%s", addr), + compression: "gzip", + }, + { + name: "incorrect compression", + baseURL: fmt.Sprintf("http://%s", addr), + compression: "gzip2", + err: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + sink := new(consumertest.TracesSink) + startTraceReceiver(t, addr, sink) + + factory := NewFactory() + cfg := createExporterConfig(test.baseURL, factory.CreateDefaultConfig()) + cfg.Compression = test.compression + exp, err := factory.CreateTracesExporter(context.Background(), component.ExporterCreateParams{Logger: zap.NewNop()}, cfg) + if test.err { + require.Error(t, err) + return + } + require.NoError(t, err) + startAndCleanup(t, exp) + + td := testdata.GenerateTraceDataOneSpan() + assert.NoError(t, exp.ConsumeTraces(context.Background(), td)) + require.Eventually(t, func() bool { + return sink.SpansCount() > 0 + }, 1*time.Second, 10*time.Millisecond) + allTraces := sink.AllTraces() + require.Len(t, allTraces, 1) + assert.EqualValues(t, td, allTraces[0]) + }) + } +} + func TestMetricsError(t *testing.T) { addr := testutil.GetAvailableLocalAddress(t) diff --git a/internal/middleware/compression.go b/internal/middleware/compression.go index 13504e80953..7e076d637fa 100644 --- a/internal/middleware/compression.go +++ b/internal/middleware/compression.go @@ -15,12 +15,71 @@ package middleware import ( + "bytes" "compress/gzip" "compress/zlib" "io" "net/http" ) +const ( + headerContentEncoding = "Content-Encoding" + headerValueGZIP = "gzip" +) + +type CompressRoundTripper struct { + http.RoundTripper +} + +func NewCompressRoundTripper(rt http.RoundTripper) *CompressRoundTripper { + return &CompressRoundTripper{ + RoundTripper: rt, + } +} + +func (r *CompressRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + + if req.Header.Get(headerContentEncoding) != "" { + // If the header already specifies a content encoding then skip compression + // since we don't want to compress it again. This is a safeguard that normally + // should not happen since CompressRoundTripper is not intended to be used + // with http clients which already do their own compression. + return r.RoundTripper.RoundTrip(req) + } + + gzipWriter := gzip.NewWriter(nil) + + // Gzip the body. + buf := bytes.NewBuffer([]byte{}) + gzipWriter.Reset(buf) + _, copyErr := io.Copy(gzipWriter, req.Body) + closeErr := req.Body.Close() + + if err := gzipWriter.Close(); err != nil { + return nil, err + } + + if copyErr != nil { + return nil, copyErr + } + if closeErr != nil { + return nil, closeErr + } + + // Create a new request since the docs say that we cannot modify the "req" + // (see https://golang.org/pkg/net/http/#RoundTripper). + cReq, err := http.NewRequestWithContext(req.Context(), req.Method, req.URL.String(), buf) + if err != nil { + return nil, err + } + + // Clone the headers and add gzip encoding header. + cReq.Header = req.Header.Clone() + cReq.Header.Add(headerContentEncoding, headerValueGZIP) + + return r.RoundTripper.RoundTrip(cReq) +} + type ErrorHandler func(w http.ResponseWriter, r *http.Request, errorMsg string, statusCode int) type decompressor struct { diff --git a/internal/middleware/compression_test.go b/internal/middleware/compression_test.go index e8770ff4c3e..a303851679b 100644 --- a/internal/middleware/compression_test.go +++ b/internal/middleware/compression_test.go @@ -31,6 +31,67 @@ import ( "go.opentelemetry.io/collector/testutil" ) +func TestHTTPClientCompression(t *testing.T) { + testBody := []byte("uncompressed_text") + compressedBody, _ := compressGzip(testBody) + + tests := []struct { + name string + encoding string + reqBody []byte + }{ + { + name: "NoCompression", + encoding: "", + reqBody: testBody, + }, + { + name: "ValidGzip", + encoding: "gzip", + reqBody: compressedBody.Bytes(), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, err := ioutil.ReadAll(r.Body) + require.NoError(t, err, "failed to read request body: %v", err) + assert.EqualValues(t, tt.reqBody, body) + w.WriteHeader(200) + }) + + addr := testutil.GetAvailableLocalAddress(t) + ln, err := net.Listen("tcp", addr) + require.NoError(t, err, "failed to create listener: %v", err) + srv := &http.Server{ + Handler: handler, + } + go func() { + _ = srv.Serve(ln) + }() + // Wait for the servers to start + <-time.After(10 * time.Millisecond) + + serverURL := fmt.Sprintf("http://%s", ln.Addr().String()) + reqBody := bytes.NewBuffer(testBody) + + req, err := http.NewRequest("GET", serverURL, reqBody) + require.NoError(t, err, "failed to create request to test handler") + + client := http.Client{} + if tt.encoding == "gzip" { + client.Transport = NewCompressRoundTripper(http.DefaultTransport) + } + res, err := client.Do(req) + require.NoError(t, err) + + ioutil.ReadAll(res.Body) + require.NoError(t, res.Body.Close(), "failed to close request body: %v", err) + require.NoError(t, srv.Close()) + }) + } +} + func TestHTTPContentDecompressionHandler(t *testing.T) { testBody := []byte("uncompressed_text") tests := []struct {