Skip to content

Commit

Permalink
Add zstd compression to SAPM receiver and exporter (#23257)
Browse files Browse the repository at this point in the history
- Updated github.com/signalfx/sapm-proto to v0.13.0
- Added "compression" config setting to sapm exporter
- Added tests to verify various compression settings for sapm receiver
and exporter.
  • Loading branch information
tigrannajaryan authored Jun 15, 2023
1 parent c2658a7 commit ae15d8c
Show file tree
Hide file tree
Showing 9 changed files with 270 additions and 36 deletions.
20 changes: 20 additions & 0 deletions .chloggen/sapm-exporter-zstd.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Use this changelog template to create an entry for release notes.
# If your change doesn't affect end users, such as a test fix or a tooling change,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: sapmexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: sapm exporter now supports `compression` config option to specify either gzip or zstd compression to use.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [23257]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
20 changes: 20 additions & 0 deletions .chloggen/sapm-receiver-zstd.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Use this changelog template to create an entry for release notes.
# If your change doesn't affect end users, such as a test fix or a tooling change,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: sapmreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: sapm receiver now accepts requests in compressed with zstd.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [23257]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
5 changes: 5 additions & 0 deletions exporter/sapmexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ during final translation. Intended to be used in tandem with identical configur
- `timeout` (default = 5s): Is the timeout for every attempt to send data to the backend.
- `log_detailed_response` (default = `false`): Option to log detailed response from Splunk APM.
In addition to setting this option to `true`, debug logging at the Collector level needs to be enabled.
- `compression`: Compression method to use for outgoing SAPM requests. Can be one of
"gzip", "zstd" or be unspecified. If unspecified then "gzip" compression is used unless
`disable_compression` option is set to true.
- `disable_compression` (default = `false`): If set to true the outgoing requests are not
compressed and `compression` option is ignored.

In addition, this exporter offers queued retry which is enabled by default.
Information about queued retry configuration parameters can be found
Expand Down
22 changes: 21 additions & 1 deletion exporter/sapmexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package sapmexporter // import "github.com/open-telemetry/opentelemetry-collecto

import (
"errors"
"fmt"
"net/url"

sapmclient "github.com/signalfx/sapm-proto/client"
Expand Down Expand Up @@ -35,9 +36,13 @@ type Config struct {
// MaxConnections is used to set a limit to the maximum idle HTTP connection the exporter can keep open.
MaxConnections uint `mapstructure:"max_connections"`

// Disable GZip compression.
// Disable compression. If set to true then Compression field is ignored.
DisableCompression bool `mapstructure:"disable_compression"`

// Compression method to use (gzip or zstd). Ignored if DisableCompression=true.
// If unspecified defaults to gzip.
Compression string `mapstructure:"compression"`

// Log detailed response from trace ingest.
LogDetailedResponse bool `mapstructure:"log_detailed_response"`

Expand All @@ -56,6 +61,17 @@ func (c *Config) Validate() error {
if err != nil {
return err
}

switch c.Compression {
// Valid compression methods.
case "", // no compression
string(sapmclient.CompressionMethodGzip),
string(sapmclient.CompressionMethodZstd):

default:
return fmt.Errorf("invalid compression %q", c.Compression)
}

return nil
}

Expand Down Expand Up @@ -85,5 +101,9 @@ func (c *Config) clientOptions() []sapmclient.Option {
opts = append(opts, sapmclient.WithDisabledCompression())
}

if c.Compression != "" {
opts = append(opts, sapmclient.WithCompressionMethod(sapmclient.CompressionMethod(c.Compression)))
}

return opts
}
6 changes: 6 additions & 0 deletions exporter/sapmexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ func TestInvalidConfig(t *testing.T) {
invalidURLErr := invalid.Validate()
require.Error(t, invalidURLErr)

invalid = Config{
Endpoint: "http://localhost",
Compression: "nosuchcompression",
}
assert.Error(t, invalid.Validate())

invalid = Config{
Endpoint: "abcd1234",
QueueSettings: exporterhelper.QueueSettings{
Expand Down
116 changes: 116 additions & 0 deletions exporter/sapmexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,18 @@
package sapmexporter

import (
"compress/gzip"
"context"
"crypto/rand"
"fmt"
"io"
"net/http"
"net/http/httptest"
"testing"

"github.com/jaegertracing/jaeger/model"
"github.com/klauspost/compress/zstd"
splunksapm "github.com/signalfx/sapm-proto/gen"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/exporter/exportertest"
Expand Down Expand Up @@ -210,3 +214,115 @@ func TestSAPMClientTokenUsageAndErrorMarshalling(t *testing.T) {
})
}
}

func decompress(body io.Reader, compression string) ([]byte, error) {
switch compression {
case "":
return io.ReadAll(body)
case "gzip":
reader, err := gzip.NewReader(body)
if err != nil {
return nil, err
}
return io.ReadAll(reader)
case "zstd":
reader, err := zstd.NewReader(body)
if err != nil {
return nil, err
}
return io.ReadAll(reader)
}
return nil, fmt.Errorf("unknown compression %q", compression)
}

func TestCompression(t *testing.T) {
tests := []struct {
name string
configDisableCompression bool
configCompression string
receivedCompression string
}{
{
name: "unspecified config",
configCompression: "",
configDisableCompression: false,
receivedCompression: "gzip",
},
{
name: "gzip",
configCompression: "gzip",
configDisableCompression: false,
receivedCompression: "gzip",
},
{
name: "zstd",
configCompression: "zstd",
configDisableCompression: false,
receivedCompression: "zstd",
},
{
name: "disable compression and unspecified method",
configDisableCompression: true,
configCompression: "",
receivedCompression: "",
},
{
name: "disable compression and specify gzip",
configDisableCompression: true,
configCompression: "gzip",
receivedCompression: "",
},
{
name: "disable compression and specify zstd",
configDisableCompression: true,
configCompression: "zstd",
receivedCompression: "",
},
}
for _, tt := range tests {
tt := tt
t.Run(
tt.name, func(t *testing.T) {
tracesReceived := false
server := httptest.NewServer(
http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
compression := r.Header.Get("Content-Encoding")
assert.EqualValues(t, compression, tt.receivedCompression)

payload, err := decompress(r.Body, compression)
require.NoError(t, err)

var sapm splunksapm.PostSpansRequest
err = sapm.Unmarshal(payload)
require.NoError(t, err)

w.WriteHeader(200)
tracesReceived = true
},
),
)
defer func() {
assert.True(t, tracesReceived, "Test server never received traces.")
}()
defer server.Close()

cfg := &Config{
Endpoint: server.URL,
DisableCompression: tt.configDisableCompression,
Compression: tt.configCompression,
}
params := exportertest.NewNopCreateSettings()

se, err := newSAPMExporter(cfg, params)
assert.Nil(t, err)
assert.NotNil(t, se, "failed to create trace exporter")

trace, testTraceErr := buildTestTrace()
require.NoError(t, testTraceErr)
err = se.pushTraceData(context.Background(), trace)
require.NoError(t, err)
},
)
}
}
1 change: 1 addition & 0 deletions exporter/sapmexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.19
require (
github.com/cenkalti/backoff/v4 v4.2.1
github.com/jaegertracing/jaeger v1.41.0
github.com/klauspost/compress v1.16.5
github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk v0.79.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchperresourceattr v0.79.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.79.0
Expand Down
2 changes: 1 addition & 1 deletion receiver/sapmreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.19
require (
github.com/gorilla/mux v1.8.0
github.com/jaegertracing/jaeger v1.41.0
github.com/klauspost/compress v1.16.5
github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.79.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk v0.79.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.79.0
Expand All @@ -31,7 +32,6 @@ require (
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.16.5 // indirect
github.com/knadh/koanf v1.5.0 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
Expand Down
Loading

0 comments on commit ae15d8c

Please sign in to comment.