Skip to content

Commit

Permalink
[exporter/awss3] use encoding extensions (#31801)
Browse files Browse the repository at this point in the history
**Description:**
Add encoding extension support to AWS S3 exporter.

**Link to tracking Issue:**
Fixes #30554

**Testing:**
Unit tests.

**Documentation:**
README.
  • Loading branch information
atoulme authored Mar 18, 2024
1 parent ce10c39 commit cbf003e
Show file tree
Hide file tree
Showing 9 changed files with 147 additions and 31 deletions.
27 changes: 27 additions & 0 deletions .chloggen/add_encoding_s3exporter.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: awss3exporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add support for encoding extension to awss3exporter

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [30554]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
7 changes: 7 additions & 0 deletions exporter/awss3exporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ The following exporter configuration parameters are supported.
| `role_arn` | the Role ARN to be assumed | |
| `file_prefix` | file prefix defined by user | |
| `marshaler` | marshaler used to produce output data | `otlp_json` |
| `encoding` | Encoding extension to use to marshal data. Overrides the `marshaler` configuration option if set. | |
| `endpoint` | overrides the endpoint used by the exporter instead of constructing it from `region` and `s3_bucket` | |
| `s3_force_path_style` | [set this to `true` to force the request to use path-style addressing](http://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html) | false |
| `disable_ssl` | set this to `true` to disable SSL when sending requests | false |
Expand All @@ -47,6 +48,12 @@ Marshaler determines the format of data sent to AWS S3. Currently, the following
- `body`: export the log body as string.
**This format is supported only for logs.**

### Encoding

Encoding overrides marshaler if present and sets to use an encoding extension defined in the collector configuration.

See https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/extension/encoding.

### Compression
- `none` (default): No compression will be applied
- `gzip`: Files will be compressed with gzip. **This does not support `sumo_ic`marshaler.**
Expand Down
3 changes: 3 additions & 0 deletions exporter/awss3exporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package awss3exporter // import "github.com/open-telemetry/opentelemetry-collect
import (
"errors"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configcompression"
"go.uber.org/multierr"
)
Expand Down Expand Up @@ -40,6 +41,8 @@ type Config struct {
MarshalerName MarshalerType `mapstructure:"marshaler"`

FileFormat string `mapstructure:"file_format"`
// Encoding to apply. If present, overrides the marshaler configuration option.
Encoding *component.ID `mapstructure:"encoding"`
}

func (c *Config) Validate() error {
Expand Down
2 changes: 2 additions & 0 deletions exporter/awss3exporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ func TestLoadConfig(t *testing.T) {
require.NotNil(t, cfg)

e := cfg.Exporters[component.MustNewID("awss3")].(*Config)
encoding := component.MustNewIDWithName("foo", "bar")
assert.Equal(t, e,
&Config{
Encoding: &encoding,
S3Uploader: S3UploaderConfig{
Region: "us-east-1",
S3Bucket: "foo",
Expand Down
39 changes: 23 additions & 16 deletions exporter/awss3exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ package awss3exporter // import "github.com/open-telemetry/opentelemetry-collect

import (
"context"
"errors"
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/pdata/plog"
Expand All @@ -23,26 +24,32 @@ type s3Exporter struct {
}

func newS3Exporter(config *Config,
params exporter.CreateSettings) (*s3Exporter, error) {

if config == nil {
return nil, errors.New("s3 exporter config is nil")
}

logger := params.Logger

m, err := newMarshaler(config.MarshalerName, logger)
if err != nil {
return nil, errors.New("unknown marshaler")
}
params exporter.CreateSettings) *s3Exporter {

s3Exporter := &s3Exporter{
config: config,
dataWriter: &s3Writer{},
logger: logger,
marshaler: m,
logger: params.Logger,
}
return s3Exporter, nil
return s3Exporter
}

func (e *s3Exporter) start(_ context.Context, host component.Host) error {

var m marshaler
var err error
if e.config.Encoding != nil {
if m, err = newMarshalerFromEncoding(e.config.Encoding, host, e.logger); err != nil {
return err
}
} else {
if m, err = newMarshaler(e.config.MarshalerName, e.logger); err != nil {
return fmt.Errorf("unknown marshaler %q", e.config.MarshalerName)
}
}

e.marshaler = m
return nil
}

func (e *s3Exporter) Capabilities() consumer.Capabilities {
Expand Down
24 changes: 9 additions & 15 deletions exporter/awss3exporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,42 +39,35 @@ func createLogsExporter(ctx context.Context,
params exporter.CreateSettings,
config component.Config) (exporter.Logs, error) {

s3Exporter, err := newS3Exporter(config.(*Config), params)
if err != nil {
return nil, err
}
s3Exporter := newS3Exporter(config.(*Config), params)

return exporterhelper.NewLogsExporter(ctx, params,
config,
s3Exporter.ConsumeLogs)
s3Exporter.ConsumeLogs,
exporterhelper.WithStart(s3Exporter.start))
}

func createMetricsExporter(ctx context.Context,
params exporter.CreateSettings,
config component.Config) (exporter.Metrics, error) {

s3Exporter, err := newS3Exporter(config.(*Config), params)
if err != nil {
return nil, err
}
s3Exporter := newS3Exporter(config.(*Config), params)

if config.(*Config).MarshalerName == SumoIC {
return nil, fmt.Errorf("metrics are not supported by sumo_ic output format")
}

return exporterhelper.NewMetricsExporter(ctx, params,
config,
s3Exporter.ConsumeMetrics)
s3Exporter.ConsumeMetrics,
exporterhelper.WithStart(s3Exporter.start))
}

func createTracesExporter(ctx context.Context,
params exporter.CreateSettings,
config component.Config) (exporter.Traces, error) {

s3Exporter, err := newS3Exporter(config.(*Config), params)
if err != nil {
return nil, err
}
s3Exporter := newS3Exporter(config.(*Config), params)

if config.(*Config).MarshalerName == SumoIC {
return nil, fmt.Errorf("traces are not supported by sumo_ic output format")
Expand All @@ -83,5 +76,6 @@ func createTracesExporter(ctx context.Context,
return exporterhelper.NewTracesExporter(ctx,
params,
config,
s3Exporter.ConsumeTraces)
s3Exporter.ConsumeTraces,
exporterhelper.WithStart(s3Exporter.start))
}
16 changes: 16 additions & 0 deletions exporter/awss3exporter/marshaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ package awss3exporter // import "github.com/open-telemetry/opentelemetry-collect

import (
"errors"
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
Expand All @@ -23,6 +25,20 @@ var (
ErrUnknownMarshaler = errors.New("unknown marshaler")
)

func newMarshalerFromEncoding(encoding *component.ID, host component.Host, logger *zap.Logger) (marshaler, error) {
marshaler := &s3Marshaler{logger: logger}
e, ok := host.GetExtensions()[*encoding]
if !ok {
return nil, fmt.Errorf("unknown encoding %q", encoding)
}
// cast with ok to avoid panics.
marshaler.logsMarshaler, _ = e.(plog.Marshaler)
marshaler.metricsMarshaler, _ = e.(pmetric.Marshaler)
marshaler.tracesMarshaler, _ = e.(ptrace.Marshaler)
marshaler.fileFormat = encoding.String()
return marshaler, nil
}

func newMarshaler(mType MarshalerType, logger *zap.Logger) (marshaler, error) {
marshaler := &s3Marshaler{logger: logger}
switch mType {
Expand Down
59 changes: 59 additions & 0 deletions exporter/awss3exporter/marshaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@
package awss3exporter

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -42,3 +45,59 @@ func TestMarshaler(t *testing.T) {
assert.Equal(t, m.format(), "txt")
}
}

type hostWithExtensions struct {
encoding encodingExtension
}

func (h hostWithExtensions) Start(context.Context, component.Host) error {
panic("unsupported")
}

func (h hostWithExtensions) Shutdown(context.Context) error {
panic("unsupported")
}

func (h hostWithExtensions) GetFactory(component.Kind, component.Type) component.Factory {
panic("unsupported")
}

func (h hostWithExtensions) GetExtensions() map[component.ID]component.Component {
return map[component.ID]component.Component{
component.MustNewID("foo"): h.encoding,
}
}

func (h hostWithExtensions) GetExporters() map[component.DataType]map[component.ID]component.Component {
panic("unsupported")
}

type encodingExtension struct {
}

func (e encodingExtension) Start(_ context.Context, _ component.Host) error {
panic("unsupported")
}

func (e encodingExtension) Shutdown(_ context.Context) error {
panic("unsupported")
}

func TestMarshalerFromEncoding(t *testing.T) {
id := component.MustNewID("foo")

{
host := hostWithExtensions{
encoding: encodingExtension{},
}
m, err := newMarshalerFromEncoding(&id, host, zap.NewNop())
assert.NoError(t, err)
require.NotNil(t, m)
assert.Equal(t, "foo", m.format())
}
{
m, err := newMarshalerFromEncoding(&id, componenttest.NewNopHost(), zap.NewNop())
assert.EqualError(t, err, `unknown encoding "foo"`)
require.Nil(t, m)
}
}
1 change: 1 addition & 0 deletions exporter/awss3exporter/testdata/default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ receivers:

exporters:
awss3:
encoding: "foo/bar"
s3uploader:
s3_bucket: "foo"
region: 'us-east-1'
Expand Down

0 comments on commit cbf003e

Please sign in to comment.