From 1e46f2b09e784e456f2ffb767b1d794e9ee918dc Mon Sep 17 00:00:00 2001 From: Iris Grace Endozo Date: Thu, 15 Oct 2020 12:31:02 +1100 Subject: [PATCH] Increase test coverage + some cleanups (#8) * Add tests + private methods * Add comments + fixes Co-authored-by: Iris Grace Endozo --- exporter/kinesisexporter/exporter.go | 78 ++++++------------ exporter/kinesisexporter/exporter_test.go | 87 ++++++++++++++++++++- exporter/kinesisexporter/factory.go | 16 ++-- exporter/kinesisexporter/factory_test.go | 62 +++++++++++++++ exporter/kinesisexporter/marshaller.go | 3 +- exporter/kinesisexporter/otlp_marshaller.go | 8 +- exporter/kinesisexporter/producer.go | 81 +++++++++++++++++++ 7 files changed, 264 insertions(+), 71 deletions(-) create mode 100644 exporter/kinesisexporter/factory_test.go create mode 100644 exporter/kinesisexporter/producer.go diff --git a/exporter/kinesisexporter/exporter.go b/exporter/kinesisexporter/exporter.go index 512ca9277722..01b1407a9cae 100644 --- a/exporter/kinesisexporter/exporter.go +++ b/exporter/kinesisexporter/exporter.go @@ -17,111 +17,79 @@ package kinesisexporter import ( "context" "fmt" - "time" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/session" - "github.com/aws/aws-sdk-go/service/kinesis" "github.com/google/uuid" - producer "github.com/signalfx/omnition-kinesis-producer" - kpzap "github.com/signalfx/omnition-kinesis-producer/loggers/kpzap" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/pdata" "go.uber.org/zap" ) -// Exporter implements an OpenTelemetry exporter that exports all traces/metrics to AWS Kinesis -type Exporter struct { - producer *producer.Producer +// exporter implements an OpenTelemetry exporter that pushes OpenTelemetry data to AWS Kinesis +type exporter struct { + producer producer logger *zap.Logger marshaller Marshaller } -// newKinesisExporter creates a new Exporter with the passed in configurations. +// newExporter creates a new exporter with the passed in configurations. // It starts the AWS session and setups the relevant connections. -func newKinesisExporter(c *Config, logger *zap.Logger) (*Exporter, error) { +func newExporter(c *Config, logger *zap.Logger) (*exporter, error) { // Get marshaller based on config marshaller := defaultMarshallers()[c.Encoding] if marshaller == nil { return nil, fmt.Errorf("unrecognized encoding") } - awsConfig := aws.NewConfig().WithRegion(c.AWS.Region).WithEndpoint(c.AWS.KinesisEndpoint) - sess, err := session.NewSession(awsConfig) + producer, err := newKinesisProducer(c, logger) if err != nil { return nil, err } - client := kinesis.New(sess) - pr := producer.New(&producer.Config{ - Logger: &kpzap.Logger{Logger: logger}, - Client: client, - StreamName: c.AWS.StreamName, - // KPL parameters - FlushInterval: time.Duration(c.KPL.FlushIntervalSeconds) * time.Second, - BatchCount: c.KPL.BatchCount, - BatchSize: c.KPL.BatchSize, - AggregateBatchCount: c.KPL.AggregateBatchCount, - AggregateBatchSize: c.KPL.AggregateBatchSize, - BacklogCount: c.KPL.BacklogCount, - MaxConnections: c.KPL.MaxConnections, - MaxRetries: c.KPL.MaxRetries, - MaxBackoffTime: time.Duration(c.KPL.MaxBackoffSeconds) * time.Second, - }, nil) - return &Exporter{producer: pr, marshaller: marshaller, logger: logger}, nil + return &exporter{producer: producer, marshaller: marshaller, logger: logger}, nil } -// Start tells the exporter to start. The exporter may prepare for exporting +// start tells the exporter to start. The exporter may prepare for exporting // by connecting to the endpoint. Host parameter can be used for communicating -// with the host after Start() has already returned. If error is returned by -// Start() then the collector startup will be aborted. -func (e Exporter) Start(_ context.Context, _ component.Host) error { - e.producer.Start() - go e.notifyErrors() +// with the host after start() has already returned. If error is returned by +// start() then the collector startup will be aborted. +func (e exporter) start(context.Context, component.Host) error { + e.producer.start() return nil } -// notifyErrors logs the failures within the kinesis exporter -func (e Exporter) notifyErrors() { - for r := range e.producer.NotifyFailures() { - // Logging error for now, these are normally unrecoverable failures - e.logger.Error("error putting record on kinesis", zap.Error(r.Err)) - } -} - -// Shutdown is invoked during exporter shutdown. -func (e Exporter) Shutdown(context.Context) error { - e.producer.Stop() +// shutdown is invoked during exporter shutdown +func (e exporter) shutdown(context.Context) error { + e.producer.stop() return nil } -// ConsumeTraces receives a span batch and exports it to AWS Kinesis -func (e Exporter) ConsumeTraces(_ context.Context, td pdata.Traces) (int, error) { +func (e exporter) pushTraces(_ context.Context, td pdata.Traces) (int, error) { pBatches, err := e.marshaller.MarshalTraces(td) if err != nil { e.logger.Error("error translating span batch", zap.Error(err)) return td.SpanCount(), consumererror.Permanent(err) } - err = e.producer.Put(pBatches, uuid.New().String()) - if err != nil { + + if err = e.producer.put(pBatches, uuid.New().String()); err != nil { e.logger.Error("error exporting span to kinesis", zap.Error(err)) return td.SpanCount(), err } + return 0, nil } -// ConsumeMetrics receives a metrics batch and exports it to AWS Kinesis -func (e Exporter) ConsumeMetrics(_ context.Context, td pdata.Metrics) (int, error) { +func (e exporter) pushMetrics(_ context.Context, td pdata.Metrics) (int, error) { pBatches, err := e.marshaller.MarshalMetrics(td) if err != nil { e.logger.Error("error translating metrics batch", zap.Error(err)) return td.MetricCount(), consumererror.Permanent(err) } - err = e.producer.Put(pBatches, uuid.New().String()) - if err != nil { + + if err = e.producer.put(pBatches, uuid.New().String()); err != nil { e.logger.Error("error exporting metrics to kinesis", zap.Error(err)) return td.MetricCount(), err } + return 0, nil } diff --git a/exporter/kinesisexporter/exporter_test.go b/exporter/kinesisexporter/exporter_test.go index c6a51cbb67c9..56b9af3da28f 100644 --- a/exporter/kinesisexporter/exporter_test.go +++ b/exporter/kinesisexporter/exporter_test.go @@ -15,18 +15,39 @@ package kinesisexporter import ( + "context" + "fmt" "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/consumer/pdata" "go.uber.org/zap" ) +type producerMock struct { + mock.Mock +} + +func (m *producerMock) start() { + m.Called() +} + +func (m *producerMock) stop() { + m.Called() +} + +func (m *producerMock) put(data []byte, partitionKey string) error { + args := m.Called(data, partitionKey) + return args.Error(0) +} + func TestNewKinesisExporter(t *testing.T) { cfg := createDefaultConfig().(*Config) require.NotNil(t, cfg) - exp, err := newKinesisExporter(cfg, zap.NewNop()) + exp, err := newExporter(cfg, zap.NewNop()) assert.NotNil(t, exp) assert.NoError(t, err) } @@ -36,8 +57,70 @@ func TestNewKinesisExporterBadEncoding(t *testing.T) { require.NotNil(t, cfg) cfg.Encoding = "" - exp, err := newKinesisExporter(cfg, zap.NewNop()) + exp, err := newExporter(cfg, zap.NewNop()) assert.Nil(t, exp) assert.Error(t, err) assert.Equal(t, err.Error(), "unrecognized encoding") } + +func TestPushingTracesToKinesisQueue(t *testing.T) { + cfg := createDefaultConfig().(*Config) + require.NotNil(t, cfg) + + exp, _ := newExporter(cfg, zap.NewNop()) + mockProducer := new(producerMock) + exp.producer = mockProducer + require.NotNil(t, exp) + + mockProducer.On("put", mock.Anything, mock.AnythingOfType("string")).Return(nil) + + dropped, err := exp.pushTraces(context.Background(), pdata.NewTraces()) + require.NoError(t, err) + require.Equal(t, 0, dropped) +} + +func TestErrorPushingTracesToKinesisQueue(t *testing.T) { + cfg := createDefaultConfig().(*Config) + require.NotNil(t, cfg) + + exp, _ := newExporter(cfg, zap.NewNop()) + mockProducer := new(producerMock) + exp.producer = mockProducer + require.NotNil(t, exp) + + mockProducer.On("put", mock.Anything, mock.AnythingOfType("string")).Return(fmt.Errorf("someerror")) + + _, err := exp.pushTraces(context.Background(), pdata.NewTraces()) + require.Error(t, err) +} + +func TestPushingMetricsToKinesisQueue(t *testing.T) { + cfg := createDefaultConfig().(*Config) + require.NotNil(t, cfg) + + exp, _ := newExporter(cfg, zap.NewNop()) + mockProducer := new(producerMock) + exp.producer = mockProducer + require.NotNil(t, exp) + + mockProducer.On("put", mock.Anything, mock.AnythingOfType("string")).Return(nil) + + dropped, err := exp.pushMetrics(context.Background(), pdata.NewMetrics()) + require.NoError(t, err) + require.Equal(t, 0, dropped) +} + +func TestErrorPushingMetricsToKinesisQueue(t *testing.T) { + cfg := createDefaultConfig().(*Config) + require.NotNil(t, cfg) + + exp, _ := newExporter(cfg, zap.NewNop()) + mockProducer := new(producerMock) + exp.producer = mockProducer + require.NotNil(t, exp) + + mockProducer.On("put", mock.Anything, mock.AnythingOfType("string")).Return(fmt.Errorf("someerror")) + + _, err := exp.pushMetrics(context.Background(), pdata.NewMetrics()) + require.Error(t, err) +} diff --git a/exporter/kinesisexporter/factory.go b/exporter/kinesisexporter/factory.go index d0623006b5bc..98804bf89b0f 100644 --- a/exporter/kinesisexporter/factory.go +++ b/exporter/kinesisexporter/factory.go @@ -66,16 +66,16 @@ func createTraceExporter( config configmodels.Exporter, ) (component.TraceExporter, error) { c := config.(*Config) - exp, err := newKinesisExporter(c, params.Logger) + exp, err := newExporter(c, params.Logger) if err != nil { return nil, err } return exporterhelper.NewTraceExporter( c, - exp.ConsumeTraces, - exporterhelper.WithStart(exp.Start), - exporterhelper.WithShutdown(exp.Shutdown)) + exp.pushTraces, + exporterhelper.WithStart(exp.start), + exporterhelper.WithShutdown(exp.shutdown)) } func createMetricsExporter( @@ -84,14 +84,14 @@ func createMetricsExporter( config configmodels.Exporter, ) (component.MetricsExporter, error) { c := config.(*Config) - exp, err := newKinesisExporter(c, params.Logger) + exp, err := newExporter(c, params.Logger) if err != nil { return nil, err } return exporterhelper.NewMetricsExporter( c, - exp.ConsumeMetrics, - exporterhelper.WithStart(exp.Start), - exporterhelper.WithShutdown(exp.Shutdown)) + exp.pushMetrics, + exporterhelper.WithStart(exp.start), + exporterhelper.WithShutdown(exp.shutdown)) } diff --git a/exporter/kinesisexporter/factory_test.go b/exporter/kinesisexporter/factory_test.go new file mode 100644 index 000000000000..5d8cf3778cb4 --- /dev/null +++ b/exporter/kinesisexporter/factory_test.go @@ -0,0 +1,62 @@ +// Copyright 2019 OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kinesisexporter + +import ( + "context" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/config/configcheck" +) + +func TestCreateDefaultConfig(t *testing.T) { + cfg := createDefaultConfig().(*Config) + assert.NotNil(t, cfg, "failed to create default config") + assert.NoError(t, configcheck.ValidateConfig(cfg)) + assert.Equal(t, cfg.Encoding, defaultEncoding) +} + +func TestCreateTracesExporter(t *testing.T) { + cfg := createDefaultConfig().(*Config) + r, err := createTraceExporter(context.Background(), component.ExporterCreateParams{}, cfg) + require.NoError(t, err) + assert.NotNil(t, r) +} + +func TestErrorCreateTracesExporterByInvalidEncoding(t *testing.T) { + cfg := createDefaultConfig().(*Config) + cfg.Encoding = "" + r, err := createTraceExporter(context.Background(), component.ExporterCreateParams{}, cfg) + require.Error(t, err) + assert.Nil(t, r) +} + +func TestCreateMetricsExporter(t *testing.T) { + cfg := createDefaultConfig().(*Config) + r, err := createMetricsExporter(context.Background(), component.ExporterCreateParams{}, cfg) + require.NoError(t, err) + assert.NotNil(t, r) +} + +func TestErrorCreateMetricsExporterByInvalidEncoding(t *testing.T) { + cfg := createDefaultConfig().(*Config) + cfg.Encoding = "" + r, err := createMetricsExporter(context.Background(), component.ExporterCreateParams{}, cfg) + require.Error(t, err) + assert.Nil(t, r) +} diff --git a/exporter/kinesisexporter/marshaller.go b/exporter/kinesisexporter/marshaller.go index 51b0a8a9ac0d..3ac6b006b259 100644 --- a/exporter/kinesisexporter/marshaller.go +++ b/exporter/kinesisexporter/marshaller.go @@ -18,11 +18,10 @@ import ( "go.opentelemetry.io/collector/consumer/pdata" ) -// Marshaller marshals traces/metrics into Message array. +// Marshaller marshals Opentelemetry data into byte array type Marshaller interface { // MarshalTraces serializes traces into a byte array MarshalTraces(traces pdata.Traces) ([]byte, error) - // MarshalMetrics serializes metrics into a byte array MarshalMetrics(metrics pdata.Metrics) ([]byte, error) diff --git a/exporter/kinesisexporter/otlp_marshaller.go b/exporter/kinesisexporter/otlp_marshaller.go index e55cb7ac7a60..f02a724d3b00 100644 --- a/exporter/kinesisexporter/otlp_marshaller.go +++ b/exporter/kinesisexporter/otlp_marshaller.go @@ -23,10 +23,6 @@ type otlpProtoMarshaller struct { var _ Marshaller = (*otlpProtoMarshaller)(nil) -func (m *otlpProtoMarshaller) Encoding() string { - return otlpProto -} - func (m *otlpProtoMarshaller) MarshalTraces(traces pdata.Traces) ([]byte, error) { return traces.ToOtlpProtoBytes() } @@ -34,3 +30,7 @@ func (m *otlpProtoMarshaller) MarshalTraces(traces pdata.Traces) ([]byte, error) func (m *otlpProtoMarshaller) MarshalMetrics(metrics pdata.Metrics) ([]byte, error) { return metrics.ToOtlpProtoBytes() } + +func (m *otlpProtoMarshaller) Encoding() string { + return otlpProto +} diff --git a/exporter/kinesisexporter/producer.go b/exporter/kinesisexporter/producer.go new file mode 100644 index 000000000000..4f7c15d221f0 --- /dev/null +++ b/exporter/kinesisexporter/producer.go @@ -0,0 +1,81 @@ +package kinesisexporter + +import ( + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials/stscreds" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/kinesis" + omnition "github.com/signalfx/omnition-kinesis-producer" + "github.com/signalfx/omnition-kinesis-producer/loggers/kpzap" + "go.uber.org/zap" + "time" +) + +// producer provides the interface for a kinesis producer. The producer +// implementation abstracts the interaction with the kinesis producer library +// used from the exporter +type producer interface { + start() + stop() + put(data []byte, partitionKey string) error +} + +type client struct { + client *omnition.Producer + logger *zap.Logger +} + +func newKinesisProducer(c *Config, logger *zap.Logger) (producer, error) { + awsConfig := aws.NewConfig().WithRegion(c.AWS.Region).WithEndpoint(c.AWS.KinesisEndpoint) + sess, err := session.NewSession(awsConfig) + if err != nil { + return nil, err + } + + // If AWS role is provided, use sts credentials to assume the role + if len(c.AWS.Role) > 0 { + creds := stscreds.NewCredentials(sess, c.AWS.Role) + awsConfig = awsConfig.WithCredentials(creds) + } + + o := omnition.New(&omnition.Config{ + Logger: &kpzap.Logger{Logger: logger}, + Client: kinesis.New(sess, awsConfig), + StreamName: c.AWS.StreamName, + // KPL parameters + FlushInterval: time.Duration(c.KPL.FlushIntervalSeconds) * time.Second, + BatchCount: c.KPL.BatchCount, + BatchSize: c.KPL.BatchSize, + AggregateBatchCount: c.KPL.AggregateBatchCount, + AggregateBatchSize: c.KPL.AggregateBatchSize, + BacklogCount: c.KPL.BacklogCount, + MaxConnections: c.KPL.MaxConnections, + MaxRetries: c.KPL.MaxRetries, + MaxBackoffTime: time.Duration(c.KPL.MaxBackoffSeconds) * time.Second, + }, nil) + + return client{client: o, logger: logger}, nil +} + +func (c client) start() { + c.client.Start() + go c.notifyErrors() +} + +// notifyErrors logs the failures within the kinesis exporter +func (c client) notifyErrors() { + for r := range c.client.NotifyFailures() { + // Logging error for now, these are normally unrecoverable failures + c.logger.Error("error putting record on kinesis", + zap.String("partitionKey", r.PartitionKey), + zap.Error(r.Err)) + } +} + +func (c client) stop() { + c.client.Stop() +} + +func (c client) put(data []byte, partitionKey string) error { + return c.client.Put(data, partitionKey) +}