Skip to content

Commit

Permalink
Add tests + private methods
Browse files Browse the repository at this point in the history
  • Loading branch information
irisgve committed Oct 14, 2020
1 parent 25a0afd commit 7028067
Show file tree
Hide file tree
Showing 7 changed files with 261 additions and 71 deletions.
78 changes: 23 additions & 55 deletions exporter/kinesisexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,111 +17,79 @@ package kinesisexporter
import (
"context"
"fmt"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/google/uuid"
producer "github.com/signalfx/omnition-kinesis-producer"
kpzap "github.com/signalfx/omnition-kinesis-producer/loggers/kpzap"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/pdata"
"go.uber.org/zap"
)

// Exporter implements an OpenTelemetry exporter that exports all traces/metrics to AWS Kinesis
type Exporter struct {
producer *producer.Producer
// exporter implements an OpenTelemetry exporter that pushes OpenTelemetry data to AWS Kinesis
type exporter struct {
producer producer
logger *zap.Logger
marshaller Marshaller
}

// newKinesisExporter creates a new Exporter with the passed in configurations.
// newExporter creates a new exporter with the passed in configurations.
// It starts the AWS session and setups the relevant connections.
func newKinesisExporter(c *Config, logger *zap.Logger) (*Exporter, error) {
func newExporter(c *Config, logger *zap.Logger) (*exporter, error) {
// Get marshaller based on config
marshaller := defaultMarshallers()[c.Encoding]
if marshaller == nil {
return nil, fmt.Errorf("unrecognized encoding")
}

awsConfig := aws.NewConfig().WithRegion(c.AWS.Region).WithEndpoint(c.AWS.KinesisEndpoint)
sess, err := session.NewSession(awsConfig)
producer, err := newKinesisProducer(c, logger)
if err != nil {
return nil, err
}
client := kinesis.New(sess)

pr := producer.New(&producer.Config{
Logger: &kpzap.Logger{Logger: logger},
Client: client,
StreamName: c.AWS.StreamName,
// KPL parameters
FlushInterval: time.Duration(c.KPL.FlushIntervalSeconds) * time.Second,
BatchCount: c.KPL.BatchCount,
BatchSize: c.KPL.BatchSize,
AggregateBatchCount: c.KPL.AggregateBatchCount,
AggregateBatchSize: c.KPL.AggregateBatchSize,
BacklogCount: c.KPL.BacklogCount,
MaxConnections: c.KPL.MaxConnections,
MaxRetries: c.KPL.MaxRetries,
MaxBackoffTime: time.Duration(c.KPL.MaxBackoffSeconds) * time.Second,
}, nil)
return &Exporter{producer: pr, marshaller: marshaller, logger: logger}, nil
return &exporter{producer: producer, marshaller: marshaller, logger: logger}, nil
}

// Start tells the exporter to start. The exporter may prepare for exporting
// start tells the exporter to start. The exporter may prepare for exporting
// by connecting to the endpoint. Host parameter can be used for communicating
// with the host after Start() has already returned. If error is returned by
// Start() then the collector startup will be aborted.
func (e Exporter) Start(_ context.Context, _ component.Host) error {
e.producer.Start()
go e.notifyErrors()
// with the host after start() has already returned. If error is returned by
// start() then the collector startup will be aborted.
func (e exporter) start(context.Context, component.Host) error {
e.producer.start()
return nil
}

// notifyErrors logs the failures within the kinesis exporter
func (e Exporter) notifyErrors() {
for r := range e.producer.NotifyFailures() {
// Logging error for now, these are normally unrecoverable failures
e.logger.Error("error putting record on kinesis", zap.Error(r.Err))
}
}

// Shutdown is invoked during exporter shutdown.
func (e Exporter) Shutdown(context.Context) error {
e.producer.Stop()
// shutdown is invoked during exporter shutdown
func (e exporter) shutdown(context.Context) error {
e.producer.stop()
return nil
}

// ConsumeTraces receives a span batch and exports it to AWS Kinesis
func (e Exporter) ConsumeTraces(_ context.Context, td pdata.Traces) (int, error) {
func (e exporter) pushTraces(_ context.Context, td pdata.Traces) (int, error) {
pBatches, err := e.marshaller.MarshalTraces(td)
if err != nil {
e.logger.Error("error translating span batch", zap.Error(err))
return td.SpanCount(), consumererror.Permanent(err)
}
err = e.producer.Put(pBatches, uuid.New().String())
if err != nil {

if err = e.producer.put(pBatches, uuid.New().String()); err != nil {
e.logger.Error("error exporting span to kinesis", zap.Error(err))
return td.SpanCount(), err
}

return 0, nil
}

// ConsumeMetrics receives a metrics batch and exports it to AWS Kinesis
func (e Exporter) ConsumeMetrics(_ context.Context, td pdata.Metrics) (int, error) {
func (e exporter) pushMetrics(_ context.Context, td pdata.Metrics) (int, error) {
pBatches, err := e.marshaller.MarshalMetrics(td)
if err != nil {
e.logger.Error("error translating metrics batch", zap.Error(err))
return td.MetricCount(), consumererror.Permanent(err)
}
err = e.producer.Put(pBatches, uuid.New().String())
if err != nil {

if err = e.producer.put(pBatches, uuid.New().String()); err != nil {
e.logger.Error("error exporting metrics to kinesis", zap.Error(err))
return td.MetricCount(), err
}

return 0, nil
}
87 changes: 85 additions & 2 deletions exporter/kinesisexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,22 @@
package kinesisexporter

import (
"context"
"fmt"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/consumer/pdata"
"go.uber.org/zap"
)

func TestNewKinesisExporter(t *testing.T) {
cfg := createDefaultConfig().(*Config)
require.NotNil(t, cfg)

exp, err := newKinesisExporter(cfg, zap.NewNop())
exp, err := newExporter(cfg, zap.NewNop())
assert.NotNil(t, exp)
assert.NoError(t, err)
}
Expand All @@ -36,8 +40,87 @@ func TestNewKinesisExporterBadEncoding(t *testing.T) {
require.NotNil(t, cfg)
cfg.Encoding = ""

exp, err := newKinesisExporter(cfg, zap.NewNop())
exp, err := newExporter(cfg, zap.NewNop())
assert.Nil(t, exp)
assert.Error(t, err)
assert.Equal(t, err.Error(), "unrecognized encoding")
}

func TestPushingTracesToKinesisQueue(t *testing.T) {
cfg := createDefaultConfig().(*Config)
require.NotNil(t, cfg)

exp, _ := newExporter(cfg, zap.NewNop())
mockProducer := new(producerMock)
exp.producer = mockProducer
require.NotNil(t, exp)

mockProducer.On("put", mock.Anything, mock.AnythingOfType("string")).Return(nil)

dropped, err := exp.pushTraces(context.Background(), pdata.NewTraces())
require.NoError(t, err)
require.Equal(t, 0, dropped)
}

func TestErrorPushingTracesToKinesisQueue(t *testing.T) {
cfg := createDefaultConfig().(*Config)
require.NotNil(t, cfg)

exp, _ := newExporter(cfg, zap.NewNop())
mockProducer := new(producerMock)
exp.producer = mockProducer
require.NotNil(t, exp)

mockProducer.On("put", mock.Anything, mock.AnythingOfType("string")).Return(fmt.Errorf("someerror"))

_, err := exp.pushTraces(context.Background(), pdata.NewTraces())
require.Error(t, err)
}

func TestPushingMetricsToKinesisQueue(t *testing.T) {
cfg := createDefaultConfig().(*Config)
require.NotNil(t, cfg)

exp, _ := newExporter(cfg, zap.NewNop())
mockProducer := new(producerMock)
exp.producer = mockProducer
require.NotNil(t, exp)

mockProducer.On("put", mock.Anything, mock.AnythingOfType("string")).Return(nil)

dropped, err := exp.pushMetrics(context.Background(), pdata.NewMetrics())
require.NoError(t, err)
require.Equal(t, 0, dropped)
}

func TestErrorPushingMetricsToKinesisQueue(t *testing.T) {
cfg := createDefaultConfig().(*Config)
require.NotNil(t, cfg)

exp, _ := newExporter(cfg, zap.NewNop())
mockProducer := new(producerMock)
exp.producer = mockProducer
require.NotNil(t, exp)

mockProducer.On("put", mock.Anything, mock.AnythingOfType("string")).Return(fmt.Errorf("someerror"))

_, err := exp.pushMetrics(context.Background(), pdata.NewMetrics())
require.Error(t, err)
}

type producerMock struct {
mock.Mock
}

func (m *producerMock) start() {
m.Called()
}

func (m *producerMock) stop() {
m.Called()
}

func (m *producerMock) put(data []byte, partitionKey string) error {
args := m.Called(data, partitionKey)
return args.Error(0)
}
16 changes: 8 additions & 8 deletions exporter/kinesisexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,16 @@ func createTraceExporter(
config configmodels.Exporter,
) (component.TraceExporter, error) {
c := config.(*Config)
exp, err := newKinesisExporter(c, params.Logger)
exp, err := newExporter(c, params.Logger)
if err != nil {
return nil, err
}

return exporterhelper.NewTraceExporter(
c,
exp.ConsumeTraces,
exporterhelper.WithStart(exp.Start),
exporterhelper.WithShutdown(exp.Shutdown))
exp.pushTraces,
exporterhelper.WithStart(exp.start),
exporterhelper.WithShutdown(exp.shutdown))
}

func createMetricsExporter(
Expand All @@ -84,14 +84,14 @@ func createMetricsExporter(
config configmodels.Exporter,
) (component.MetricsExporter, error) {
c := config.(*Config)
exp, err := newKinesisExporter(c, params.Logger)
exp, err := newExporter(c, params.Logger)
if err != nil {
return nil, err
}

return exporterhelper.NewMetricsExporter(
c,
exp.ConsumeMetrics,
exporterhelper.WithStart(exp.Start),
exporterhelper.WithShutdown(exp.Shutdown))
exp.pushMetrics,
exporterhelper.WithStart(exp.start),
exporterhelper.WithShutdown(exp.shutdown))
}
62 changes: 62 additions & 0 deletions exporter/kinesisexporter/factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2019 OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kinesisexporter

import (
"context"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/config/configcheck"
)

func TestCreateDefaultConfig(t *testing.T) {
cfg := createDefaultConfig().(*Config)
assert.NotNil(t, cfg, "failed to create default config")
assert.NoError(t, configcheck.ValidateConfig(cfg))
assert.Equal(t, cfg.Encoding, defaultEncoding)
}

func TestCreateTracesExporter(t *testing.T) {
cfg := createDefaultConfig().(*Config)
r, err := createTraceExporter(context.Background(), component.ExporterCreateParams{}, cfg)
require.NoError(t, err)
assert.NotNil(t, r)
}

func TestErrorCreateTracesExporter(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Encoding = ""
r, err := createTraceExporter(context.Background(), component.ExporterCreateParams{}, cfg)
require.Error(t, err)
assert.Nil(t, r)
}

func TestCreateMetricsExporter(t *testing.T) {
cfg := createDefaultConfig().(*Config)
r, err := createMetricsExporter(context.Background(), component.ExporterCreateParams{}, cfg)
require.NoError(t, err)
assert.NotNil(t, r)
}

func TestErrorCreateMetricsExporter(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Encoding = ""
r, err := createMetricsExporter(context.Background(), component.ExporterCreateParams{}, cfg)
require.Error(t, err)
assert.Nil(t, r)
}
3 changes: 1 addition & 2 deletions exporter/kinesisexporter/marshaller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@ import (
"go.opentelemetry.io/collector/consumer/pdata"
)

// Marshaller marshals traces/metrics into Message array.
// Marshaller marshals Opentelemetry data into byte array
type Marshaller interface {
// MarshalTraces serializes traces into a byte array
MarshalTraces(traces pdata.Traces) ([]byte, error)

// MarshalMetrics serializes metrics into a byte array
MarshalMetrics(metrics pdata.Metrics) ([]byte, error)

Expand Down
8 changes: 4 additions & 4 deletions exporter/kinesisexporter/otlp_marshaller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ type otlpProtoMarshaller struct {

var _ Marshaller = (*otlpProtoMarshaller)(nil)

func (m *otlpProtoMarshaller) Encoding() string {
return otlpProto
}

func (m *otlpProtoMarshaller) MarshalTraces(traces pdata.Traces) ([]byte, error) {
return traces.ToOtlpProtoBytes()
}

func (m *otlpProtoMarshaller) MarshalMetrics(metrics pdata.Metrics) ([]byte, error) {
return metrics.ToOtlpProtoBytes()
}

func (m *otlpProtoMarshaller) Encoding() string {
return otlpProto
}
Loading

0 comments on commit 7028067

Please sign in to comment.