From 0f1565ea574c174aa97f01b03ef20d09e85423a6 Mon Sep 17 00:00:00 2001 From: Raj Nishtala Date: Wed, 4 Dec 2024 15:51:56 -0500 Subject: [PATCH 1/3] feat: Add support for configuring compression levels --- .chloggen/compression-config-option2.yaml | 25 +++++++ config/configcompression/compressiontype.go | 58 ++++++++++++--- config/configcompression/go.mod | 1 + config/configcompression/go.sum | 2 + config/configgrpc/go.mod | 2 +- config/configgrpc/go.sum | 4 +- config/confighttp/compression.go | 24 ++++--- config/confighttp/compression_test.go | 48 ++++++++++--- config/confighttp/compressor.go | 78 ++++++++++++++------- config/confighttp/confighttp.go | 5 +- 10 files changed, 191 insertions(+), 56 deletions(-) create mode 100644 .chloggen/compression-config-option2.yaml diff --git a/.chloggen/compression-config-option2.yaml b/.chloggen/compression-config-option2.yaml new file mode 100644 index 00000000000..64380c8915d --- /dev/null +++ b/.chloggen/compression-config-option2.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: 'breaking' + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: confighttp + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Added support for configuring compression levels. + +# One or more tracking issues or pull requests related to the change +issues: [10467] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/config/configcompression/compressiontype.go b/config/configcompression/compressiontype.go index f5b47f9caa2..7133f300424 100644 --- a/config/configcompression/compressiontype.go +++ b/config/configcompression/compressiontype.go @@ -3,22 +3,35 @@ package configcompression // import "go.opentelemetry.io/collector/config/configcompression" -import "fmt" +import ( + "fmt" + + "github.com/klauspost/compress/zlib" +) // Type represents a compression method type Type string +type Level int + +type CompressionConfig struct { + Level Level `mapstructure:"level"` +} + const ( - TypeGzip Type = "gzip" - TypeZlib Type = "zlib" - TypeDeflate Type = "deflate" - TypeSnappy Type = "snappy" - TypeZstd Type = "zstd" - TypeLz4 Type = "lz4" - typeNone Type = "none" - typeEmpty Type = "" + TypeGzip Type = "gzip" + TypeZlib Type = "zlib" + TypeDeflate Type = "deflate" + TypeSnappy Type = "snappy" + TypeZstd Type = "zstd" + TypeLz4 Type = "lz4" + typeNone Type = "none" + typeEmpty Type = "" + LevelNone Level = 0 ) +var typ Type + // IsCompressed returns false if CompressionType is nil, none, or empty. // Otherwise, returns true. func (ct *Type) IsCompressed() bool { @@ -26,7 +39,7 @@ func (ct *Type) IsCompressed() bool { } func (ct *Type) UnmarshalText(in []byte) error { - typ := Type(in) + typ = Type(in) if typ == TypeGzip || typ == TypeZlib || typ == TypeDeflate || @@ -40,3 +53,28 @@ func (ct *Type) UnmarshalText(in []byte) error { } return fmt.Errorf("unsupported compression type %q", typ) } + +func (cc *CompressionConfig) Validate() error { + if (typ == TypeGzip && isValidLevel(int(cc.Level))) || + (typ == TypeZlib && isValidLevel(int(cc.Level))) || + (typ == TypeDeflate && isValidLevel(int(cc.Level))) || + typ == TypeSnappy || + typ == TypeLz4 || + typ == TypeZstd || + typ == typeNone || + typ == typeEmpty { + return nil + } + + return fmt.Errorf("unsupported compression type and level %s - %d", typ, cc.Level) +} + +// Checks the validity of zlib/gzip/flate compression levels +func isValidLevel(level int) bool { + return level == zlib.DefaultCompression || + level == int(LevelNone) || + level == zlib.HuffmanOnly || + level == zlib.NoCompression || + level == zlib.BestSpeed || + (level >= zlib.BestSpeed && level <= zlib.BestCompression) +} diff --git a/config/configcompression/go.mod b/config/configcompression/go.mod index f2be0eee989..9461651915e 100644 --- a/config/configcompression/go.mod +++ b/config/configcompression/go.mod @@ -3,6 +3,7 @@ module go.opentelemetry.io/collector/config/configcompression go 1.22.0 require ( + github.com/klauspost/compress v1.17.11 github.com/stretchr/testify v1.10.0 go.uber.org/goleak v1.3.0 ) diff --git a/config/configcompression/go.sum b/config/configcompression/go.sum index 4cd47abd5b6..dd47f9dab52 100644 --- a/config/configcompression/go.sum +++ b/config/configcompression/go.sum @@ -1,6 +1,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= +github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= diff --git a/config/configgrpc/go.mod b/config/configgrpc/go.mod index 1afb3dc4508..682b0a8e53f 100644 --- a/config/configgrpc/go.mod +++ b/config/configgrpc/go.mod @@ -36,7 +36,7 @@ require ( github.com/golang/snappy v0.0.4 // indirect github.com/google/uuid v1.6.0 // indirect github.com/json-iterator/go v1.1.12 // indirect - github.com/klauspost/compress v1.17.9 // indirect + github.com/klauspost/compress v1.17.11 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect diff --git a/config/configgrpc/go.sum b/config/configgrpc/go.sum index 5fc35136f9c..afb0e574e2b 100644 --- a/config/configgrpc/go.sum +++ b/config/configgrpc/go.sum @@ -21,8 +21,8 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= -github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= +github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= diff --git a/config/confighttp/compression.go b/config/confighttp/compression.go index 144b30affe9..13bc505694f 100644 --- a/config/confighttp/compression.go +++ b/config/confighttp/compression.go @@ -21,9 +21,10 @@ import ( ) type compressRoundTripper struct { - rt http.RoundTripper - compressionType configcompression.Type - compressor *compressor + rt http.RoundTripper + compressionType configcompression.Type + compressionConfig configcompression.CompressionConfig + compressor *compressor } var availableDecoders = map[string]func(body io.ReadCloser) (io.ReadCloser, error){ @@ -76,15 +77,22 @@ var availableDecoders = map[string]func(body io.ReadCloser) (io.ReadCloser, erro }, } -func newCompressRoundTripper(rt http.RoundTripper, compressionType configcompression.Type) (*compressRoundTripper, error) { - encoder, err := newCompressor(compressionType) +func newCompressionConfig(level configcompression.Level) configcompression.CompressionConfig { + return configcompression.CompressionConfig{ + Level: level, + } +} + +func newCompressRoundTripper(rt http.RoundTripper, compressionType configcompression.Type, compressionConfig configcompression.CompressionConfig) (*compressRoundTripper, error) { + encoder, err := newCompressor(compressionType, compressionConfig) if err != nil { return nil, err } return &compressRoundTripper{ - rt: rt, - compressionType: compressionType, - compressor: encoder, + rt: rt, + compressionType: compressionType, + compressionConfig: compressionConfig, + compressor: encoder, }, nil } diff --git a/config/confighttp/compression_test.go b/config/confighttp/compression_test.go index e173f087af2..05f8ffe411b 100644 --- a/config/confighttp/compression_test.go +++ b/config/confighttp/compression_test.go @@ -9,6 +9,7 @@ import ( "compress/zlib" "context" "errors" + "fmt" "io" "net/http" "net/http/httptest" @@ -35,9 +36,12 @@ func TestHTTPClientCompression(t *testing.T) { compressedZstdBody := compressZstd(t, testBody) compressedLz4Body := compressLz4(t, testBody) + const invalidGzipLevel configcompression.Level = 100 + tests := []struct { name string encoding configcompression.Type + enclevel configcompression.Level reqBody []byte shouldError bool }{ @@ -56,18 +60,28 @@ func TestHTTPClientCompression(t *testing.T) { { name: "ValidGzip", encoding: configcompression.TypeGzip, + enclevel: gzip.BestSpeed, reqBody: compressedGzipBody.Bytes(), shouldError: false, }, + { + name: "InvalidGzip", + encoding: configcompression.TypeGzip, + enclevel: invalidGzipLevel, + reqBody: compressedGzipBody.Bytes(), + shouldError: true, + }, { name: "ValidZlib", encoding: configcompression.TypeZlib, + enclevel: gzip.BestSpeed, reqBody: compressedZlibBody.Bytes(), shouldError: false, }, { name: "ValidDeflate", encoding: configcompression.TypeDeflate, + enclevel: gzip.BestSpeed, reqBody: compressedDeflateBody.Bytes(), shouldError: false, }, @@ -80,6 +94,7 @@ func TestHTTPClientCompression(t *testing.T) { { name: "ValidZstd", encoding: configcompression.TypeZstd, + enclevel: gzip.BestSpeed, reqBody: compressedZstdBody.Bytes(), shouldError: false, }, @@ -104,10 +119,20 @@ func TestHTTPClientCompression(t *testing.T) { req, err := http.NewRequest(http.MethodGet, srv.URL, reqBody) require.NoError(t, err, "failed to create request to test handler") - + compressionType := configcompression.Type(tt.encoding) + compression_config := newCompressionConfig(tt.enclevel) + err = compression_config.Validate() clientSettings := ClientConfig{ - Endpoint: srv.URL, - Compression: tt.encoding, + Endpoint: srv.URL, + Compression: tt.encoding, + CompressionConfig: newCompressionConfig(tt.enclevel), + } + compressionType.UnmarshalText([]byte(tt.encoding)) + if tt.shouldError { + assert.Error(t, err) + message := fmt.Sprintf("unsupported compression type and level %s - %d", tt.encoding, tt.enclevel) + assert.Equal(t, message, err.Error()) + return } client, err := clientSettings.ToClient(context.Background(), componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings()) require.NoError(t, err) @@ -303,7 +328,8 @@ func TestHTTPContentCompressionRequestWithNilBody(t *testing.T) { require.NoError(t, err, "failed to create request to test handler") client := srv.Client() - client.Transport, err = newCompressRoundTripper(http.DefaultTransport, configcompression.TypeGzip) + compression_config := newCompressionConfig(gzip.BestSpeed) + client.Transport, err = newCompressRoundTripper(http.DefaultTransport, configcompression.TypeGzip, compression_config) require.NoError(t, err) res, err := client.Do(req) require.NoError(t, err) @@ -323,7 +349,8 @@ func TestHTTPContentCompressionCopyError(t *testing.T) { require.NoError(t, err) client := srv.Client() - client.Transport, err = newCompressRoundTripper(http.DefaultTransport, configcompression.TypeGzip) + compression_config := newCompressionConfig(gzip.BestSpeed) + client.Transport, err = newCompressRoundTripper(http.DefaultTransport, configcompression.TypeGzip, compression_config) require.NoError(t, err) _, err = client.Do(req) require.Error(t, err) @@ -347,7 +374,8 @@ func TestHTTPContentCompressionRequestBodyCloseError(t *testing.T) { require.NoError(t, err) client := srv.Client() - client.Transport, err = newCompressRoundTripper(http.DefaultTransport, configcompression.TypeGzip) + compression_config := newCompressionConfig(gzip.BestSpeed) + client.Transport, err = newCompressRoundTripper(http.DefaultTransport, configcompression.TypeGzip, compression_config) require.NoError(t, err) _, err = client.Do(req) require.Error(t, err) @@ -448,7 +476,7 @@ func TestDecompressorAvoidDecompressionBomb(t *testing.T) { func compressGzip(t testing.TB, body []byte) *bytes.Buffer { var buf bytes.Buffer - gw := gzip.NewWriter(&buf) + gw, _ := gzip.NewWriterLevel(&buf, gzip.BestSpeed) _, err := gw.Write(body) require.NoError(t, err) require.NoError(t, gw.Close()) @@ -457,7 +485,7 @@ func compressGzip(t testing.TB, body []byte) *bytes.Buffer { func compressZlib(t testing.TB, body []byte) *bytes.Buffer { var buf bytes.Buffer - zw := zlib.NewWriter(&buf) + zw, _ := zlib.NewWriterLevel(&buf, zlib.BestSpeed) _, err := zw.Write(body) require.NoError(t, err) require.NoError(t, zw.Close()) @@ -475,7 +503,9 @@ func compressSnappy(t testing.TB, body []byte) *bytes.Buffer { func compressZstd(t testing.TB, body []byte) *bytes.Buffer { var buf bytes.Buffer - zw, _ := zstd.NewWriter(&buf) + compression := zstd.SpeedFastest + encoderLevel := zstd.WithEncoderLevel(compression) + zw, _ := zstd.NewWriter(&buf, encoderLevel) _, err := zw.Write(body) require.NoError(t, err) require.NoError(t, zw.Close()) diff --git a/config/confighttp/compressor.go b/config/confighttp/compressor.go index 5c8fefa92cc..078276e56c6 100644 --- a/config/confighttp/compressor.go +++ b/config/confighttp/compressor.go @@ -8,6 +8,7 @@ import ( "compress/gzip" "compress/zlib" "errors" + "fmt" "io" "sync" @@ -23,45 +24,72 @@ type writeCloserReset interface { Reset(w io.Writer) } -var ( - _ writeCloserReset = (*gzip.Writer)(nil) - gZipPool = &compressor{pool: sync.Pool{New: func() any { return gzip.NewWriter(nil) }}} - _ writeCloserReset = (*snappy.Writer)(nil) - snappyPool = &compressor{pool: sync.Pool{New: func() any { return snappy.NewBufferedWriter(nil) }}} - _ writeCloserReset = (*zstd.Encoder)(nil) - // Concurrency 1 disables async decoding via goroutines. This is useful to reduce memory usage and isn't a bottleneck for compression using sync.Pool. - zStdPool = &compressor{pool: sync.Pool{New: func() any { zw, _ := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1)); return zw }}} - _ writeCloserReset = (*zlib.Writer)(nil) - zLibPool = &compressor{pool: sync.Pool{New: func() any { return zlib.NewWriter(nil) }}} - _ writeCloserReset = (*lz4.Writer)(nil) - lz4Pool = &compressor{pool: sync.Pool{New: func() any { - lz := lz4.NewWriter(nil) - // Setting concurrency to 1 to disable async decoding by goroutines. This will reduce the overall memory footprint and pool - _ = lz.Apply(lz4.ConcurrencyOption(1)) - return lz - }}} -) +type compressorMap struct { + pools map[string]*compressor +} type compressor struct { pool sync.Pool } +var ( + compressorPools = &compressorMap{pools: make(map[string]*compressor)} + snappyCompressor = &compressor{} + lz4Compressor = &compressor{} + _ writeCloserReset = (*gzip.Writer)(nil) + _ writeCloserReset = (*snappy.Writer)(nil) + _ writeCloserReset = (*zstd.Encoder)(nil) + _ writeCloserReset = (*zlib.Writer)(nil) + _ writeCloserReset = (*lz4.Writer)(nil) +) + // writerFactory defines writer field in CompressRoundTripper. // The validity of input is already checked when NewCompressRoundTripper was called in confighttp, -func newCompressor(compressionType configcompression.Type) (*compressor, error) { +func newCompressor(compressionType configcompression.Type, compressionConfig configcompression.CompressionConfig) (*compressor, error) { + mapKey := fmt.Sprintf("%s/%d", compressionType, compressionConfig.Level) switch compressionType { case configcompression.TypeGzip: - return gZipPool, nil + gZipCompressor, gzipExists := compressorPools.pools[mapKey] + if gzipExists { + return gZipCompressor, nil + } + gZipCompressor = &compressor{} + gZipCompressor.pool = sync.Pool{New: func() any { w, _ := gzip.NewWriterLevel(nil, int(compressionConfig.Level)); return w }} + compressorPools.pools[mapKey] = gZipCompressor + return gZipCompressor, nil case configcompression.TypeSnappy: - return snappyPool, nil + if snappyCompressor.pool.Get() == nil { + snappyCompressor.pool = sync.Pool{New: func() any { return snappy.NewBufferedWriter(nil) }} + return snappyCompressor, nil + } + return snappyCompressor, nil case configcompression.TypeZstd: - return zStdPool, nil + zstdCompressor, zstdExists := compressorPools.pools[mapKey] + compression := zstd.EncoderLevelFromZstd(int(compressionConfig.Level)) + encoderLevel := zstd.WithEncoderLevel(compression) + if zstdExists { + return zstdCompressor, nil + } + zstdCompressor = &compressor{} + zstdCompressor.pool = sync.Pool{New: func() any { zw, _ := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1), encoderLevel); return zw }} + return zstdCompressor, nil case configcompression.TypeZlib, configcompression.TypeDeflate: - return zLibPool, nil + zlibCompressor, zlibExists := compressorPools.pools[mapKey] + if zlibExists { + return zlibCompressor, nil + } + zlibCompressor = &compressor{} + zlibCompressor.pool = sync.Pool{New: func() any { w, _ := zlib.NewWriterLevel(nil, int(compressionConfig.Level)); return w }} + compressorPools.pools[mapKey] = zlibCompressor + return zlibCompressor, nil case configcompression.TypeLz4: - return lz4Pool, nil + if lz4Compressor.pool.Get() == nil { + lz4Compressor.pool = sync.Pool{New: func() any { lz := lz4.NewWriter(nil); _ = lz.Apply(lz4.ConcurrencyOption(1)); return lz }} + return lz4Compressor, nil + } + return lz4Compressor, nil } - return nil, errors.New("unsupported compression type, ") + return nil, errors.New("unsupported compression type") } func (p *compressor) compress(buf *bytes.Buffer, body io.ReadCloser) error { diff --git a/config/confighttp/confighttp.go b/config/confighttp/confighttp.go index 69ac3900fe5..f56836bff71 100644 --- a/config/confighttp/confighttp.go +++ b/config/confighttp/confighttp.go @@ -71,6 +71,9 @@ type ClientConfig struct { // The compression key for supported compression types within collector. Compression configcompression.Type `mapstructure:"compression"` + // Advanced configuration options for the Compression + CompressionConfig configcompression.CompressionConfig `mapstructure:"compression_config"` + // MaxIdleConns is used to set a limit to the maximum idle HTTP connections the client can keep open. // By default, it is set to 100. MaxIdleConns *int `mapstructure:"max_idle_conns"` @@ -219,7 +222,7 @@ func (hcs *ClientConfig) ToClient(ctx context.Context, host component.Host, sett // Compress the body using specified compression methods if non-empty string is provided. // Supporting gzip, zlib, deflate, snappy, and zstd; none is treated as uncompressed. if hcs.Compression.IsCompressed() { - clientTransport, err = newCompressRoundTripper(clientTransport, hcs.Compression) + clientTransport, err = newCompressRoundTripper(clientTransport, hcs.Compression, hcs.CompressionConfig) if err != nil { return nil, err } From b78c72afff46ad0b7c31a3c3c362817ab540b2b7 Mon Sep 17 00:00:00 2001 From: Raj Nishtala Date: Wed, 4 Dec 2024 18:14:11 -0500 Subject: [PATCH 2/3] Updated the Readme with compression_config --- config/confighttp/README.md | 29 ++++++++++++++++++++++++++- config/confighttp/compression_test.go | 5 +++-- exporter/otlpexporter/go.mod | 2 +- exporter/otlpexporter/go.sum | 4 ++-- 4 files changed, 34 insertions(+), 6 deletions(-) diff --git a/config/confighttp/README.md b/config/confighttp/README.md index 160041d8a5c..8013ba84f66 100644 --- a/config/confighttp/README.md +++ b/config/confighttp/README.md @@ -26,6 +26,31 @@ README](../configtls/README.md). - `compression`: Compression type to use among `gzip`, `zstd`, `snappy`, `zlib`, `deflate`, and `lz4`. - look at the documentation for the server-side of the communication. - `none` will be treated as uncompressed, and any other inputs will cause an error. +- `compression_config` : Configure advanced compression options + - `level`: Configure compression level for `compression` type + - The following are valid combinations of `compression` and `level` + - `gzip` + - NoCompression: `0` + - BestSpeed: `1` + - BestCompression: `9` + - DefaultCompression: `-1` + - `zlib` + - NoCompression: `0` + - BestSpeed: `1` + - BestCompression: `9` + - DefaultCompression: `-1` + - `deflate` + - NoCompression: `0` + - BestSpeed: `1` + - BestCompression: `9` + - DefaultCompression: `-1` + - `zstd` + - SpeedFastest: `1` + - SpeedDefault: `3` + - SpeedBetterCompression: `6` + - SpeedBestCompression: `11` + - `snappy` + No compression levels supported yet - [`max_idle_conns`](https://golang.org/pkg/net/http/#Transport) - [`max_idle_conns_per_host`](https://golang.org/pkg/net/http/#Transport) - [`max_conns_per_host`](https://golang.org/pkg/net/http/#Transport) @@ -52,7 +77,9 @@ exporter: headers: test1: "value1" "test 2": "value 2" - compression: zstd + compression: gzip + compression_config: + level: 1 cookies: enabled: true ``` diff --git a/config/confighttp/compression_test.go b/config/confighttp/compression_test.go index 05f8ffe411b..5b4314d7239 100644 --- a/config/confighttp/compression_test.go +++ b/config/confighttp/compression_test.go @@ -119,7 +119,9 @@ func TestHTTPClientCompression(t *testing.T) { req, err := http.NewRequest(http.MethodGet, srv.URL, reqBody) require.NoError(t, err, "failed to create request to test handler") - compressionType := configcompression.Type(tt.encoding) + compressionType := tt.encoding + err = compressionType.UnmarshalText([]byte(tt.encoding)) + require.NoError(t, err) compression_config := newCompressionConfig(tt.enclevel) err = compression_config.Validate() clientSettings := ClientConfig{ @@ -127,7 +129,6 @@ func TestHTTPClientCompression(t *testing.T) { Compression: tt.encoding, CompressionConfig: newCompressionConfig(tt.enclevel), } - compressionType.UnmarshalText([]byte(tt.encoding)) if tt.shouldError { assert.Error(t, err) message := fmt.Sprintf("unsupported compression type and level %s - %d", tt.encoding, tt.enclevel) diff --git a/exporter/otlpexporter/go.mod b/exporter/otlpexporter/go.mod index 1a759cf5f28..888d1970382 100644 --- a/exporter/otlpexporter/go.mod +++ b/exporter/otlpexporter/go.mod @@ -42,7 +42,7 @@ require ( github.com/google/uuid v1.6.0 // indirect github.com/hashicorp/go-version v1.7.0 // indirect github.com/json-iterator/go v1.1.12 // indirect - github.com/klauspost/compress v1.17.9 // indirect + github.com/klauspost/compress v1.17.11 // indirect github.com/knadh/koanf/maps v0.1.1 // indirect github.com/knadh/koanf/providers/confmap v0.1.0 // indirect github.com/knadh/koanf/v2 v2.1.2 // indirect diff --git a/exporter/otlpexporter/go.sum b/exporter/otlpexporter/go.sum index f25ea25d5a9..5359504ba4d 100644 --- a/exporter/otlpexporter/go.sum +++ b/exporter/otlpexporter/go.sum @@ -27,8 +27,8 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= -github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= +github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= github.com/knadh/koanf/maps v0.1.1 h1:G5TjmUh2D7G2YWf5SQQqSiHRJEjaicvU0KpypqB3NIs= github.com/knadh/koanf/maps v0.1.1/go.mod h1:npD/QZY3V6ghQDdcQzl1W4ICNVTkohC8E73eI2xW4yI= github.com/knadh/koanf/providers/confmap v0.1.0 h1:gOkxhHkemwG4LezxxN8DMOFopOPghxRVp7JbIvdvqzU= From ad9a8a38676e11e45da820540d3cf9cb758f6a8c Mon Sep 17 00:00:00 2001 From: Raj Nishtala Date: Thu, 5 Dec 2024 11:34:11 -0500 Subject: [PATCH 3/3] Added ClientConfig validation and moved compression validation to ClientConfig --- config/configcompression/compressiontype.go | 43 +++---------------- .../configcompression/compressiontype_test.go | 2 +- config/configcompression/go.mod | 1 - config/configcompression/go.sum | 2 - config/confighttp/README.md | 4 +- config/confighttp/compression.go | 12 +++--- config/confighttp/compression_test.go | 21 +++++---- config/confighttp/compressor.go | 10 ++--- config/confighttp/confighttp.go | 32 +++++++++++++- 9 files changed, 61 insertions(+), 66 deletions(-) diff --git a/config/configcompression/compressiontype.go b/config/configcompression/compressiontype.go index 7133f300424..63d1c2f37ef 100644 --- a/config/configcompression/compressiontype.go +++ b/config/configcompression/compressiontype.go @@ -5,8 +5,6 @@ package configcompression // import "go.opentelemetry.io/collector/config/config import ( "fmt" - - "github.com/klauspost/compress/zlib" ) // Type represents a compression method @@ -14,7 +12,7 @@ type Type string type Level int -type CompressionConfig struct { +type CompressionParams struct { Level Level `mapstructure:"level"` } @@ -25,56 +23,29 @@ const ( TypeSnappy Type = "snappy" TypeZstd Type = "zstd" TypeLz4 Type = "lz4" - typeNone Type = "none" - typeEmpty Type = "" + TypeNone Type = "none" + TypeEmpty Type = "" LevelNone Level = 0 ) -var typ Type - // IsCompressed returns false if CompressionType is nil, none, or empty. // Otherwise, returns true. func (ct *Type) IsCompressed() bool { - return *ct != typeEmpty && *ct != typeNone + return *ct != TypeEmpty && *ct != TypeNone } func (ct *Type) UnmarshalText(in []byte) error { - typ = Type(in) + typ := Type(in) if typ == TypeGzip || typ == TypeZlib || typ == TypeDeflate || typ == TypeSnappy || typ == TypeZstd || typ == TypeLz4 || - typ == typeNone || - typ == typeEmpty { + typ == TypeNone || + typ == TypeEmpty { *ct = typ return nil } return fmt.Errorf("unsupported compression type %q", typ) } - -func (cc *CompressionConfig) Validate() error { - if (typ == TypeGzip && isValidLevel(int(cc.Level))) || - (typ == TypeZlib && isValidLevel(int(cc.Level))) || - (typ == TypeDeflate && isValidLevel(int(cc.Level))) || - typ == TypeSnappy || - typ == TypeLz4 || - typ == TypeZstd || - typ == typeNone || - typ == typeEmpty { - return nil - } - - return fmt.Errorf("unsupported compression type and level %s - %d", typ, cc.Level) -} - -// Checks the validity of zlib/gzip/flate compression levels -func isValidLevel(level int) bool { - return level == zlib.DefaultCompression || - level == int(LevelNone) || - level == zlib.HuffmanOnly || - level == zlib.NoCompression || - level == zlib.BestSpeed || - (level >= zlib.BestSpeed && level <= zlib.BestCompression) -} diff --git a/config/configcompression/compressiontype_test.go b/config/configcompression/compressiontype_test.go index 5fb78c054bc..dd7dea64d4e 100644 --- a/config/configcompression/compressiontype_test.go +++ b/config/configcompression/compressiontype_test.go @@ -71,7 +71,7 @@ func TestUnmarshalText(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - temp := typeNone + temp := TypeNone err := temp.UnmarshalText(tt.compressionName) if tt.shouldError { assert.Error(t, err) diff --git a/config/configcompression/go.mod b/config/configcompression/go.mod index 9461651915e..f2be0eee989 100644 --- a/config/configcompression/go.mod +++ b/config/configcompression/go.mod @@ -3,7 +3,6 @@ module go.opentelemetry.io/collector/config/configcompression go 1.22.0 require ( - github.com/klauspost/compress v1.17.11 github.com/stretchr/testify v1.10.0 go.uber.org/goleak v1.3.0 ) diff --git a/config/configcompression/go.sum b/config/configcompression/go.sum index dd47f9dab52..4cd47abd5b6 100644 --- a/config/configcompression/go.sum +++ b/config/configcompression/go.sum @@ -1,8 +1,6 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= -github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= diff --git a/config/confighttp/README.md b/config/confighttp/README.md index 8013ba84f66..cf8df1768de 100644 --- a/config/confighttp/README.md +++ b/config/confighttp/README.md @@ -26,7 +26,7 @@ README](../configtls/README.md). - `compression`: Compression type to use among `gzip`, `zstd`, `snappy`, `zlib`, `deflate`, and `lz4`. - look at the documentation for the server-side of the communication. - `none` will be treated as uncompressed, and any other inputs will cause an error. -- `compression_config` : Configure advanced compression options +- `compression_params` : Configure advanced compression options - `level`: Configure compression level for `compression` type - The following are valid combinations of `compression` and `level` - `gzip` @@ -78,7 +78,7 @@ exporter: test1: "value1" "test 2": "value 2" compression: gzip - compression_config: + compression_params: level: 1 cookies: enabled: true diff --git a/config/confighttp/compression.go b/config/confighttp/compression.go index 13bc505694f..6abcef926f5 100644 --- a/config/confighttp/compression.go +++ b/config/confighttp/compression.go @@ -23,7 +23,7 @@ import ( type compressRoundTripper struct { rt http.RoundTripper compressionType configcompression.Type - compressionConfig configcompression.CompressionConfig + CompressionParams configcompression.CompressionParams compressor *compressor } @@ -77,21 +77,21 @@ var availableDecoders = map[string]func(body io.ReadCloser) (io.ReadCloser, erro }, } -func newCompressionConfig(level configcompression.Level) configcompression.CompressionConfig { - return configcompression.CompressionConfig{ +func newCompressionParams(level configcompression.Level) configcompression.CompressionParams { + return configcompression.CompressionParams{ Level: level, } } -func newCompressRoundTripper(rt http.RoundTripper, compressionType configcompression.Type, compressionConfig configcompression.CompressionConfig) (*compressRoundTripper, error) { - encoder, err := newCompressor(compressionType, compressionConfig) +func newCompressRoundTripper(rt http.RoundTripper, compressionType configcompression.Type, compressionParams configcompression.CompressionParams) (*compressRoundTripper, error) { + encoder, err := newCompressor(compressionType, compressionParams) if err != nil { return nil, err } return &compressRoundTripper{ rt: rt, compressionType: compressionType, - compressionConfig: compressionConfig, + CompressionParams: compressionParams, compressor: encoder, }, nil } diff --git a/config/confighttp/compression_test.go b/config/confighttp/compression_test.go index 5b4314d7239..6b78c6db1c0 100644 --- a/config/confighttp/compression_test.go +++ b/config/confighttp/compression_test.go @@ -122,15 +122,14 @@ func TestHTTPClientCompression(t *testing.T) { compressionType := tt.encoding err = compressionType.UnmarshalText([]byte(tt.encoding)) require.NoError(t, err) - compression_config := newCompressionConfig(tt.enclevel) - err = compression_config.Validate() clientSettings := ClientConfig{ Endpoint: srv.URL, Compression: tt.encoding, - CompressionConfig: newCompressionConfig(tt.enclevel), + CompressionParams: newCompressionParams(tt.enclevel), } + err = clientSettings.Validate() if tt.shouldError { - assert.Error(t, err) + require.Error(t, err) message := fmt.Sprintf("unsupported compression type and level %s - %d", tt.encoding, tt.enclevel) assert.Equal(t, message, err.Error()) return @@ -139,7 +138,7 @@ func TestHTTPClientCompression(t *testing.T) { require.NoError(t, err) res, err := client.Do(req) if tt.shouldError { - assert.Error(t, err) + require.Error(t, err) return } require.NoError(t, err) @@ -329,8 +328,8 @@ func TestHTTPContentCompressionRequestWithNilBody(t *testing.T) { require.NoError(t, err, "failed to create request to test handler") client := srv.Client() - compression_config := newCompressionConfig(gzip.BestSpeed) - client.Transport, err = newCompressRoundTripper(http.DefaultTransport, configcompression.TypeGzip, compression_config) + compressionParams := newCompressionParams(gzip.BestSpeed) + client.Transport, err = newCompressRoundTripper(http.DefaultTransport, configcompression.TypeGzip, compressionParams) require.NoError(t, err) res, err := client.Do(req) require.NoError(t, err) @@ -350,8 +349,8 @@ func TestHTTPContentCompressionCopyError(t *testing.T) { require.NoError(t, err) client := srv.Client() - compression_config := newCompressionConfig(gzip.BestSpeed) - client.Transport, err = newCompressRoundTripper(http.DefaultTransport, configcompression.TypeGzip, compression_config) + compressionParams := newCompressionParams(gzip.BestSpeed) + client.Transport, err = newCompressRoundTripper(http.DefaultTransport, configcompression.TypeGzip, compressionParams) require.NoError(t, err) _, err = client.Do(req) require.Error(t, err) @@ -375,8 +374,8 @@ func TestHTTPContentCompressionRequestBodyCloseError(t *testing.T) { require.NoError(t, err) client := srv.Client() - compression_config := newCompressionConfig(gzip.BestSpeed) - client.Transport, err = newCompressRoundTripper(http.DefaultTransport, configcompression.TypeGzip, compression_config) + compressionParams := newCompressionParams(gzip.BestSpeed) + client.Transport, err = newCompressRoundTripper(http.DefaultTransport, configcompression.TypeGzip, compressionParams) require.NoError(t, err) _, err = client.Do(req) require.Error(t, err) diff --git a/config/confighttp/compressor.go b/config/confighttp/compressor.go index 078276e56c6..917f1db4fa5 100644 --- a/config/confighttp/compressor.go +++ b/config/confighttp/compressor.go @@ -45,8 +45,8 @@ var ( // writerFactory defines writer field in CompressRoundTripper. // The validity of input is already checked when NewCompressRoundTripper was called in confighttp, -func newCompressor(compressionType configcompression.Type, compressionConfig configcompression.CompressionConfig) (*compressor, error) { - mapKey := fmt.Sprintf("%s/%d", compressionType, compressionConfig.Level) +func newCompressor(compressionType configcompression.Type, compressionParams configcompression.CompressionParams) (*compressor, error) { + mapKey := fmt.Sprintf("%s/%d", compressionType, compressionParams.Level) switch compressionType { case configcompression.TypeGzip: gZipCompressor, gzipExists := compressorPools.pools[mapKey] @@ -54,7 +54,7 @@ func newCompressor(compressionType configcompression.Type, compressionConfig con return gZipCompressor, nil } gZipCompressor = &compressor{} - gZipCompressor.pool = sync.Pool{New: func() any { w, _ := gzip.NewWriterLevel(nil, int(compressionConfig.Level)); return w }} + gZipCompressor.pool = sync.Pool{New: func() any { w, _ := gzip.NewWriterLevel(nil, int(compressionParams.Level)); return w }} compressorPools.pools[mapKey] = gZipCompressor return gZipCompressor, nil case configcompression.TypeSnappy: @@ -65,7 +65,7 @@ func newCompressor(compressionType configcompression.Type, compressionConfig con return snappyCompressor, nil case configcompression.TypeZstd: zstdCompressor, zstdExists := compressorPools.pools[mapKey] - compression := zstd.EncoderLevelFromZstd(int(compressionConfig.Level)) + compression := zstd.EncoderLevelFromZstd(int(compressionParams.Level)) encoderLevel := zstd.WithEncoderLevel(compression) if zstdExists { return zstdCompressor, nil @@ -79,7 +79,7 @@ func newCompressor(compressionType configcompression.Type, compressionConfig con return zlibCompressor, nil } zlibCompressor = &compressor{} - zlibCompressor.pool = sync.Pool{New: func() any { w, _ := zlib.NewWriterLevel(nil, int(compressionConfig.Level)); return w }} + zlibCompressor.pool = sync.Pool{New: func() any { w, _ := zlib.NewWriterLevel(nil, int(compressionParams.Level)); return w }} compressorPools.pools[mapKey] = zlibCompressor return zlibCompressor, nil case configcompression.TypeLz4: diff --git a/config/confighttp/confighttp.go b/config/confighttp/confighttp.go index f56836bff71..d2f37f69c17 100644 --- a/config/confighttp/confighttp.go +++ b/config/confighttp/confighttp.go @@ -4,6 +4,7 @@ package confighttp // import "go.opentelemetry.io/collector/config/confighttp" import ( + "compress/zlib" "context" "crypto/tls" "errors" @@ -72,7 +73,7 @@ type ClientConfig struct { Compression configcompression.Type `mapstructure:"compression"` // Advanced configuration options for the Compression - CompressionConfig configcompression.CompressionConfig `mapstructure:"compression_config"` + CompressionParams configcompression.CompressionParams `mapstructure:"compression_params"` // MaxIdleConns is used to set a limit to the maximum idle HTTP connections the client can keep open. // By default, it is set to 100. @@ -137,6 +138,33 @@ func NewDefaultClientConfig() ClientConfig { } } +// Checks the validity of zlib/gzip/flate compression levels +func isValidLevel(level configcompression.Level) bool { + return level == zlib.DefaultCompression || + level == configcompression.LevelNone || + level == zlib.HuffmanOnly || + level == zlib.NoCompression || + level == zlib.BestSpeed || + (level >= zlib.BestSpeed && level <= zlib.BestCompression) +} + +func (hcs *ClientConfig) Validate() error { + if hcs.Compression.IsCompressed() { + if (hcs.Compression == configcompression.TypeGzip && isValidLevel(hcs.CompressionParams.Level)) || + (hcs.Compression == configcompression.TypeZlib && isValidLevel(hcs.CompressionParams.Level)) || + (hcs.Compression == configcompression.TypeDeflate && isValidLevel(hcs.CompressionParams.Level)) || + hcs.Compression == configcompression.TypeSnappy || + hcs.Compression == configcompression.TypeLz4 || + hcs.Compression == configcompression.TypeZstd || + hcs.Compression == configcompression.TypeNone || + hcs.Compression == configcompression.TypeEmpty { + return nil + } + return fmt.Errorf("unsupported compression type and level %s - %d", hcs.Compression, hcs.CompressionParams.Level) + } + return nil +} + // ToClient creates an HTTP client. func (hcs *ClientConfig) ToClient(ctx context.Context, host component.Host, settings component.TelemetrySettings) (*http.Client, error) { tlsCfg, err := hcs.TLSSetting.LoadTLSConfig(ctx) @@ -222,7 +250,7 @@ func (hcs *ClientConfig) ToClient(ctx context.Context, host component.Host, sett // Compress the body using specified compression methods if non-empty string is provided. // Supporting gzip, zlib, deflate, snappy, and zstd; none is treated as uncompressed. if hcs.Compression.IsCompressed() { - clientTransport, err = newCompressRoundTripper(clientTransport, hcs.Compression, hcs.CompressionConfig) + clientTransport, err = newCompressRoundTripper(clientTransport, hcs.Compression, hcs.CompressionParams) if err != nil { return nil, err }