Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Increase test coverage + some cleanups #8

Merged
merged 2 commits into from
Oct 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 23 additions & 55 deletions exporter/kinesisexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,111 +17,79 @@ package kinesisexporter
import (
"context"
"fmt"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/google/uuid"
producer "github.com/signalfx/omnition-kinesis-producer"
kpzap "github.com/signalfx/omnition-kinesis-producer/loggers/kpzap"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/pdata"
"go.uber.org/zap"
)

// Exporter implements an OpenTelemetry exporter that exports all traces/metrics to AWS Kinesis
type Exporter struct {
producer *producer.Producer
// exporter implements an OpenTelemetry exporter that pushes OpenTelemetry data to AWS Kinesis
type exporter struct {
producer producer
logger *zap.Logger
marshaller Marshaller
}

// newKinesisExporter creates a new Exporter with the passed in configurations.
// newExporter creates a new exporter with the passed in configurations.
// It starts the AWS session and setups the relevant connections.
func newKinesisExporter(c *Config, logger *zap.Logger) (*Exporter, error) {
func newExporter(c *Config, logger *zap.Logger) (*exporter, error) {
// Get marshaller based on config
marshaller := defaultMarshallers()[c.Encoding]
if marshaller == nil {
return nil, fmt.Errorf("unrecognized encoding")
}

awsConfig := aws.NewConfig().WithRegion(c.AWS.Region).WithEndpoint(c.AWS.KinesisEndpoint)
sess, err := session.NewSession(awsConfig)
producer, err := newKinesisProducer(c, logger)
if err != nil {
return nil, err
}
client := kinesis.New(sess)

pr := producer.New(&producer.Config{
Logger: &kpzap.Logger{Logger: logger},
Client: client,
StreamName: c.AWS.StreamName,
// KPL parameters
FlushInterval: time.Duration(c.KPL.FlushIntervalSeconds) * time.Second,
BatchCount: c.KPL.BatchCount,
BatchSize: c.KPL.BatchSize,
AggregateBatchCount: c.KPL.AggregateBatchCount,
AggregateBatchSize: c.KPL.AggregateBatchSize,
BacklogCount: c.KPL.BacklogCount,
MaxConnections: c.KPL.MaxConnections,
MaxRetries: c.KPL.MaxRetries,
MaxBackoffTime: time.Duration(c.KPL.MaxBackoffSeconds) * time.Second,
}, nil)
return &Exporter{producer: pr, marshaller: marshaller, logger: logger}, nil
return &exporter{producer: producer, marshaller: marshaller, logger: logger}, nil
}

// Start tells the exporter to start. The exporter may prepare for exporting
// start tells the exporter to start. The exporter may prepare for exporting
// by connecting to the endpoint. Host parameter can be used for communicating
// with the host after Start() has already returned. If error is returned by
// Start() then the collector startup will be aborted.
func (e Exporter) Start(_ context.Context, _ component.Host) error {
e.producer.Start()
go e.notifyErrors()
// with the host after start() has already returned. If error is returned by
// start() then the collector startup will be aborted.
func (e exporter) start(context.Context, component.Host) error {
e.producer.start()
return nil
}

// notifyErrors logs the failures within the kinesis exporter
func (e Exporter) notifyErrors() {
for r := range e.producer.NotifyFailures() {
// Logging error for now, these are normally unrecoverable failures
e.logger.Error("error putting record on kinesis", zap.Error(r.Err))
}
}

// Shutdown is invoked during exporter shutdown.
func (e Exporter) Shutdown(context.Context) error {
e.producer.Stop()
// shutdown is invoked during exporter shutdown
func (e exporter) shutdown(context.Context) error {
e.producer.stop()
return nil
}

// ConsumeTraces receives a span batch and exports it to AWS Kinesis
func (e Exporter) ConsumeTraces(_ context.Context, td pdata.Traces) (int, error) {
func (e exporter) pushTraces(_ context.Context, td pdata.Traces) (int, error) {
pBatches, err := e.marshaller.MarshalTraces(td)
if err != nil {
e.logger.Error("error translating span batch", zap.Error(err))
return td.SpanCount(), consumererror.Permanent(err)
}
err = e.producer.Put(pBatches, uuid.New().String())
if err != nil {

if err = e.producer.put(pBatches, uuid.New().String()); err != nil {
e.logger.Error("error exporting span to kinesis", zap.Error(err))
return td.SpanCount(), err
}

return 0, nil
}

// ConsumeMetrics receives a metrics batch and exports it to AWS Kinesis
func (e Exporter) ConsumeMetrics(_ context.Context, td pdata.Metrics) (int, error) {
func (e exporter) pushMetrics(_ context.Context, td pdata.Metrics) (int, error) {
pBatches, err := e.marshaller.MarshalMetrics(td)
if err != nil {
e.logger.Error("error translating metrics batch", zap.Error(err))
return td.MetricCount(), consumererror.Permanent(err)
}
err = e.producer.Put(pBatches, uuid.New().String())
if err != nil {

if err = e.producer.put(pBatches, uuid.New().String()); err != nil {
e.logger.Error("error exporting metrics to kinesis", zap.Error(err))
return td.MetricCount(), err
}

return 0, nil
}
87 changes: 85 additions & 2 deletions exporter/kinesisexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,39 @@
package kinesisexporter

import (
"context"
"fmt"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/consumer/pdata"
"go.uber.org/zap"
)

type producerMock struct {
mock.Mock
}

func (m *producerMock) start() {
m.Called()
}

func (m *producerMock) stop() {
m.Called()
}

func (m *producerMock) put(data []byte, partitionKey string) error {
args := m.Called(data, partitionKey)
return args.Error(0)
}

func TestNewKinesisExporter(t *testing.T) {
cfg := createDefaultConfig().(*Config)
require.NotNil(t, cfg)

exp, err := newKinesisExporter(cfg, zap.NewNop())
exp, err := newExporter(cfg, zap.NewNop())
assert.NotNil(t, exp)
assert.NoError(t, err)
}
Expand All @@ -36,8 +57,70 @@ func TestNewKinesisExporterBadEncoding(t *testing.T) {
require.NotNil(t, cfg)
cfg.Encoding = ""

exp, err := newKinesisExporter(cfg, zap.NewNop())
exp, err := newExporter(cfg, zap.NewNop())
assert.Nil(t, exp)
assert.Error(t, err)
assert.Equal(t, err.Error(), "unrecognized encoding")
}

func TestPushingTracesToKinesisQueue(t *testing.T) {
cfg := createDefaultConfig().(*Config)
require.NotNil(t, cfg)

exp, _ := newExporter(cfg, zap.NewNop())
mockProducer := new(producerMock)
exp.producer = mockProducer
require.NotNil(t, exp)

mockProducer.On("put", mock.Anything, mock.AnythingOfType("string")).Return(nil)

dropped, err := exp.pushTraces(context.Background(), pdata.NewTraces())
require.NoError(t, err)
require.Equal(t, 0, dropped)
}

func TestErrorPushingTracesToKinesisQueue(t *testing.T) {
cfg := createDefaultConfig().(*Config)
require.NotNil(t, cfg)

exp, _ := newExporter(cfg, zap.NewNop())
mockProducer := new(producerMock)
exp.producer = mockProducer
require.NotNil(t, exp)

mockProducer.On("put", mock.Anything, mock.AnythingOfType("string")).Return(fmt.Errorf("someerror"))

_, err := exp.pushTraces(context.Background(), pdata.NewTraces())
require.Error(t, err)
}

func TestPushingMetricsToKinesisQueue(t *testing.T) {
cfg := createDefaultConfig().(*Config)
require.NotNil(t, cfg)

exp, _ := newExporter(cfg, zap.NewNop())
mockProducer := new(producerMock)
exp.producer = mockProducer
require.NotNil(t, exp)

mockProducer.On("put", mock.Anything, mock.AnythingOfType("string")).Return(nil)

dropped, err := exp.pushMetrics(context.Background(), pdata.NewMetrics())
require.NoError(t, err)
require.Equal(t, 0, dropped)
}

func TestErrorPushingMetricsToKinesisQueue(t *testing.T) {
cfg := createDefaultConfig().(*Config)
require.NotNil(t, cfg)

exp, _ := newExporter(cfg, zap.NewNop())
mockProducer := new(producerMock)
exp.producer = mockProducer
require.NotNil(t, exp)

mockProducer.On("put", mock.Anything, mock.AnythingOfType("string")).Return(fmt.Errorf("someerror"))

_, err := exp.pushMetrics(context.Background(), pdata.NewMetrics())
require.Error(t, err)
}
16 changes: 8 additions & 8 deletions exporter/kinesisexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,16 @@ func createTraceExporter(
config configmodels.Exporter,
) (component.TraceExporter, error) {
c := config.(*Config)
exp, err := newKinesisExporter(c, params.Logger)
exp, err := newExporter(c, params.Logger)
if err != nil {
return nil, err
}

return exporterhelper.NewTraceExporter(
c,
exp.ConsumeTraces,
exporterhelper.WithStart(exp.Start),
exporterhelper.WithShutdown(exp.Shutdown))
exp.pushTraces,
exporterhelper.WithStart(exp.start),
exporterhelper.WithShutdown(exp.shutdown))
}

func createMetricsExporter(
Expand All @@ -84,14 +84,14 @@ func createMetricsExporter(
config configmodels.Exporter,
) (component.MetricsExporter, error) {
c := config.(*Config)
exp, err := newKinesisExporter(c, params.Logger)
exp, err := newExporter(c, params.Logger)
if err != nil {
return nil, err
}

return exporterhelper.NewMetricsExporter(
c,
exp.ConsumeMetrics,
exporterhelper.WithStart(exp.Start),
exporterhelper.WithShutdown(exp.Shutdown))
exp.pushMetrics,
exporterhelper.WithStart(exp.start),
exporterhelper.WithShutdown(exp.shutdown))
}
62 changes: 62 additions & 0 deletions exporter/kinesisexporter/factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2019 OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kinesisexporter

import (
"context"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/config/configcheck"
)

func TestCreateDefaultConfig(t *testing.T) {
cfg := createDefaultConfig().(*Config)
assert.NotNil(t, cfg, "failed to create default config")
assert.NoError(t, configcheck.ValidateConfig(cfg))
assert.Equal(t, cfg.Encoding, defaultEncoding)
}

func TestCreateTracesExporter(t *testing.T) {
cfg := createDefaultConfig().(*Config)
r, err := createTraceExporter(context.Background(), component.ExporterCreateParams{}, cfg)
require.NoError(t, err)
assert.NotNil(t, r)
}

func TestErrorCreateTracesExporterByInvalidEncoding(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Encoding = ""
r, err := createTraceExporter(context.Background(), component.ExporterCreateParams{}, cfg)
require.Error(t, err)
assert.Nil(t, r)
}

func TestCreateMetricsExporter(t *testing.T) {
cfg := createDefaultConfig().(*Config)
r, err := createMetricsExporter(context.Background(), component.ExporterCreateParams{}, cfg)
require.NoError(t, err)
assert.NotNil(t, r)
}

func TestErrorCreateMetricsExporterByInvalidEncoding(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Encoding = ""
r, err := createMetricsExporter(context.Background(), component.ExporterCreateParams{}, cfg)
require.Error(t, err)
assert.Nil(t, r)
}
3 changes: 1 addition & 2 deletions exporter/kinesisexporter/marshaller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@ import (
"go.opentelemetry.io/collector/consumer/pdata"
)

// Marshaller marshals traces/metrics into Message array.
// Marshaller marshals Opentelemetry data into byte array
type Marshaller interface {
// MarshalTraces serializes traces into a byte array
MarshalTraces(traces pdata.Traces) ([]byte, error)

// MarshalMetrics serializes metrics into a byte array
MarshalMetrics(metrics pdata.Metrics) ([]byte, error)

Expand Down
8 changes: 4 additions & 4 deletions exporter/kinesisexporter/otlp_marshaller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ type otlpProtoMarshaller struct {

var _ Marshaller = (*otlpProtoMarshaller)(nil)

func (m *otlpProtoMarshaller) Encoding() string {
return otlpProto
}

func (m *otlpProtoMarshaller) MarshalTraces(traces pdata.Traces) ([]byte, error) {
return traces.ToOtlpProtoBytes()
}

func (m *otlpProtoMarshaller) MarshalMetrics(metrics pdata.Metrics) ([]byte, error) {
return metrics.ToOtlpProtoBytes()
}

func (m *otlpProtoMarshaller) Encoding() string {
return otlpProto
}
Loading