Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add support for configuring compression levels #11805

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions .chloggen/compression-config-option2.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'breaking'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not breaking anymore


# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: confighttp

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Added support for configuring compression levels.

# One or more tracking issues or pull requests related to the change
issues: [10467]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add more details here describing a new field


# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
33 changes: 21 additions & 12 deletions config/configcompression/compressiontype.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,35 @@

package configcompression // import "go.opentelemetry.io/collector/config/configcompression"

import "fmt"
import (
"fmt"
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit please avoid unrelated changes


// Type represents a compression method
type Type string

type Level int

type CompressionParams struct {
Level Level `mapstructure:"level"`
}

const (
TypeGzip Type = "gzip"
TypeZlib Type = "zlib"
TypeDeflate Type = "deflate"
TypeSnappy Type = "snappy"
TypeZstd Type = "zstd"
TypeLz4 Type = "lz4"
typeNone Type = "none"
typeEmpty Type = ""
TypeGzip Type = "gzip"
TypeZlib Type = "zlib"
TypeDeflate Type = "deflate"
TypeSnappy Type = "snappy"
TypeZstd Type = "zstd"
TypeLz4 Type = "lz4"
TypeNone Type = "none"
TypeEmpty Type = ""
LevelNone Level = 0
)

// IsCompressed returns false if CompressionType is nil, none, or empty.
// Otherwise, returns true.
func (ct *Type) IsCompressed() bool {
return *ct != typeEmpty && *ct != typeNone
return *ct != TypeEmpty && *ct != TypeNone
}

func (ct *Type) UnmarshalText(in []byte) error {
Expand All @@ -33,8 +42,8 @@ func (ct *Type) UnmarshalText(in []byte) error {
typ == TypeSnappy ||
typ == TypeZstd ||
typ == TypeLz4 ||
typ == typeNone ||
typ == typeEmpty {
typ == TypeNone ||
typ == TypeEmpty {
*ct = typ
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion config/configcompression/compressiontype_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestUnmarshalText(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
temp := typeNone
temp := TypeNone
err := temp.UnmarshalText(tt.compressionName)
if tt.shouldError {
assert.Error(t, err)
Expand Down
2 changes: 1 addition & 1 deletion config/configgrpc/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ require (
github.com/golang/snappy v0.0.4 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/klauspost/compress v1.17.11 // indirect
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this change? It doesn't require new version right?

github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions config/configgrpc/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 28 additions & 1 deletion config/confighttp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,31 @@ README](../configtls/README.md).
- `compression`: Compression type to use among `gzip`, `zstd`, `snappy`, `zlib`, `deflate`, and `lz4`.
- look at the documentation for the server-side of the communication.
- `none` will be treated as uncompressed, and any other inputs will cause an error.
- `compression_params` : Configure advanced compression options
- `level`: Configure compression level for `compression` type
- The following are valid combinations of `compression` and `level`
- `gzip`
- NoCompression: `0`
- BestSpeed: `1`
- BestCompression: `9`
- DefaultCompression: `-1`
- `zlib`
- NoCompression: `0`
- BestSpeed: `1`
- BestCompression: `9`
- DefaultCompression: `-1`
- `deflate`
- NoCompression: `0`
- BestSpeed: `1`
- BestCompression: `9`
- DefaultCompression: `-1`
- `zstd`
- SpeedFastest: `1`
- SpeedDefault: `3`
- SpeedBetterCompression: `6`
- SpeedBestCompression: `11`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it mean that zstd has more levels? I see we don't validate that

- `snappy`
No compression levels supported yet
- [`max_idle_conns`](https://golang.org/pkg/net/http/#Transport)
- [`max_idle_conns_per_host`](https://golang.org/pkg/net/http/#Transport)
- [`max_conns_per_host`](https://golang.org/pkg/net/http/#Transport)
Expand All @@ -52,7 +77,9 @@ exporter:
headers:
test1: "value1"
"test 2": "value 2"
compression: zstd
compression: gzip
compression_params:
level: 1
cookies:
enabled: true
```
Expand Down
24 changes: 16 additions & 8 deletions config/confighttp/compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ import (
)

type compressRoundTripper struct {
rt http.RoundTripper
compressionType configcompression.Type
compressor *compressor
rt http.RoundTripper
compressionType configcompression.Type
CompressionParams configcompression.CompressionParams
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to be consistent

Suggested change
CompressionParams configcompression.CompressionParams
compressionParams configcompression.CompressionParams

compressor *compressor
}

var availableDecoders = map[string]func(body io.ReadCloser) (io.ReadCloser, error){
Expand Down Expand Up @@ -76,15 +77,22 @@ var availableDecoders = map[string]func(body io.ReadCloser) (io.ReadCloser, erro
},
}

func newCompressRoundTripper(rt http.RoundTripper, compressionType configcompression.Type) (*compressRoundTripper, error) {
encoder, err := newCompressor(compressionType)
func newCompressionParams(level configcompression.Level) configcompression.CompressionParams {
return configcompression.CompressionParams{
Level: level,
}
}

func newCompressRoundTripper(rt http.RoundTripper, compressionType configcompression.Type, compressionParams configcompression.CompressionParams) (*compressRoundTripper, error) {
encoder, err := newCompressor(compressionType, compressionParams)
if err != nil {
return nil, err
}
return &compressRoundTripper{
rt: rt,
compressionType: compressionType,
compressor: encoder,
rt: rt,
compressionType: compressionType,
CompressionParams: compressionParams,
compressor: encoder,
}, nil
}

Expand Down
50 changes: 40 additions & 10 deletions config/confighttp/compression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"compress/zlib"
"context"
"errors"
"fmt"
"io"
"net/http"
"net/http/httptest"
Expand All @@ -35,9 +36,12 @@ func TestHTTPClientCompression(t *testing.T) {
compressedZstdBody := compressZstd(t, testBody)
compressedLz4Body := compressLz4(t, testBody)

const invalidGzipLevel configcompression.Level = 100

tests := []struct {
name string
encoding configcompression.Type
enclevel configcompression.Level
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shoud be level or encLevel

reqBody []byte
shouldError bool
}{
Expand All @@ -56,18 +60,28 @@ func TestHTTPClientCompression(t *testing.T) {
{
name: "ValidGzip",
encoding: configcompression.TypeGzip,
enclevel: gzip.BestSpeed,
reqBody: compressedGzipBody.Bytes(),
shouldError: false,
},
{
name: "InvalidGzip",
encoding: configcompression.TypeGzip,
enclevel: invalidGzipLevel,
reqBody: compressedGzipBody.Bytes(),
shouldError: true,
},
{
name: "ValidZlib",
encoding: configcompression.TypeZlib,
enclevel: gzip.BestSpeed,
reqBody: compressedZlibBody.Bytes(),
shouldError: false,
},
{
name: "ValidDeflate",
encoding: configcompression.TypeDeflate,
enclevel: gzip.BestSpeed,
reqBody: compressedDeflateBody.Bytes(),
shouldError: false,
},
Expand All @@ -80,6 +94,7 @@ func TestHTTPClientCompression(t *testing.T) {
{
name: "ValidZstd",
encoding: configcompression.TypeZstd,
enclevel: gzip.BestSpeed,
reqBody: compressedZstdBody.Bytes(),
shouldError: false,
},
Expand All @@ -104,16 +119,26 @@ func TestHTTPClientCompression(t *testing.T) {

req, err := http.NewRequest(http.MethodGet, srv.URL, reqBody)
require.NoError(t, err, "failed to create request to test handler")

compressionType := tt.encoding
err = compressionType.UnmarshalText([]byte(tt.encoding))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need UnmarshalText?

require.NoError(t, err)
clientSettings := ClientConfig{
Endpoint: srv.URL,
Compression: tt.encoding,
Endpoint: srv.URL,
Compression: tt.encoding,
CompressionParams: newCompressionParams(tt.enclevel),
}
err = clientSettings.Validate()
if tt.shouldError {
require.Error(t, err)
message := fmt.Sprintf("unsupported compression type and level %s - %d", tt.encoding, tt.enclevel)
assert.Equal(t, message, err.Error())
return
}
client, err := clientSettings.ToClient(context.Background(), componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings())
require.NoError(t, err)
res, err := client.Do(req)
if tt.shouldError {
assert.Error(t, err)
require.Error(t, err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should not be changed

return
}
require.NoError(t, err)
Expand Down Expand Up @@ -303,7 +328,8 @@ func TestHTTPContentCompressionRequestWithNilBody(t *testing.T) {
require.NoError(t, err, "failed to create request to test handler")

client := srv.Client()
client.Transport, err = newCompressRoundTripper(http.DefaultTransport, configcompression.TypeGzip)
compressionParams := newCompressionParams(gzip.BestSpeed)
client.Transport, err = newCompressRoundTripper(http.DefaultTransport, configcompression.TypeGzip, compressionParams)
require.NoError(t, err)
res, err := client.Do(req)
require.NoError(t, err)
Expand All @@ -323,7 +349,8 @@ func TestHTTPContentCompressionCopyError(t *testing.T) {
require.NoError(t, err)

client := srv.Client()
client.Transport, err = newCompressRoundTripper(http.DefaultTransport, configcompression.TypeGzip)
compressionParams := newCompressionParams(gzip.BestSpeed)
client.Transport, err = newCompressRoundTripper(http.DefaultTransport, configcompression.TypeGzip, compressionParams)
require.NoError(t, err)
_, err = client.Do(req)
require.Error(t, err)
Expand All @@ -347,7 +374,8 @@ func TestHTTPContentCompressionRequestBodyCloseError(t *testing.T) {
require.NoError(t, err)

client := srv.Client()
client.Transport, err = newCompressRoundTripper(http.DefaultTransport, configcompression.TypeGzip)
compressionParams := newCompressionParams(gzip.BestSpeed)
client.Transport, err = newCompressRoundTripper(http.DefaultTransport, configcompression.TypeGzip, compressionParams)
require.NoError(t, err)
_, err = client.Do(req)
require.Error(t, err)
Expand Down Expand Up @@ -448,7 +476,7 @@ func TestDecompressorAvoidDecompressionBomb(t *testing.T) {

func compressGzip(t testing.TB, body []byte) *bytes.Buffer {
var buf bytes.Buffer
gw := gzip.NewWriter(&buf)
gw, _ := gzip.NewWriterLevel(&buf, gzip.BestSpeed)
_, err := gw.Write(body)
require.NoError(t, err)
require.NoError(t, gw.Close())
Expand All @@ -457,7 +485,7 @@ func compressGzip(t testing.TB, body []byte) *bytes.Buffer {

func compressZlib(t testing.TB, body []byte) *bytes.Buffer {
var buf bytes.Buffer
zw := zlib.NewWriter(&buf)
zw, _ := zlib.NewWriterLevel(&buf, zlib.BestSpeed)
_, err := zw.Write(body)
require.NoError(t, err)
require.NoError(t, zw.Close())
Expand All @@ -475,7 +503,9 @@ func compressSnappy(t testing.TB, body []byte) *bytes.Buffer {

func compressZstd(t testing.TB, body []byte) *bytes.Buffer {
var buf bytes.Buffer
zw, _ := zstd.NewWriter(&buf)
compression := zstd.SpeedFastest
encoderLevel := zstd.WithEncoderLevel(compression)
zw, _ := zstd.NewWriter(&buf, encoderLevel)
_, err := zw.Write(body)
require.NoError(t, err)
require.NoError(t, zw.Close())
Expand Down
Loading
Loading