diff --git a/.chloggen/compression-config-option2.yaml b/.chloggen/compression-config-option2.yaml new file mode 100644 index 00000000000..0d1f81e0c19 --- /dev/null +++ b/.chloggen/compression-config-option2.yaml @@ -0,0 +1,26 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: 'enhancement' + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: confighttp + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Added support for configuring compression levels. + +# One or more tracking issues or pull requests related to the change +issues: [10467] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + A new configuration option called CompressionParams has been added to confighttp, which allows users to configure the compression levels for the confighttp client. + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. 
+# Default: '[user]' +change_logs: [api] diff --git a/config/configcompression/compressiontype.go b/config/configcompression/compressiontype.go index f5b47f9caa2..4642580f4e2 100644 --- a/config/configcompression/compressiontype.go +++ b/config/configcompression/compressiontype.go @@ -8,6 +8,12 @@ import "fmt" // Type represents a compression method type Type string +type Level int + +type CompressionParams struct { + Level Level `mapstructure:"level"` +} + const ( TypeGzip Type = "gzip" TypeZlib Type = "zlib" @@ -15,14 +21,14 @@ const ( TypeSnappy Type = "snappy" TypeZstd Type = "zstd" TypeLz4 Type = "lz4" - typeNone Type = "none" - typeEmpty Type = "" + TypeNone Type = "none" + TypeEmpty Type = "" ) // IsCompressed returns false if CompressionType is nil, none, or empty. // Otherwise, returns true. func (ct *Type) IsCompressed() bool { - return *ct != typeEmpty && *ct != typeNone + return *ct != TypeEmpty && *ct != TypeNone } func (ct *Type) UnmarshalText(in []byte) error { @@ -33,8 +39,8 @@ func (ct *Type) UnmarshalText(in []byte) error { typ == TypeSnappy || typ == TypeZstd || typ == TypeLz4 || - typ == typeNone || - typ == typeEmpty { + typ == TypeNone || + typ == TypeEmpty { *ct = typ return nil } diff --git a/config/configcompression/compressiontype_test.go b/config/configcompression/compressiontype_test.go index 5fb78c054bc..dd7dea64d4e 100644 --- a/config/configcompression/compressiontype_test.go +++ b/config/configcompression/compressiontype_test.go @@ -71,7 +71,7 @@ func TestUnmarshalText(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - temp := typeNone + temp := TypeNone err := temp.UnmarshalText(tt.compressionName) if tt.shouldError { assert.Error(t, err) diff --git a/config/confighttp/README.md b/config/confighttp/README.md index 160041d8a5c..cf8df1768de 100644 --- a/config/confighttp/README.md +++ b/config/confighttp/README.md @@ -26,6 +26,31 @@ README](../configtls/README.md). 
- `compression`: Compression type to use among `gzip`, `zstd`, `snappy`, `zlib`, `deflate`, and `lz4`. - look at the documentation for the server-side of the communication. - `none` will be treated as uncompressed, and any other inputs will cause an error. +- `compression_params`: Configure advanced compression options + - `level`: Configure compression level for `compression` type + - The following are valid combinations of `compression` and `level` + - `gzip` + - NoCompression: `0` + - BestSpeed: `1` + - BestCompression: `9` + - DefaultCompression: `-1` + - `zlib` + - NoCompression: `0` + - BestSpeed: `1` + - BestCompression: `9` + - DefaultCompression: `-1` + - `deflate` + - NoCompression: `0` + - BestSpeed: `1` + - BestCompression: `9` + - DefaultCompression: `-1` + - `zstd` + - SpeedFastest: `1` + - SpeedDefault: `3` + - SpeedBetterCompression: `6` + - SpeedBestCompression: `11` + - `snappy` + - No compression levels are supported yet - [`max_idle_conns`](https://golang.org/pkg/net/http/#Transport) - [`max_idle_conns_per_host`](https://golang.org/pkg/net/http/#Transport) - [`max_conns_per_host`](https://golang.org/pkg/net/http/#Transport) @@ -52,7 +77,9 @@ exporter: headers: test1: "value1" "test 2": "value 2" - compression: zstd + compression: gzip + compression_params: + level: 1 cookies: enabled: true ``` diff --git a/config/confighttp/compression.go b/config/confighttp/compression.go index 144b30affe9..62638e0c7be 100644 --- a/config/confighttp/compression.go +++ b/config/confighttp/compression.go @@ -21,9 +21,10 @@ import ( ) type compressRoundTripper struct { - rt http.RoundTripper - compressionType configcompression.Type - compressor *compressor + rt http.RoundTripper + compressionType configcompression.Type + compressionParams configcompression.CompressionParams + compressor *compressor } var availableDecoders = map[string]func(body io.ReadCloser) (io.ReadCloser, error){ @@ -76,15 +77,22 @@ var availableDecoders = map[string]func(body io.ReadCloser) 
(io.ReadCloser, erro }, } -func newCompressRoundTripper(rt http.RoundTripper, compressionType configcompression.Type) (*compressRoundTripper, error) { - encoder, err := newCompressor(compressionType) +func newCompressionParams(level configcompression.Level) configcompression.CompressionParams { + return configcompression.CompressionParams{ + Level: level, + } +} + +func newCompressRoundTripper(rt http.RoundTripper, compressionType configcompression.Type, compressionParams configcompression.CompressionParams) (*compressRoundTripper, error) { + encoder, err := newCompressor(compressionType, compressionParams) if err != nil { return nil, err } return &compressRoundTripper{ - rt: rt, - compressionType: compressionType, - compressor: encoder, + rt: rt, + compressionType: compressionType, + compressionParams: compressionParams, + compressor: encoder, }, nil } diff --git a/config/confighttp/compression_test.go b/config/confighttp/compression_test.go index e173f087af2..b19adef0e0e 100644 --- a/config/confighttp/compression_test.go +++ b/config/confighttp/compression_test.go @@ -9,6 +9,7 @@ import ( "compress/zlib" "context" "errors" + "fmt" "io" "net/http" "net/http/httptest" @@ -35,9 +36,12 @@ func TestHTTPClientCompression(t *testing.T) { compressedZstdBody := compressZstd(t, testBody) compressedLz4Body := compressLz4(t, testBody) + const invalidGzipLevel configcompression.Level = 100 + tests := []struct { name string encoding configcompression.Type + level configcompression.Level reqBody []byte shouldError bool }{ @@ -56,18 +60,35 @@ func TestHTTPClientCompression(t *testing.T) { { name: "ValidGzip", encoding: configcompression.TypeGzip, + level: gzip.BestSpeed, reqBody: compressedGzipBody.Bytes(), shouldError: false, }, + { + name: "InvalidGzip", + encoding: configcompression.TypeGzip, + level: invalidGzipLevel, + reqBody: compressedGzipBody.Bytes(), + shouldError: true, + }, + { + name: "InvalidCompression", + encoding: configcompression.Type("invalid"), + level: 
invalidGzipLevel, + reqBody: compressedGzipBody.Bytes(), + shouldError: true, + }, { name: "ValidZlib", encoding: configcompression.TypeZlib, + level: gzip.BestSpeed, reqBody: compressedZlibBody.Bytes(), shouldError: false, }, { name: "ValidDeflate", encoding: configcompression.TypeDeflate, + level: gzip.BestSpeed, reqBody: compressedDeflateBody.Bytes(), shouldError: false, }, @@ -80,6 +101,7 @@ func TestHTTPClientCompression(t *testing.T) { { name: "ValidZstd", encoding: configcompression.TypeZstd, + level: gzip.BestSpeed, reqBody: compressedZstdBody.Bytes(), shouldError: false, }, @@ -104,10 +126,17 @@ func TestHTTPClientCompression(t *testing.T) { req, err := http.NewRequest(http.MethodGet, srv.URL, reqBody) require.NoError(t, err, "failed to create request to test handler") - clientSettings := ClientConfig{ - Endpoint: srv.URL, - Compression: tt.encoding, + Endpoint: srv.URL, + Compression: tt.encoding, + CompressionParams: newCompressionParams(tt.level), + } + err = clientSettings.Validate() + if tt.shouldError { + require.Error(t, err) + message := fmt.Sprintf("unsupported compression type and level %s - %d", tt.encoding, tt.level) + assert.Equal(t, message, err.Error()) + return } client, err := clientSettings.ToClient(context.Background(), componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings()) require.NoError(t, err) @@ -303,7 +332,8 @@ func TestHTTPContentCompressionRequestWithNilBody(t *testing.T) { require.NoError(t, err, "failed to create request to test handler") client := srv.Client() - client.Transport, err = newCompressRoundTripper(http.DefaultTransport, configcompression.TypeGzip) + compressionParams := newCompressionParams(gzip.BestSpeed) + client.Transport, err = newCompressRoundTripper(http.DefaultTransport, configcompression.TypeGzip, compressionParams) require.NoError(t, err) res, err := client.Do(req) require.NoError(t, err) @@ -323,7 +353,8 @@ func TestHTTPContentCompressionCopyError(t *testing.T) { require.NoError(t, err) 
client := srv.Client() - client.Transport, err = newCompressRoundTripper(http.DefaultTransport, configcompression.TypeGzip) + compressionParams := newCompressionParams(gzip.BestSpeed) + client.Transport, err = newCompressRoundTripper(http.DefaultTransport, configcompression.TypeGzip, compressionParams) require.NoError(t, err) _, err = client.Do(req) require.Error(t, err) @@ -347,7 +378,8 @@ func TestHTTPContentCompressionRequestBodyCloseError(t *testing.T) { require.NoError(t, err) client := srv.Client() - client.Transport, err = newCompressRoundTripper(http.DefaultTransport, configcompression.TypeGzip) + compressionParams := newCompressionParams(gzip.BestSpeed) + client.Transport, err = newCompressRoundTripper(http.DefaultTransport, configcompression.TypeGzip, compressionParams) require.NoError(t, err) _, err = client.Do(req) require.Error(t, err) @@ -448,7 +480,7 @@ func TestDecompressorAvoidDecompressionBomb(t *testing.T) { func compressGzip(t testing.TB, body []byte) *bytes.Buffer { var buf bytes.Buffer - gw := gzip.NewWriter(&buf) + gw, _ := gzip.NewWriterLevel(&buf, gzip.BestSpeed) _, err := gw.Write(body) require.NoError(t, err) require.NoError(t, gw.Close()) @@ -457,7 +489,7 @@ func compressGzip(t testing.TB, body []byte) *bytes.Buffer { func compressZlib(t testing.TB, body []byte) *bytes.Buffer { var buf bytes.Buffer - zw := zlib.NewWriter(&buf) + zw, _ := zlib.NewWriterLevel(&buf, zlib.BestSpeed) _, err := zw.Write(body) require.NoError(t, err) require.NoError(t, zw.Close()) @@ -475,7 +507,9 @@ func compressSnappy(t testing.TB, body []byte) *bytes.Buffer { func compressZstd(t testing.TB, body []byte) *bytes.Buffer { var buf bytes.Buffer - zw, _ := zstd.NewWriter(&buf) + compression := zstd.SpeedFastest + encoderLevel := zstd.WithEncoderLevel(compression) + zw, _ := zstd.NewWriter(&buf, encoderLevel) _, err := zw.Write(body) require.NoError(t, err) require.NoError(t, zw.Close()) diff --git a/config/confighttp/compressor.go b/config/confighttp/compressor.go 
index 5c8fefa92cc..917f1db4fa5 100644 --- a/config/confighttp/compressor.go +++ b/config/confighttp/compressor.go @@ -8,6 +8,7 @@ import ( "compress/gzip" "compress/zlib" "errors" + "fmt" "io" "sync" @@ -23,45 +24,72 @@ type writeCloserReset interface { Reset(w io.Writer) } -var ( - _ writeCloserReset = (*gzip.Writer)(nil) - gZipPool = &compressor{pool: sync.Pool{New: func() any { return gzip.NewWriter(nil) }}} - _ writeCloserReset = (*snappy.Writer)(nil) - snappyPool = &compressor{pool: sync.Pool{New: func() any { return snappy.NewBufferedWriter(nil) }}} - _ writeCloserReset = (*zstd.Encoder)(nil) - // Concurrency 1 disables async decoding via goroutines. This is useful to reduce memory usage and isn't a bottleneck for compression using sync.Pool. - zStdPool = &compressor{pool: sync.Pool{New: func() any { zw, _ := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1)); return zw }}} - _ writeCloserReset = (*zlib.Writer)(nil) - zLibPool = &compressor{pool: sync.Pool{New: func() any { return zlib.NewWriter(nil) }}} - _ writeCloserReset = (*lz4.Writer)(nil) - lz4Pool = &compressor{pool: sync.Pool{New: func() any { - lz := lz4.NewWriter(nil) - // Setting concurrency to 1 to disable async decoding by goroutines. This will reduce the overall memory footprint and pool - _ = lz.Apply(lz4.ConcurrencyOption(1)) - return lz - }}} -) +type compressorMap struct { + pools map[string]*compressor +} type compressor struct { pool sync.Pool } +var ( + compressorPools = &compressorMap{pools: make(map[string]*compressor)} + snappyCompressor = &compressor{} + lz4Compressor = &compressor{} + _ writeCloserReset = (*gzip.Writer)(nil) + _ writeCloserReset = (*snappy.Writer)(nil) + _ writeCloserReset = (*zstd.Encoder)(nil) + _ writeCloserReset = (*zlib.Writer)(nil) + _ writeCloserReset = (*lz4.Writer)(nil) +) + // writerFactory defines writer field in CompressRoundTripper. 
// The validity of input is already checked when NewCompressRoundTripper was called in confighttp, -func newCompressor(compressionType configcompression.Type) (*compressor, error) { +func newCompressor(compressionType configcompression.Type, compressionParams configcompression.CompressionParams) (*compressor, error) { + mapKey := fmt.Sprintf("%s/%d", compressionType, compressionParams.Level) switch compressionType { case configcompression.TypeGzip: - return gZipPool, nil + gZipCompressor, gzipExists := compressorPools.pools[mapKey] + if gzipExists { + return gZipCompressor, nil + } + gZipCompressor = &compressor{} + gZipCompressor.pool = sync.Pool{New: func() any { w, _ := gzip.NewWriterLevel(nil, int(compressionParams.Level)); return w }} + compressorPools.pools[mapKey] = gZipCompressor + return gZipCompressor, nil case configcompression.TypeSnappy: - return snappyPool, nil + if snappyCompressor.pool.Get() == nil { + snappyCompressor.pool = sync.Pool{New: func() any { return snappy.NewBufferedWriter(nil) }} + return snappyCompressor, nil + } + return snappyCompressor, nil case configcompression.TypeZstd: - return zStdPool, nil + zstdCompressor, zstdExists := compressorPools.pools[mapKey] + compression := zstd.EncoderLevelFromZstd(int(compressionParams.Level)) + encoderLevel := zstd.WithEncoderLevel(compression) + if zstdExists { + return zstdCompressor, nil + } + zstdCompressor = &compressor{} + zstdCompressor.pool = sync.Pool{New: func() any { zw, _ := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1), encoderLevel); return zw }} + return zstdCompressor, nil case configcompression.TypeZlib, configcompression.TypeDeflate: - return zLibPool, nil + zlibCompressor, zlibExists := compressorPools.pools[mapKey] + if zlibExists { + return zlibCompressor, nil + } + zlibCompressor = &compressor{} + zlibCompressor.pool = sync.Pool{New: func() any { w, _ := zlib.NewWriterLevel(nil, int(compressionParams.Level)); return w }} + compressorPools.pools[mapKey] = zlibCompressor + 
return zlibCompressor, nil case configcompression.TypeLz4: - return lz4Pool, nil + if lz4Compressor.pool.Get() == nil { + lz4Compressor.pool = sync.Pool{New: func() any { lz := lz4.NewWriter(nil); _ = lz.Apply(lz4.ConcurrencyOption(1)); return lz }} + return lz4Compressor, nil + } + return lz4Compressor, nil } - return nil, errors.New("unsupported compression type, ") + return nil, errors.New("unsupported compression type") } func (p *compressor) compress(buf *bytes.Buffer, body io.ReadCloser) error { diff --git a/config/confighttp/confighttp.go b/config/confighttp/confighttp.go index 69ac3900fe5..59fc8ec2133 100644 --- a/config/confighttp/confighttp.go +++ b/config/confighttp/confighttp.go @@ -4,6 +4,7 @@ package confighttp // import "go.opentelemetry.io/collector/config/confighttp" import ( + "compress/zlib" "context" "crypto/tls" "errors" @@ -71,6 +72,9 @@ type ClientConfig struct { // The compression key for supported compression types within collector. Compression configcompression.Type `mapstructure:"compression"` + // Advanced configuration options for the Compression + CompressionParams configcompression.CompressionParams `mapstructure:"compression_params"` + // MaxIdleConns is used to set a limit to the maximum idle HTTP connections the client can keep open. // By default, it is set to 100. 
MaxIdleConns *int `mapstructure:"max_idle_conns"` @@ -134,6 +138,31 @@ func NewDefaultClientConfig() ClientConfig { } } +// Checks the validity of zlib/gzip/flate compression levels +func isValidLevel(level configcompression.Level) bool { + return level == zlib.DefaultCompression || + level == zlib.HuffmanOnly || + level == zlib.NoCompression || + (level >= zlib.BestSpeed && level <= zlib.BestCompression) +} + +func (hcs *ClientConfig) Validate() error { + if hcs.Compression.IsCompressed() { + if (hcs.Compression == configcompression.TypeGzip && isValidLevel(hcs.CompressionParams.Level)) || + (hcs.Compression == configcompression.TypeZlib && isValidLevel(hcs.CompressionParams.Level)) || + (hcs.Compression == configcompression.TypeDeflate && isValidLevel(hcs.CompressionParams.Level)) || + hcs.Compression == configcompression.TypeSnappy || + hcs.Compression == configcompression.TypeLz4 || + hcs.Compression == configcompression.TypeZstd || + hcs.Compression == configcompression.TypeNone || + hcs.Compression == configcompression.TypeEmpty { + return nil + } + return fmt.Errorf("unsupported compression type and level %s - %d", hcs.Compression, hcs.CompressionParams.Level) + } + return nil +} + // ToClient creates an HTTP client. func (hcs *ClientConfig) ToClient(ctx context.Context, host component.Host, settings component.TelemetrySettings) (*http.Client, error) { tlsCfg, err := hcs.TLSSetting.LoadTLSConfig(ctx) @@ -219,7 +248,7 @@ func (hcs *ClientConfig) ToClient(ctx context.Context, host component.Host, sett // Compress the body using specified compression methods if non-empty string is provided. // Supporting gzip, zlib, deflate, snappy, and zstd; none is treated as uncompressed. 
if hcs.Compression.IsCompressed() { - clientTransport, err = newCompressRoundTripper(clientTransport, hcs.Compression) + clientTransport, err = newCompressRoundTripper(clientTransport, hcs.Compression, hcs.CompressionParams) if err != nil { return nil, err } diff --git a/exporter/otlpexporter/go.mod b/exporter/otlpexporter/go.mod index 1a759cf5f28..888d1970382 100644 --- a/exporter/otlpexporter/go.mod +++ b/exporter/otlpexporter/go.mod @@ -42,7 +42,7 @@ require ( github.com/google/uuid v1.6.0 // indirect github.com/hashicorp/go-version v1.7.0 // indirect github.com/json-iterator/go v1.1.12 // indirect - github.com/klauspost/compress v1.17.9 // indirect + github.com/klauspost/compress v1.17.11 // indirect github.com/knadh/koanf/maps v0.1.1 // indirect github.com/knadh/koanf/providers/confmap v0.1.0 // indirect github.com/knadh/koanf/v2 v2.1.2 // indirect diff --git a/exporter/otlpexporter/go.sum b/exporter/otlpexporter/go.sum index f25ea25d5a9..5359504ba4d 100644 --- a/exporter/otlpexporter/go.sum +++ b/exporter/otlpexporter/go.sum @@ -27,8 +27,8 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= -github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= +github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= github.com/knadh/koanf/maps v0.1.1 h1:G5TjmUh2D7G2YWf5SQQqSiHRJEjaicvU0KpypqB3NIs= github.com/knadh/koanf/maps v0.1.1/go.mod h1:npD/QZY3V6ghQDdcQzl1W4ICNVTkohC8E73eI2xW4yI= github.com/knadh/koanf/providers/confmap v0.1.0 
h1:gOkxhHkemwG4LezxxN8DMOFopOPghxRVp7JbIvdvqzU=