From 2c33fbb6a03d8849e30a220f756693e48892b5aa Mon Sep 17 00:00:00 2001 From: Iris Grace Endozo Date: Mon, 19 Oct 2020 10:00:15 +1100 Subject: [PATCH] Modify kinesisexporter to support metrics/traces to otlp_proto encoding (#9) * Add OTLP marshalling support for kinesis exporter (#1) Adding a Marshaller interface that allows encoding to different export formats Add an OTLP marshaller with support for metrics Co-authored-by: Iris Grace Endozo * Swapped kinesis opencensus producer with omnition producer * Moved AWS/Kinesis setup to exporter.go * Delete unused Message struct (#3) Co-authored-by: Iris Grace Endozo * Integrated marshaller into trace path * Adding tests for exporter and trace marshaling * Support sending out metrics to kinesis (#5) Co-authored-by: Iris Grace Endozo * Added Start function to factory and logging of producer failures * Removed var * Changed order of config variables * Increase test coverage + some cleanups (#8) * Add tests + private methods * Add comments + fixes Co-authored-by: Iris Grace Endozo * Add README.mdgst * Added parallel testing and import order fixes * Added pointer receivers * Opentelemetry to OpenTelemetry * Unused imports * Add license and fix import order * Fixed license * Fix linting shadowing error * Change readme descriptions * Use sensible values in example config * Added context validation in exporter * Move invalid context as const Co-authored-by: Iris Grace Endozo Co-authored-by: Raymond Wang --- exporter/kinesisexporter/README.md | 47 ++++++- exporter/kinesisexporter/config.go | 6 +- exporter/kinesisexporter/config_test.go | 21 +-- exporter/kinesisexporter/exporter.go | 107 +++++++++----- exporter/kinesisexporter/exporter_test.go | 131 ++++++++++++++++++ exporter/kinesisexporter/factory.go | 66 +++++---- exporter/kinesisexporter/factory_test.go | 67 +++++++++ exporter/kinesisexporter/go.mod | 7 +- exporter/kinesisexporter/go.sum | 25 ---- exporter/kinesisexporter/marshaller.go | 37 +++++ exporter/kinesisexporter/marshaller_test.go | 38 +++++ exporter/kinesisexporter/otlp_marshaller.go | 35 +++++ .../kinesisexporter/otlp_marshaller_test.go | 63 +++++++++ exporter/kinesisexporter/producer.go | 97 +++++++++++++ exporter/kinesisexporter/testdata/config.yaml | 11 +- .../kinesisexporter/testdata/default.yaml | 4 + 16 files changed, 644 insertions(+), 118 deletions(-) create mode 100644 exporter/kinesisexporter/exporter_test.go create mode 100644 exporter/kinesisexporter/factory_test.go create mode 100644 exporter/kinesisexporter/marshaller.go create mode 100644 exporter/kinesisexporter/marshaller_test.go create mode 100644 exporter/kinesisexporter/otlp_marshaller.go create mode 100644 exporter/kinesisexporter/otlp_marshaller_test.go create mode 100644 exporter/kinesisexporter/producer.go diff --git a/exporter/kinesisexporter/README.md b/exporter/kinesisexporter/README.md index 73eea03a06b6..8bd4e03b5263 100644 --- a/exporter/kinesisexporter/README.md +++ b/exporter/kinesisexporter/README.md @@ -1,3 +1,48 @@ # Kinesis Exporter -To be added. \ No newline at end of file +Kinesis exporter exports OpenTelemetry data to Kinesis. This exporter uses a [KPL][kpl-url]-like batch producer and uses +the same aggregation format that KPLs use. Message payload encoding is configurable. + +The following settings can be optionally configured: +- `aws` contains AWS specific configuration + - `stream_name` (default = test-stream): The name of the Kinesis stream where events are sent/pushed + - `kinesis_endpoint`: The Kinesis endpoint if role is not being assumed + - `region` (default = us-west-2): The AWS region where the Kinesis stream is defined + - `role`: The Kinesis role to assume +- `kpl` contains kinesis producer library related config to controls things like aggregation, batching, connections, retries, etc + - `aggregate_batch_count` (default = 4294967295): Determines the maximum number of items to pack into an aggregated record. Must not exceed 4294967295 + - `aggregate_batch_size` (default = 51200): Determines the maximum number of bytes to pack into an aggregated record. User records larger than this will bypass aggregation + - `batch_size` (default = 5242880): Determines the maximum number of bytes to send with a PutRecords request. Must not exceed 5MiB + - `batch_count` (default = 1000): Determines the maximum number of items to pack in the batch. Must not exceed 1000 + - `backlog_count` (default = 2000): Determines the channel capacity before Put() will begin blocking. Default to `BatchCount` + - `flush_interval_seconds` (default = 5): The regular interval for flushing the kinesis producer buffer + - `max_connections` (default = 24): Number of requests to send concurrently + - `max_retries` (default = 10): Number of retry attempts to make before dropping records + - `max_backoff_seconds` (default = 60): Maximum time to backoff. Must be greater than 1s +- `encoding` (default = otlp_proto): The encoding of the payload sent to Kinesis. Available encodings: + - `otlp_proto`: the payload is serialized to otlp proto bytes + +Example configuration: + +```yaml +exporters: + kinesis: + encoding: "otlp_proto" + aws: + stream_name: test-stream + region: mars-1 + role: arn:test-role + kinesis_endpoint: kinesis.mars-1.aws.galactic + kpl: + aggregate_batch_count: 4294967295 + aggregate_batch_size: 51200 + batch_size: 5242880 + batch_count: 1000 + backlog_count: 2000 + flush_interval_seconds: 5 + max_connections: 24 + max_retries: 10 + max_backoff_seconds: 60 +``` + +[kpl-url]: https://github.com/awslabs/amazon-kinesis-producer \ No newline at end of file diff --git a/exporter/kinesisexporter/config.go b/exporter/kinesisexporter/config.go index e03930c4c391..9535d026222d 100644 --- a/exporter/kinesisexporter/config.go +++ b/exporter/kinesisexporter/config.go @@ -47,9 +47,5 @@ type Config struct { AWS AWSConfig `mapstructure:"aws"` KPL KPLConfig `mapstructure:"kpl"` - QueueSize int `mapstructure:"queue_size"` - NumWorkers int `mapstructure:"num_workers"` - MaxBytesPerBatch int `mapstructure:"max_bytes_per_batch"` - MaxBytesPerSpan int `mapstructure:"max_bytes_per_span"` - FlushIntervalSeconds int `mapstructure:"flush_interval_seconds"` + Encoding string `mapstructure:"encoding"` } diff --git a/exporter/kinesisexporter/config_test.go b/exporter/kinesisexporter/config_test.go index 5e7ca220b2f5..fdd09b58d71a 100644 --- a/exporter/kinesisexporter/config_test.go +++ b/exporter/kinesisexporter/config_test.go @@ -27,6 +27,7 @@ import ( ) func TestDefaultConfig(t *testing.T) { + t.Parallel() factories, err := componenttest.ExampleComponents() assert.Nil(t, err) @@ -47,7 +48,8 @@ func TestDefaultConfig(t *testing.T) { NameVal: "kinesis", }, AWS: AWSConfig{ - Region: "us-west-2", + Region: "us-west-2", + StreamName: "test-stream", }, KPL: KPLConfig{ BatchSize: 5242880, @@ -56,20 +58,15 @@ func TestDefaultConfig(t *testing.T) { FlushIntervalSeconds: 5, MaxConnections: 24, }, - - QueueSize: 100000, - NumWorkers: 8, - FlushIntervalSeconds: 5, - MaxBytesPerBatch: 100000, - MaxBytesPerSpan: 900000, + Encoding: defaultEncoding, }, ) } func TestConfig(t *testing.T) { + t.Parallel() factories, err := componenttest.ExampleComponents() assert.Nil(t, err) - factory := NewFactory() factories.Exporters[factory.Type()] = factory cfg, err := configtest.LoadConfigFile( @@ -104,17 +101,13 @@ func TestConfig(t *testing.T) { MaxRetries: 17, MaxBackoffSeconds: 18, }, - - QueueSize: 1, - NumWorkers: 2, - FlushIntervalSeconds: 3, - MaxBytesPerBatch: 4, - MaxBytesPerSpan: 5, + Encoding: "", }, ) } func TestConfigCheck(t *testing.T) { + t.Parallel() cfg := (NewFactory()).CreateDefaultConfig() assert.NoError(t, configcheck.ValidateConfig(cfg)) } diff --git a/exporter/kinesisexporter/exporter.go b/exporter/kinesisexporter/exporter.go index e53181ac26fb..b53a1f81fa00 100644 --- a/exporter/kinesisexporter/exporter.go +++ b/exporter/kinesisexporter/exporter.go @@ -16,57 +16,100 @@ package kinesisexporter import ( "context" + "fmt" - kinesis "github.com/signalfx/opencensus-go-exporter-kinesis" + "github.com/google/uuid" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/pdata" - jaegertranslator "go.opentelemetry.io/collector/translator/trace/jaeger" "go.uber.org/zap" ) -// Exporter implements an OpenTelemetry trace exporter that exports all spans to AWS Kinesis -type Exporter struct { - kinesis *kinesis.Exporter - logger *zap.Logger +const ( + errInvalidContext = "invalid context" +) + +// exporter implements an OpenTelemetry exporter that pushes OpenTelemetry data to AWS Kinesis +type exporter struct { + producer producer + logger *zap.Logger + marshaller Marshaller } -var _ component.TraceExporter = (*Exporter)(nil) +// newExporter creates a new exporter with the passed in configurations. +// It starts the AWS session and setups the relevant connections. +func newExporter(c *Config, logger *zap.Logger) (*exporter, error) { + // Get marshaller based on config + marshaller := defaultMarshallers()[c.Encoding] + if marshaller == nil { + return nil, fmt.Errorf("unrecognized encoding") + } -// Start tells the exporter to start. The exporter may prepare for exporting + pr, err := newKinesisProducer(c, logger) + if err != nil { + return nil, err + } + + return &exporter{producer: pr, marshaller: marshaller, logger: logger}, nil +} + +// start tells the exporter to start. The exporter may prepare for exporting // by connecting to the endpoint. Host parameter can be used for communicating -// with the host after Start() has already returned. If error is returned by -// Start() then the collector startup will be aborted. -func (e Exporter) Start(_ context.Context, _ component.Host) error { +// with the host after start() has already returned. If error is returned by +// start() then the collector startup will be aborted. +func (e *exporter) start(ctx context.Context, _ component.Host) error { + if ctx == nil || ctx.Err() != nil { + return fmt.Errorf(errInvalidContext) + } + + e.producer.start() return nil } -// Shutdown is invoked during exporter shutdown. -func (e Exporter) Shutdown(context.Context) error { - e.kinesis.Flush() +// shutdown is invoked during exporter shutdown +func (e *exporter) shutdown(ctx context.Context) error { + if ctx == nil || ctx.Err() != nil { + return fmt.Errorf(errInvalidContext) + } + + e.producer.stop() return nil } -// ConsumeTraceData receives a span batch and exports it to AWS Kinesis -func (e Exporter) ConsumeTraces(_ context.Context, td pdata.Traces) error { - pBatches, err := jaegertranslator.InternalTracesToJaegerProto(td) +func (e *exporter) pushTraces(ctx context.Context, td pdata.Traces) (int, error) { + if ctx == nil || ctx.Err() != nil { + return 0, fmt.Errorf(errInvalidContext) + } + + pBatches, err := e.marshaller.MarshalTraces(td) if err != nil { e.logger.Error("error translating span batch", zap.Error(err)) - return consumererror.Permanent(err) + return td.SpanCount(), consumererror.Permanent(err) + } + + if err = e.producer.put(pBatches, uuid.New().String()); err != nil { + e.logger.Error("error exporting span to kinesis", zap.Error(err)) + return td.SpanCount(), err + } + + return 0, nil +} + +func (e *exporter) pushMetrics(ctx context.Context, td pdata.Metrics) (int, error) { + if ctx == nil || ctx.Err() != nil { + return 0, fmt.Errorf(errInvalidContext) + } + + pBatches, err := e.marshaller.MarshalMetrics(td) + if err != nil { + e.logger.Error("error translating metrics batch", zap.Error(err)) + return td.MetricCount(), consumererror.Permanent(err) } - // TODO: Use a multi error type - var exportErr error - for _, pBatch := range pBatches { - for _, span := range pBatch.GetSpans() { - if span.Process == nil { - span.Process = pBatch.Process - } - err := e.kinesis.ExportSpan(span) - if err != nil { - e.logger.Error("error exporting span to kinesis", zap.Error(err)) - exportErr = err - } - } + + if err = e.producer.put(pBatches, uuid.New().String()); err != nil { + e.logger.Error("error exporting metrics to kinesis", zap.Error(err)) + return td.MetricCount(), err } - return exportErr + + return 0, nil } diff --git a/exporter/kinesisexporter/exporter_test.go b/exporter/kinesisexporter/exporter_test.go new file mode 100644 index 000000000000..cae899815309 --- /dev/null +++ b/exporter/kinesisexporter/exporter_test.go @@ -0,0 +1,131 @@ +// Copyright 2019 OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kinesisexporter + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/consumer/pdata" + "go.uber.org/zap/zaptest" +) + +type producerMock struct { + mock.Mock +} + +func (m *producerMock) start() { + m.Called() +} + +func (m *producerMock) stop() { + m.Called() +} + +func (m *producerMock) put(data []byte, partitionKey string) error { + args := m.Called(data, partitionKey) + return args.Error(0) +} + +func TestNewKinesisExporter(t *testing.T) { + t.Parallel() + cfg := createDefaultConfig().(*Config) + require.NotNil(t, cfg) + + exp, err := newExporter(cfg, zaptest.NewLogger(t)) + assert.NotNil(t, exp) + assert.NoError(t, err) +} + +func TestNewKinesisExporterBadEncoding(t *testing.T) { + t.Parallel() + cfg := createDefaultConfig().(*Config) + require.NotNil(t, cfg) + cfg.Encoding = "" + + exp, err := newExporter(cfg, zaptest.NewLogger(t)) + assert.Nil(t, exp) + assert.Errorf(t, err, "unrecognized encoding") +} + +func TestPushingTracesToKinesisQueue(t *testing.T) { + t.Parallel() + cfg := createDefaultConfig().(*Config) + require.NotNil(t, cfg) + + exp, _ := newExporter(cfg, zaptest.NewLogger(t)) + mockProducer := new(producerMock) + exp.producer = mockProducer + require.NotNil(t, exp) + + mockProducer.On("put", mock.Anything, mock.AnythingOfType("string")).Return(nil) + + dropped, err := exp.pushTraces(context.Background(), pdata.NewTraces()) + require.NoError(t, err) + require.Equal(t, 0, dropped) +} + +func TestErrorPushingTracesToKinesisQueue(t *testing.T) { + t.Parallel() + cfg := createDefaultConfig().(*Config) + require.NotNil(t, cfg) + + exp, _ := newExporter(cfg, zaptest.NewLogger(t)) + mockProducer := new(producerMock) + exp.producer = mockProducer + require.NotNil(t, exp) + + mockProducer.On("put", mock.Anything, mock.AnythingOfType("string")).Return(fmt.Errorf("someerror")) + + _, err := exp.pushTraces(context.Background(), pdata.NewTraces()) + require.Error(t, err) +} + +func TestPushingMetricsToKinesisQueue(t *testing.T) { + t.Parallel() + cfg := createDefaultConfig().(*Config) + require.NotNil(t, cfg) + + exp, _ := newExporter(cfg, zaptest.NewLogger(t)) + mockProducer := new(producerMock) + exp.producer = mockProducer + require.NotNil(t, exp) + + mockProducer.On("put", mock.Anything, mock.AnythingOfType("string")).Return(nil) + + dropped, err := exp.pushMetrics(context.Background(), pdata.NewMetrics()) + require.NoError(t, err) + require.Equal(t, 0, dropped) +} + +func TestErrorPushingMetricsToKinesisQueue(t *testing.T) { + t.Parallel() + cfg := createDefaultConfig().(*Config) + require.NotNil(t, cfg) + + exp, _ := newExporter(cfg, zaptest.NewLogger(t)) + mockProducer := new(producerMock) + exp.producer = mockProducer + require.NotNil(t, exp) + + mockProducer.On("put", mock.Anything, mock.AnythingOfType("string")).Return(fmt.Errorf("someerror")) + + _, err := exp.pushMetrics(context.Background(), pdata.NewMetrics()) + require.Error(t, err) +} diff --git a/exporter/kinesisexporter/factory.go b/exporter/kinesisexporter/factory.go index 644516a02388..98804bf89b0f 100644 --- a/exporter/kinesisexporter/factory.go +++ b/exporter/kinesisexporter/factory.go @@ -17,7 +17,6 @@ package kinesisexporter import ( "context" - kinesis "github.com/signalfx/opencensus-go-exporter-kinesis" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/exporter/exporterhelper" @@ -25,8 +24,10 @@ import ( const ( // The value of "type" key in configuration. - typeStr = "kinesis" - exportFormat = "jaeger-proto" + typeStr = "kinesis" + otlpProto = "otlp_proto" + // The default encoding scheme is set to otlpProto + defaultEncoding = otlpProto ) // NewFactory creates a factory for Kinesis exporter. @@ -34,7 +35,8 @@ func NewFactory() component.ExporterFactory { return exporterhelper.NewFactory( typeStr, createDefaultConfig, - exporterhelper.WithTraces(createTraceExporter)) + exporterhelper.WithTraces(createTraceExporter), + exporterhelper.WithMetrics(createMetricsExporter)) } func createDefaultConfig() configmodels.Exporter { @@ -44,7 +46,8 @@ func createDefaultConfig() configmodels.Exporter { NameVal: typeStr, }, AWS: AWSConfig{ - Region: "us-west-2", + Region: "us-west-2", + StreamName: "test-stream", }, KPL: KPLConfig{ BatchSize: 5242880, @@ -53,12 +56,7 @@ func createDefaultConfig() configmodels.Exporter { FlushIntervalSeconds: 5, MaxConnections: 24, }, - - QueueSize: 100000, - NumWorkers: 8, - FlushIntervalSeconds: 5, - MaxBytesPerBatch: 100000, - MaxBytesPerSpan: 900000, + Encoding: defaultEncoding, } } @@ -68,32 +66,32 @@ func createTraceExporter( config configmodels.Exporter, ) (component.TraceExporter, error) { c := config.(*Config) - k, err := kinesis.NewExporter(&kinesis.Options{ - Name: c.Name(), - StreamName: c.AWS.StreamName, - AWSRegion: c.AWS.Region, - AWSRole: c.AWS.Role, - AWSKinesisEndpoint: c.AWS.KinesisEndpoint, + exp, err := newExporter(c, params.Logger) + if err != nil { + return nil, err + } - KPLAggregateBatchSize: c.KPL.AggregateBatchSize, - KPLAggregateBatchCount: c.KPL.AggregateBatchCount, - KPLBatchSize: c.KPL.BatchSize, - KPLBatchCount: c.KPL.BatchCount, - KPLBacklogCount: c.KPL.BacklogCount, - KPLFlushIntervalSeconds: c.KPL.FlushIntervalSeconds, - KPLMaxConnections: c.KPL.MaxConnections, - KPLMaxRetries: c.KPL.MaxRetries, - KPLMaxBackoffSeconds: c.KPL.MaxBackoffSeconds, + return exporterhelper.NewTraceExporter( + c, + exp.pushTraces, + exporterhelper.WithStart(exp.start), + exporterhelper.WithShutdown(exp.shutdown)) +} - QueueSize: c.QueueSize, - NumWorkers: c.NumWorkers, - MaxAllowedSizePerSpan: c.MaxBytesPerSpan, - MaxListSize: c.MaxBytesPerBatch, - ListFlushInterval: c.FlushIntervalSeconds, - Encoding: exportFormat, - }, params.Logger) +func createMetricsExporter( + _ context.Context, + params component.ExporterCreateParams, + config configmodels.Exporter, +) (component.MetricsExporter, error) { + c := config.(*Config) + exp, err := newExporter(c, params.Logger) if err != nil { return nil, err } - return Exporter{k, params.Logger}, nil + + return exporterhelper.NewMetricsExporter( + c, + exp.pushMetrics, + exporterhelper.WithStart(exp.start), + exporterhelper.WithShutdown(exp.shutdown)) } diff --git a/exporter/kinesisexporter/factory_test.go b/exporter/kinesisexporter/factory_test.go new file mode 100644 index 000000000000..32343b7346c1 --- /dev/null +++ b/exporter/kinesisexporter/factory_test.go @@ -0,0 +1,67 @@ +// Copyright 2019 OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kinesisexporter + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configcheck" +) + +func TestCreateDefaultConfig(t *testing.T) { + t.Parallel() + cfg := createDefaultConfig().(*Config) + assert.NotNil(t, cfg, "failed to create default config") + assert.NoError(t, configcheck.ValidateConfig(cfg)) + assert.Equal(t, cfg.Encoding, defaultEncoding) +} + +func TestCreateTracesExporter(t *testing.T) { + t.Parallel() + cfg := createDefaultConfig().(*Config) + r, err := createTraceExporter(context.Background(), component.ExporterCreateParams{}, cfg) + require.NoError(t, err) + assert.NotNil(t, r) +} + +func TestErrorCreateTracesExporterByInvalidEncoding(t *testing.T) { + t.Parallel() + cfg := createDefaultConfig().(*Config) + cfg.Encoding = "" + r, err := createTraceExporter(context.Background(), component.ExporterCreateParams{}, cfg) + require.Error(t, err) + assert.Nil(t, r) +} + +func TestCreateMetricsExporter(t *testing.T) { + t.Parallel() + cfg := createDefaultConfig().(*Config) + r, err := createMetricsExporter(context.Background(), component.ExporterCreateParams{}, cfg) + require.NoError(t, err) + assert.NotNil(t, r) +} + +func TestErrorCreateMetricsExporterByInvalidEncoding(t *testing.T) { + t.Parallel() + cfg := createDefaultConfig().(*Config) + cfg.Encoding = "" + r, err := createMetricsExporter(context.Background(), component.ExporterCreateParams{}, cfg) + require.Error(t, err) + assert.Nil(t, r) +} diff --git a/exporter/kinesisexporter/go.mod b/exporter/kinesisexporter/go.mod index 59918b19883f..f6e5d1ddd544 100644 --- a/exporter/kinesisexporter/go.mod +++ b/exporter/kinesisexporter/go.mod @@ -3,7 +3,12 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kinesi go 1.14 require ( - github.com/signalfx/opencensus-go-exporter-kinesis v0.6.3 + github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect + github.com/aws/aws-sdk-go v1.34.9 + github.com/go-ole/go-ole v1.2.4 // indirect + github.com/gogo/googleapis v1.3.1 // indirect + github.com/google/uuid v1.1.2 + github.com/signalfx/omnition-kinesis-producer v0.5.0 github.com/stretchr/testify v1.6.1 go.opentelemetry.io/collector v0.12.1-0.20201012183541-526f34200197 go.uber.org/zap v1.16.0 diff --git a/exporter/kinesisexporter/go.sum b/exporter/kinesisexporter/go.sum index cc87bb39ab68..ab6ab387a2e9 100644 --- a/exporter/kinesisexporter/go.sum +++ b/exporter/kinesisexporter/go.sum @@ -145,8 +145,6 @@ github.com/bmizerany/pat v0.0.0-20170815010413-6226ea591a40/go.mod h1:8rLXio+Wji github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps= github.com/bombsimon/wsl/v3 v3.1.0 h1:E5SRssoBgtVFPcYWUOFJEcgaySgdtTNYzsSKDOY7ss8= github.com/bombsimon/wsl/v3 v3.1.0/go.mod h1:st10JtZYLE4D5sC7b8xV4zTKZwAQjCH/Hy2Pm1FNZIc= -github.com/brianvoe/gofakeit v3.17.0+incompatible h1:C1+30+c0GtjgGDtRC+iePZeP1WMiwsWCELNJhmc7aIc= -github.com/brianvoe/gofakeit v3.17.0+incompatible/go.mod h1:kfwdRA90vvNhPutZWfH7WPaDzUjz+CZFqG+rPkOjGOc= github.com/bsm/sarama-cluster v2.1.13+incompatible/go.mod h1:r7ao+4tTNXvWm+VRpRJchr2kQhqxgmAp2iEX5W96gMM= github.com/c-bata/go-prompt v0.2.2/go.mod h1:VzqtzE2ksDBcdln8G7mk2RX9QyGjH+OVqOCSiVIqS34= github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ= @@ -225,7 +223,6 @@ github.com/docker/go-units v0.4.0 h1:3uh0PgVws3nIA0Q+MwDC8yjEPf9zjRfZZWXZYDct3Tw github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96/go.mod h1:Qh8CwZgvJUkLughtfhJv5dyTYa91l1fOUCrgjqmcifM= github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE= -github.com/dropbox/godropbox v0.0.0-20180512210157-31879d3884b9/go.mod h1:glr97hP/JuXb+WMYCizc4PIFuzw1lCR97mwbe1VVXhQ= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= github.com/eapache/go-resiliency v1.2.0 h1:v7g92e/KSN71Rq7vSThKaWIq68fL4YHvWyiUKorFR1Q= @@ -247,8 +244,6 @@ github.com/evanphx/json-patch v0.0.0-20200808040245-162e5629780b/go.mod h1:NAJj0 github.com/evanphx/json-patch v4.2.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/evanphx/json-patch v4.5.0+incompatible h1:ouOWdg56aJriqS0huScTkVXPC5IcNrDCXZ6OoTAWu7M= github.com/evanphx/json-patch v4.5.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= -github.com/facebookgo/stack v0.0.0-20160209184415-751773369052/go.mod h1:UbMTZqLaRiH3MsBH8va0n7s1pQYcu3uTb8G4tygF4Zg= -github.com/facebookgo/stackerr v0.0.0-20150612192056-c2fcf88613f4/go.mod h1:SBHk9aNQtiw4R4bEuzHjVmZikkUKCnO1v3lPQ21HZGk= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.9.0 h1:8xPHl4/q1VyqGIPif1F+1V3Y3lSmrq01EabUW3CoW5s= github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU= @@ -479,7 +474,6 @@ github.com/golangci/revgrep v0.0.0-20180526074752-d9c87f5ffaf0 h1:HVfrLniijszjS1 github.com/golangci/revgrep v0.0.0-20180526074752-d9c87f5ffaf0/go.mod h1:qOQCunEYvmd/TLamH+7LlVccLvUH5kZNhbCgTHoBbp4= github.com/golangci/unconvert v0.0.0-20180507085042-28b1c447d1f4 h1:zwtduBRr5SSWhqsYNgcuWO2kFlpdOZbP0+yRjmvPGys= github.com/golangci/unconvert v0.0.0-20180507085042-28b1c447d1f4/go.mod h1:Izgrg8RkN3rCIMLGE9CyYmU9pY2Jer6DgANEnZ/L/cQ= -github.com/google/addlicense v0.0.0-20190510175307-22550fa7c1b0/go.mod h1:QtPG26W17m+OIQgE6gQ24gC1M6pUaMBAbFrTIDtwG/E= github.com/google/addlicense v0.0.0-20200622132530-df58acafd6d5 h1:m6Z1Cm53o4VecQFxKCnvULGfIT0Igo3MX131i+00IIo= github.com/google/addlicense v0.0.0-20200622132530-df58acafd6d5/go.mod h1:EMjYTRimagHs1FwlIqKyX3wAM0u3rA+McvlIIWmSamA= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= @@ -632,7 +626,6 @@ github.com/influxdata/promql/v2 v2.12.0/go.mod h1:fxOPu+DY0bqCTCECchSRtWfc+0X19y github.com/influxdata/roaring v0.4.13-0.20180809181101-fc520f41fab6/go.mod h1:bSgUQ7q5ZLSO+bKBGqJiCBGAl+9DxyW63zLTujjUlOE= github.com/influxdata/tdigest v0.0.0-20181121200506-bf2b5ad3c0a9/go.mod h1:Js0mqiSBE6Ffsg94weZZ2c+v/ciT8QRHFOap7EKDrR0= github.com/influxdata/usage-client v0.0.0-20160829180054-6d3895376368/go.mod h1:Wbbw6tYNvwa5dlB6304Sd+82Z3f7PmVZHVKU637d4po= -github.com/jaegertracing/jaeger v1.15.1/go.mod h1:LUWPSnzNPGRubM8pk0inANGitpiMOOxihXx0+53llXI= github.com/jaegertracing/jaeger v1.20.0 h1:rnwhl7COrEj1/vYfumL84CoiwOEy2MLFJFcW1bqjxnA= github.com/jaegertracing/jaeger v1.20.0/go.mod h1:EFO94eQMRMI5KM4RIWcnl3rocmGEVt232TIG4Ua/4T0= github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= @@ -666,9 +659,6 @@ github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/X github.com/jsternberg/zap-logfmt v1.0.0/go.mod h1:uvPs/4X51zdkcm5jXl5SYoN+4RK21K8mysFmDaM/h+o= github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= -github.com/juju/errors v0.0.0-20181012004132-a4583d0a56ea/go.mod h1:W54LbzXuIE0boCoNJfwqpmkKJ1O4TCTZMetAt6jGk7Q= -github.com/juju/loggo v0.0.0-20190526231331-6e530bcce5d8/go.mod h1:vgyd7OREkbtVEN/8IXZe5Ooef3LQePvuBm9UWj6ZL8U= -github.com/juju/testing v0.0.0-20191001232224-ce9dec17d28b/go.mod h1:63prj8cnj0tU0S9OHjGJn+b1h0ZghCndfnbQolrYTwA= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes= @@ -865,7 +855,6 @@ github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FI github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/paulbellamy/ratecounter v0.2.0/go.mod h1:Hfx1hDpSGoqxkVVpBi/IlYD7kChlfo5C6hzIHwPqfFE= -github.com/pavius/impi v0.0.0-20180302134524-c1cbdcb8df2b/go.mod h1:x/hU0bfdWIhuOT1SKwiJg++yvkk6EuOtJk8WtDZqgr8= github.com/pavius/impi v0.0.3 h1:DND6MzU+BLABhOZXbELR3FU8b+zDgcq4dOCNLhiTYuI= github.com/pavius/impi v0.0.3/go.mod h1:x/hU0bfdWIhuOT1SKwiJg++yvkk6EuOtJk8WtDZqgr8= github.com/pborman/uuid v1.2.0 h1:J7Q5mO4ysT1dv8hyrUGHb9+ooztCXu1D8MY8DZYsu3g= @@ -974,7 +963,6 @@ github.com/ryanrolds/sqlclosecheck v0.3.0 h1:AZx+Bixh8zdUBxUA1NxbxVAS78vTPq4rCb8 github.com/ryanrolds/sqlclosecheck v0.3.0/go.mod h1:1gREqxyTGR3lVtpngyFo3hZAgk0KCtEdgEkHwDbigdA= github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= -github.com/samuel/go-zookeeper v0.0.0-20190810000440-0ceca61e4d75/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E= github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E= github.com/samuel/go-zookeeper v0.0.0-20200724154423-2164a8ac840e h1:CGjiMQ0wMH4wtNWrlj6kiTbkPt2F3rbYnhGX6TWLfco= github.com/samuel/go-zookeeper v0.0.0-20200724154423-2164a8ac840e/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E= @@ -994,7 +982,6 @@ github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAm github.com/shazow/go-diff v0.0.0-20160112020656-b6b7b6733b8c h1:W65qqJCIOVP4jpqPQ0YvHYKwcMEMVWIzWC5iNQQfBTU= github.com/shazow/go-diff v0.0.0-20160112020656-b6b7b6733b8c/go.mod h1:/PevMnwAxekIXwN8qQyfc5gl2NlkB3CQlkizAbOkeBs= github.com/shirou/gopsutil v0.0.0-20190901111213-e4ec7b275ada/go.mod h1:WWnYX4lzhCH5h/3YBfyVA3VbLYjlMZZAQcW9ojMexNc= -github.com/shirou/gopsutil v2.18.10+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= github.com/shirou/gopsutil v2.20.6+incompatible h1:P37G9YH8M4vqkKcwBosp+URN5O8Tay67D2MbR361ioY= github.com/shirou/gopsutil v2.20.6+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= github.com/shirou/w32 v0.0.0-20160930032740-bb4de0191aa4/go.mod h1:qsXQc7+bwAM3Q1u/4XEfrquwF8Lw7D7y5cD8CuHnfIc= @@ -1004,16 +991,8 @@ github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749/go.mod h1:ZY1cvUeJ github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/shurcooL/vfsgen v0.0.0-20181202132449-6a9ea43bcacd/go.mod h1:TrYk7fJVaAttu97ZZKrO9UbRa8izdowaMIZcxYMbVaw= github.com/shurcooL/vfsgen v0.0.0-20200627165143-92b8a710ab6c/go.mod h1:TrYk7fJVaAttu97ZZKrO9UbRa8izdowaMIZcxYMbVaw= -github.com/signalfx/com_signalfx_metrics_protobuf v0.0.0-20190222193949-1fb69526e884/go.mod h1:muYA2clvwCdj7nzAJ5vJIXYpJsUumhAl4Uu1wUNpWzA= -github.com/signalfx/gohistogram v0.0.0-20160107210732-1ccfd2ff5083/go.mod h1:adPDS6s7WaajdFBV9mQ7i0dKfQ8xiDnF9ZNETVPpp7c= -github.com/signalfx/golib/v3 v3.3.0 h1:vSXsAb73bdrlnjk5rnZ7y3t09Qzu9qfBEbXdcyBHsmE= -github.com/signalfx/golib/v3 v3.3.0/go.mod h1:GzjWpV0skAXZn7+u9LnkOkiXAx9KKd5XZcd5r+RoF5o= -github.com/signalfx/gomemcache v0.0.0-20180823214636-4f7ef64c72a9/go.mod h1:Ytb8KfCSyuwy/VILnROdgCvbQLA5ch0nkbG7lKT0BXw= github.com/signalfx/omnition-kinesis-producer v0.5.0 h1:pENQrLmI3XBggkBf/UNYXcpPP/XhNMBdBVfeBUOFZoQ= github.com/signalfx/omnition-kinesis-producer v0.5.0/go.mod h1:5tt4Zb0FS0QRKXVGFUmpX0aEE4bn2bB972znpqMqJtg= -github.com/signalfx/opencensus-go-exporter-kinesis v0.6.3 h1:ooYCDeKtuwmT+HNBkv/VjkPp97f4xAmA6COgHQS9+as= -github.com/signalfx/opencensus-go-exporter-kinesis v0.6.3/go.mod h1:iKTZPIUUpRI9Hp2yAMb2qNXl6itkEd2pxAznG08Y6YU= -github.com/signalfx/sapm-proto v0.4.0/go.mod h1:x3gtwJ1GRejtkghB4nYpwixh2zqJrLbPU959ZNhM0Fk= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.3.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= @@ -1023,7 +1002,6 @@ github.com/sirupsen/logrus v1.6.0 h1:UBcNElsrwanuuMsnGSlYmtmgbb23qDR5dG+6X6Oo89I github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= -github.com/smartystreets/goconvey v1.6.4-0.20190306220146-200a235640ff/go.mod h1:KSQcGKpxUMHk3nbYzs/tIBAM2iDooCn0BmttHOJEbLs= github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s= github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= github.com/soheilhy/cmux v0.1.4 h1:0HKaf1o97UwFjHH9o5XsHUOF+tqmdA7KEzXLpiyaw0E= @@ -1116,7 +1094,6 @@ github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyC github.com/valyala/fasthttp v1.15.1/go.mod h1:YOKImeEosDdBPnxc0gy7INqi3m1zK6A+xl6TwOBhHCA= github.com/valyala/quicktemplate v1.6.2/go.mod h1:mtEJpQtUiBV0SHhMX6RtiJtqxncgrfmjcUy5T68X8TM= github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio= -github.com/vaughan0/go-ini v0.0.0-20130923145212-a98ad7ee00ec/go.mod h1:owBmyHYMLkxyrugmfwE/DLJyW8Ro9mkphwuVErQ0iUw= github.com/vektah/gqlparser v1.1.2/go.mod h1:1ycwN7Ij5njmMkPPAOaRFY4rET2Enx7IkVv3vaXspKw= github.com/vektra/mockery v0.0.0-20181123154057-e78b021dcbb5/go.mod h1:ppEjwdhyy7Y31EnHRDm1JkChoC7LXIJ7Ex0VYLWtZtQ= github.com/wadey/gocovmerge v0.0.0-20160331181800-b5bfa59ec0ad/go.mod h1:Hy8o65+MXnS6EwGElrSRjUzQDLXreJlzYLlWiHtt8hM= @@ -1393,7 +1370,6 @@ golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgw golang.org/x/tools v0.0.0-20190628153133-6cdbf07be9d0/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= golang.org/x/tools v0.0.0-20190719005602-e377ae9d6386/go.mod h1:jcCCGcm9btYwXyDqrUWc6MKQKKGJCWEQ3AfLSRIbEuI= golang.org/x/tools v0.0.0-20190816200558-6889da9d5479/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= -golang.org/x/tools v0.0.0-20190906203814-12febf440ab1/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20190907020128-2ca718005c18/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20190910044552-dd2b5c81c578/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20190911174233-4f2ddba30aff/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= @@ -1559,7 +1535,6 @@ gopkg.in/jcmturner/gokrb5.v7 v7.5.0 h1:a9tsXlIDD9SKxotJMK3niV7rPZAJeX2aD/0yg3qlI gopkg.in/jcmturner/gokrb5.v7 v7.5.0/go.mod h1:l8VISx+WGYp+Fp7KRbsiUuXTTOnxIc3Tuvyavf11/WM= gopkg.in/jcmturner/rpc.v1 v1.1.0 h1:QHIUxTX1ISuAv9dD2wJ9HWQVuWDX/Zc0PfeC2tjc4rU= gopkg.in/jcmturner/rpc.v1 v1.1.0/go.mod h1:YIdkC4XfD6GXbzje11McwsDuOlZQSb9W4vfLvuNnlv8= -gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22/go.mod h1:yeKp02qBN3iKW1OzL3MGk2IdtZzaj7SFntXj72NppTA= gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo= gopkg.in/square/go-jose.v2 v2.5.1 h1:7odma5RETjNHWJnR32wx8t+Io4djHE1PqxCFx3iiZ2w= gopkg.in/square/go-jose.v2 v2.5.1/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI= diff --git a/exporter/kinesisexporter/marshaller.go b/exporter/kinesisexporter/marshaller.go new file mode 100644 index 000000000000..e6e75a85021b --- /dev/null +++ b/exporter/kinesisexporter/marshaller.go @@ -0,0 +1,37 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kinesisexporter + +import ( + "go.opentelemetry.io/collector/consumer/pdata" +) + +// Marshaller marshals Opentelemetry data into byte array +type Marshaller interface { + // MarshalTraces serializes traces into a byte array + MarshalTraces(traces pdata.Traces) ([]byte, error) + // MarshalMetrics serializes metrics into a byte array + MarshalMetrics(metrics pdata.Metrics) ([]byte, error) + + // Encoding returns encoding name + Encoding() string +} + +// defaultMarshallers returns map of supported encodings with Marshaller. +func defaultMarshallers() map[string]Marshaller { + return map[string]Marshaller{ + otlpProto: &otlpProtoMarshaller{}, + } +} diff --git a/exporter/kinesisexporter/marshaller_test.go b/exporter/kinesisexporter/marshaller_test.go new file mode 100644 index 000000000000..46607ca54bef --- /dev/null +++ b/exporter/kinesisexporter/marshaller_test.go @@ -0,0 +1,38 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kinesisexporter + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestDefaultMarshallers(t *testing.T) { + t.Parallel() + expectedEncodings := []string{ + "otlp_proto", + } + marshallers := defaultMarshallers() + assert.Equal(t, len(expectedEncodings), len(marshallers)) + for _, e := range expectedEncodings { + t.Run(e, func(t *testing.T) { + m, ok := marshallers[e] + require.True(t, ok) + assert.NotNil(t, m) + }) + } +} diff --git a/exporter/kinesisexporter/otlp_marshaller.go b/exporter/kinesisexporter/otlp_marshaller.go new file mode 100644 index 000000000000..bf9d58035edd --- /dev/null +++ b/exporter/kinesisexporter/otlp_marshaller.go @@ -0,0 +1,35 @@ +// Copyright 2020 The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kinesisexporter + +import ( + "go.opentelemetry.io/collector/consumer/pdata" +) + +type otlpProtoMarshaller struct{} + +var _ Marshaller = (*otlpProtoMarshaller)(nil) + +func (*otlpProtoMarshaller) MarshalTraces(traces pdata.Traces) ([]byte, error) { + return traces.ToOtlpProtoBytes() +} + +func (*otlpProtoMarshaller) MarshalMetrics(metrics pdata.Metrics) ([]byte, error) { + return metrics.ToOtlpProtoBytes() +} + +func (*otlpProtoMarshaller) Encoding() string { + return otlpProto +} diff --git a/exporter/kinesisexporter/otlp_marshaller_test.go b/exporter/kinesisexporter/otlp_marshaller_test.go new file mode 100644 index 000000000000..daf3c7f8924d --- /dev/null +++ b/exporter/kinesisexporter/otlp_marshaller_test.go @@ -0,0 +1,63 @@ +// Copyright 2020 The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kinesisexporter + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/consumer/pdata" +) + +func TestCorrectEncoding(t *testing.T) { + t.Parallel() + m := otlpProtoMarshaller{} + assert.Equal(t, otlpProto, m.Encoding()) +} + +func TestOTLPMetricsMarshaller(t *testing.T) { + t.Parallel() + td := pdata.NewMetrics() + td.ResourceMetrics().Resize(1) + td.ResourceMetrics().At(0).Resource().InitEmpty() + td.ResourceMetrics().At(0).Resource().Attributes().InsertString("foo", "bar") + expected, err := td.ToOtlpProtoBytes() + require.NoError(t, err) + require.NotNil(t, expected) + + m := otlpProtoMarshaller{} + payload, err := m.MarshalMetrics(td) + require.NoError(t, err) + assert.Equal(t, expected, payload) +} + +func TestOTLPTracesMarshaller(t *testing.T) { + t.Parallel() + td := pdata.NewTraces() + td.ResourceSpans().Resize(1) + span := td.ResourceSpans().At(0).Resource() + span.InitEmpty() + span.Attributes().InsertString("foo", "bar") + + expected, err := td.ToOtlpProtoBytes() + require.NoError(t, err) + require.NotNil(t, expected) + + m := otlpProtoMarshaller{} + payload, err := m.MarshalTraces(td) + require.NoError(t, err) + assert.Equal(t, expected, payload) +} diff --git a/exporter/kinesisexporter/producer.go b/exporter/kinesisexporter/producer.go new file mode 100644 index 000000000000..06a9ad35c768 --- /dev/null +++ b/exporter/kinesisexporter/producer.go @@ -0,0 +1,97 @@ +// Copyright 2020 The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kinesisexporter + +import ( + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials/stscreds" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/kinesis" + omnition "github.com/signalfx/omnition-kinesis-producer" + "github.com/signalfx/omnition-kinesis-producer/loggers/kpzap" + "go.uber.org/zap" +) + +// producer provides the interface for a kinesis producer. The producer +// implementation abstracts the interaction with the kinesis producer library +// used from the exporter +type producer interface { + start() + stop() + put(data []byte, partitionKey string) error +} + +type client struct { + client *omnition.Producer + logger *zap.Logger +} + +func newKinesisProducer(c *Config, logger *zap.Logger) (producer, error) { + sess, err := session.NewSession() + if err != nil { + return nil, err + } + + awsConfig := aws.NewConfig().WithRegion(c.AWS.Region).WithEndpoint(c.AWS.KinesisEndpoint) + // If AWS role is provided, use sts credentials to assume the role + if len(c.AWS.Role) > 0 { + creds := stscreds.NewCredentials(sess, c.AWS.Role) + awsConfig = awsConfig.WithCredentials(creds) + } + + o := omnition.New(&omnition.Config{ + Logger: &kpzap.Logger{Logger: logger}, + Client: kinesis.New(sess, awsConfig), + StreamName: c.AWS.StreamName, + // KPL parameters + FlushInterval: time.Duration(c.KPL.FlushIntervalSeconds) * time.Second, + BatchCount: c.KPL.BatchCount, + BatchSize: c.KPL.BatchSize, + AggregateBatchCount: c.KPL.AggregateBatchCount, + AggregateBatchSize: c.KPL.AggregateBatchSize, + BacklogCount: c.KPL.BacklogCount, + MaxConnections: c.KPL.MaxConnections, + MaxRetries: c.KPL.MaxRetries, + MaxBackoffTime: time.Duration(c.KPL.MaxBackoffSeconds) * time.Second, + }, nil) + + return client{client: o, logger: logger}, nil +} + +func (c client) start() { + c.client.Start() + go c.notifyErrors() +} + +// notifyErrors logs the failures within the kinesis exporter +func (c client) notifyErrors() { + for r := range c.client.NotifyFailures() { + // Logging error for now, these are normally unrecoverable failures + c.logger.Error("error putting record on kinesis", + zap.String("partitionKey", r.PartitionKey), + zap.Error(r.Err), + ) + } +} + +func (c client) stop() { + c.client.Stop() +} + +func (c client) put(data []byte, partitionKey string) error { + return c.client.Put(data, partitionKey) +} diff --git a/exporter/kinesisexporter/testdata/config.yaml b/exporter/kinesisexporter/testdata/config.yaml index 9d35588bded7..ebd6b177f375 100644 --- a/exporter/kinesisexporter/testdata/config.yaml +++ b/exporter/kinesisexporter/testdata/config.yaml @@ -3,12 +3,7 @@ receivers: exporters: kinesis: - queue_size: 1 - num_workers: 2 - flush_interval_seconds: 3 - max_bytes_per_batch: 4 - max_bytes_per_span: 5 - + encoding: "" aws: stream_name: test-stream region: mars-1 @@ -35,3 +30,7 @@ service: receivers: [examplereceiver] processors: [exampleprocessor] exporters: [kinesis] + metrics: + receivers: [examplereceiver] + processors: [exampleprocessor] + exporters: [kinesis] diff --git a/exporter/kinesisexporter/testdata/default.yaml b/exporter/kinesisexporter/testdata/default.yaml index e4dd7cc5f14e..0ddbd2913ad9 100644 --- a/exporter/kinesisexporter/testdata/default.yaml +++ b/exporter/kinesisexporter/testdata/default.yaml @@ -13,3 +13,7 @@ service: receivers: [examplereceiver] processors: [exampleprocessor] exporters: [kinesis] + metrics: + receivers: [examplereceiver] + processors: [exampleprocessor] + exporters: [kinesis]