diff --git a/config/configgrpc/README.md b/config/configgrpc/README.md index ef5fe4c0f4b..d66408d75d2 100644 --- a/config/configgrpc/README.md +++ b/config/configgrpc/README.md @@ -16,7 +16,7 @@ configuration. For more information, see [configtls README](../configtls/README.md). - [`balancer_name`](https://github.com/grpc/grpc-go/blob/master/examples/features/load_balancing/README.md) -- `compression` (default = gzip): Compression type to use (only gzip is supported today) +- `compression` (default = none): Compression type to use (only gzip is supported today) - `endpoint`: Valid value syntax available [here](https://github.com/grpc/grpc/blob/master/doc/naming.md) - `headers`: name/value pairs added to the request - [`keepalive`](https://godoc.org/google.golang.org/grpc/keepalive#ClientParameters) diff --git a/config/confighttp/README.md b/config/confighttp/README.md index 8e1c8b5cf75..8f6e05f4986 100644 --- a/config/confighttp/README.md +++ b/config/confighttp/README.md @@ -15,6 +15,7 @@ configuration. For more information, see [configtls README](../configtls/README.md). - `endpoint`: address:port +- `compression` (default = none): Compression type to use (only gzip is supported today) - `headers`: name/value pairs added to the HTTP request headers - [`read_buffer_size`](https://golang.org/pkg/net/http/#Transport) - [`timeout`](https://golang.org/pkg/net/http/#Client) diff --git a/config/confighttp/confighttp.go b/config/confighttp/confighttp.go index 56936c74da8..91ce91e67c1 100644 --- a/config/confighttp/confighttp.go +++ b/config/confighttp/confighttp.go @@ -16,12 +16,15 @@ package confighttp import ( "crypto/tls" + "fmt" "net" "net/http" + "strings" "time" "github.com/rs/cors" + "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/internal/middleware" ) @@ -46,6 +49,10 @@ type HTTPClientSettings struct { // Existing header values are overwritten if collision happens. Headers map[string]string `mapstructure:"headers,omitempty"` + // The compression key for supported compression types within + // collector. Currently the only supported mode is `gzip`. + Compression string `mapstructure:"compression"` + // Custom Round Tripper to allow for individual components to intercept HTTP requests CustomRoundTripper func(next http.RoundTripper) (http.RoundTripper, error) } @@ -74,6 +81,14 @@ func (hcs *HTTPClientSettings) ToClient() (*http.Client, error) { } } + if hcs.Compression != "" { + if strings.ToLower(hcs.Compression) == configgrpc.CompressionGzip { + clientTransport = middleware.NewCompressRoundTripper(clientTransport) + } else { + return nil, fmt.Errorf("unsupported compression type %q", hcs.Compression) + } + } + if hcs.CustomRoundTripper != nil { clientTransport, err = hcs.CustomRoundTripper(clientTransport) if err != nil { diff --git a/exporter/otlphttpexporter/README.md b/exporter/otlphttpexporter/README.md index 8c42bca107b..7dd10b47f07 100644 --- a/exporter/otlphttpexporter/README.md +++ b/exporter/otlphttpexporter/README.md @@ -33,6 +33,8 @@ The following settings can be optionally configured: - `key_file` path to the TLS key to use for TLS required connections. Should only be used if `insecure` is set to false. +- `compression` (default = none): Compression type to use (only gzip is supported today) + - `timeout` (default = 30s): HTTP request time limit. For details see https://golang.org/pkg/net/http/#Client - `read_buffer_size` (default = 0): ReadBufferSize for HTTP client. - `write_buffer_size` (default = 512 * 1024): WriteBufferSize for HTTP client. diff --git a/exporter/zipkinexporter/README.md b/exporter/zipkinexporter/README.md index 865cb20e017..f539e0009b9 100644 --- a/exporter/zipkinexporter/README.md +++ b/exporter/zipkinexporter/README.md @@ -26,6 +26,7 @@ As a result, the following parameters are also required: The following settings are optional: +- `compression` (default = none): Compression type to use (only gzip is supported today) - `defaultservicename` (default = ``): What to name services missing this information. diff --git a/internal/middleware/compression.go b/internal/middleware/compression.go index 13504e80953..5fd4d57a7e8 100644 --- a/internal/middleware/compression.go +++ b/internal/middleware/compression.go @@ -15,12 +15,58 @@ package middleware import ( + "bytes" "compress/gzip" "compress/zlib" "io" "net/http" ) +const ( + headerContentEncoding = "Content-Encoding" + headerValueGZIP = "gzip" +) + +type CompressRoundTripper struct { + http.RoundTripper + gzipWriter *gzip.Writer +} + +func NewCompressRoundTripper(rt http.RoundTripper) *CompressRoundTripper { + return &CompressRoundTripper{ + RoundTripper: rt, + gzipWriter: gzip.NewWriter(nil), + } +} + +func (r *CompressRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + buf := bytes.NewBuffer([]byte{}) + r.gzipWriter.Reset(buf) + + _, err := io.Copy(r.gzipWriter, req.Body) + defer req.Body.Close() + + if err != nil { + return nil, err + } + + if err := r.gzipWriter.Close(); err != nil { + return nil, err + } + + cReq, err := http.NewRequest(req.Method, req.URL.String(), buf) + if err != nil { + return nil, err + } + + cReq.Header = req.Header.Clone() + cReq.Header.Add(headerContentEncoding, headerValueGZIP) + + resp, err := r.RoundTripper.RoundTrip(cReq) + + return resp, err +} + type ErrorHandler func(w http.ResponseWriter, r *http.Request, errorMsg string, statusCode int) type decompressor struct { diff --git a/internal/middleware/compression_test.go b/internal/middleware/compression_test.go index e8770ff4c3e..7f5524faae2 100644 --- a/internal/middleware/compression_test.go +++ b/internal/middleware/compression_test.go @@ -31,6 +31,68 @@ import ( "go.opentelemetry.io/collector/testutil" ) +func TestHTTPClientCompression(t *testing.T) { + testBody := []byte("uncompressed_text") + compressedBody, _ := compressGzip(testBody) + + tests := []struct { + name string + encoding string + reqBody []byte + }{ + { + name: "NoCompression", + encoding: "", + reqBody: testBody, + }, + { + name: "ValidGzip", + encoding: "gzip", + reqBody: compressedBody.Bytes(), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, err := ioutil.ReadAll(r.Body) + require.NoError(t, err, "failed to read request body: %v", err) + assert.EqualValues(t, tt.reqBody, body) + w.WriteHeader(200) + }) + + addr := testutil.GetAvailableLocalAddress(t) + ln, err := net.Listen("tcp", addr) + require.NoError(t, err, "failed to create listener: %v", err) + srv := &http.Server{ + Handler: handler, + } + go func() { + _ = srv.Serve(ln) + }() + // Wait for the servers to start + <-time.After(10 * time.Millisecond) + + serverURL := fmt.Sprintf("http://%s", ln.Addr().String()) + reqBody := bytes.NewBuffer(testBody) + + req, err := http.NewRequest("GET", serverURL, reqBody) + require.NoError(t, err, "failed to create request to test handler") + req.Header.Set("Content-Encoding", tt.encoding) + + client := http.Client{} + if tt.encoding == "gzip" { + client.Transport = NewCompressRoundTripper(http.DefaultTransport) + } + res, err := client.Do(req) + require.NoError(t, err) + + ioutil.ReadAll(res.Body) + require.NoError(t, res.Body.Close(), "failed to close request body: %v", err) + require.NoError(t, srv.Close()) + }) + } +} + func TestHTTPContentDecompressionHandler(t *testing.T) { testBody := []byte("uncompressed_text") tests := []struct {