forked from open-telemetry/opentelemetry-collector-contrib
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Modify kinesisexporter to support metrics/traces to otlp_proto encodi…
…ng (#9) * Add OTLP marshalling support for kinesis exporter (#1) Adding a Marshaller interface that allows encoding to different export formats Add an OTLP marshaller with support for metrics Co-authored-by: Iris Grace Endozo <[email protected]> * Swapped kinesis opencensus producer with omnition producer * Moved AWS/Kinesis setup to exporter.go * Delete unused Message struct (#3) Co-authored-by: Iris Grace Endozo <[email protected]> * Integrated marshaller into trace path * Adding tests for exporter and trace marshaling * Support sending out metrics to kinesis (#5) Co-authored-by: Iris Grace Endozo <[email protected]> * Added Start function to factory and logging of producer failures * Removed var * Changed order of config variables * Increase test coverage + some cleanups (#8) * Add tests + private methods * Add comments + fixes Co-authored-by: Iris Grace Endozo <[email protected]> * Add README.mdgst * Added parallel testing and import order fixes * Added pointer receivers * Opentelemetry to OpenTelemetry * Unused imports * Add license and fix import order * Fixed license * Fix linting shadowing error * Change readme descriptions * Use sensible values in example config * Added context validation in exporter * Move invalid context as const Co-authored-by: Iris Grace Endozo <[email protected]> Co-authored-by: Raymond Wang <[email protected]>
- Loading branch information
1 parent
8d6f04c
commit 2c33fbb
Showing
16 changed files
with
644 additions
and
118 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,48 @@ | ||
# Kinesis Exporter | ||
|
||
To be added. | ||
Kinesis exporter exports OpenTelemetry data to Kinesis. This exporter uses a [KPL][kpl-url]-like batch producer and uses | ||
the same aggregation format that KPLs use. Message payload encoding is configurable. | ||
|
||
The following settings can be optionally configured: | ||
- `aws` contains AWS specific configuration | ||
- `stream_name` (default = test-stream): The name of the Kinesis stream where events are sent/pushed | ||
- `kinesis_endpoint`: The Kinesis endpoint if role is not being assumed | ||
- `region` (default = us-west-2): The AWS region where the Kinesis stream is defined | ||
- `role`: The Kinesis role to assume | ||
- `kpl` contains kinesis producer library related config to controls things like aggregation, batching, connections, retries, etc | ||
- `aggregate_batch_count` (default = 4294967295): Determines the maximum number of items to pack into an aggregated record. Must not exceed 4294967295 | ||
- `aggregate_batch_size` (default = 51200): Determines the maximum number of bytes to pack into an aggregated record. User records larger than this will bypass aggregation | ||
- `batch_size` (default = 5242880): Determines the maximum number of bytes to send with a PutRecords request. Must not exceed 5MiB | ||
- `batch_count` (default = 1000): Determines the maximum number of items to pack in the batch. Must not exceed 1000 | ||
- `backlog_count` (default = 2000): Determines the channel capacity before Put() will begin blocking. Default to `BatchCount` | ||
- `flush_interval_seconds` (default = 5): The regular interval for flushing the kinesis producer buffer | ||
- `max_connections` (default = 24): Number of requests to send concurrently | ||
- `max_retries` (default = 10): Number of retry attempts to make before dropping records | ||
- `max_backoff_seconds` (default = 60): Maximum time to backoff. Must be greater than 1s | ||
- `encoding` (default = otlp_proto): The encoding of the payload sent to Kinesis. Available encodings: | ||
- `otlp_proto`: the payload is serialized to otlp proto bytes | ||
|
||
Example configuration: | ||
|
||
```yaml | ||
exporters: | ||
kinesis: | ||
encoding: "otlp_proto" | ||
aws: | ||
stream_name: test-stream | ||
region: mars-1 | ||
role: arn:test-role | ||
kinesis_endpoint: kinesis.mars-1.aws.galactic | ||
kpl: | ||
aggregate_batch_count: 4294967295 | ||
aggregate_batch_size: 51200 | ||
batch_size: 5242880 | ||
batch_count: 1000 | ||
backlog_count: 2000 | ||
flush_interval_seconds: 5 | ||
max_connections: 24 | ||
max_retries: 10 | ||
max_backoff_seconds: 60 | ||
``` | ||
[kpl-url]: https://github.com/awslabs/amazon-kinesis-producer |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,131 @@ | ||
// Copyright 2019 OpenTelemetry Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package kinesisexporter | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"testing" | ||
|
||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/mock" | ||
"github.com/stretchr/testify/require" | ||
"go.opentelemetry.io/collector/consumer/pdata" | ||
"go.uber.org/zap/zaptest" | ||
) | ||
|
||
type producerMock struct { | ||
mock.Mock | ||
} | ||
|
||
func (m *producerMock) start() { | ||
m.Called() | ||
} | ||
|
||
func (m *producerMock) stop() { | ||
m.Called() | ||
} | ||
|
||
func (m *producerMock) put(data []byte, partitionKey string) error { | ||
args := m.Called(data, partitionKey) | ||
return args.Error(0) | ||
} | ||
|
||
func TestNewKinesisExporter(t *testing.T) { | ||
t.Parallel() | ||
cfg := createDefaultConfig().(*Config) | ||
require.NotNil(t, cfg) | ||
|
||
exp, err := newExporter(cfg, zaptest.NewLogger(t)) | ||
assert.NotNil(t, exp) | ||
assert.NoError(t, err) | ||
} | ||
|
||
func TestNewKinesisExporterBadEncoding(t *testing.T) { | ||
t.Parallel() | ||
cfg := createDefaultConfig().(*Config) | ||
require.NotNil(t, cfg) | ||
cfg.Encoding = "" | ||
|
||
exp, err := newExporter(cfg, zaptest.NewLogger(t)) | ||
assert.Nil(t, exp) | ||
assert.Errorf(t, err, "unrecognized encoding") | ||
} | ||
|
||
func TestPushingTracesToKinesisQueue(t *testing.T) { | ||
t.Parallel() | ||
cfg := createDefaultConfig().(*Config) | ||
require.NotNil(t, cfg) | ||
|
||
exp, _ := newExporter(cfg, zaptest.NewLogger(t)) | ||
mockProducer := new(producerMock) | ||
exp.producer = mockProducer | ||
require.NotNil(t, exp) | ||
|
||
mockProducer.On("put", mock.Anything, mock.AnythingOfType("string")).Return(nil) | ||
|
||
dropped, err := exp.pushTraces(context.Background(), pdata.NewTraces()) | ||
require.NoError(t, err) | ||
require.Equal(t, 0, dropped) | ||
} | ||
|
||
func TestErrorPushingTracesToKinesisQueue(t *testing.T) { | ||
t.Parallel() | ||
cfg := createDefaultConfig().(*Config) | ||
require.NotNil(t, cfg) | ||
|
||
exp, _ := newExporter(cfg, zaptest.NewLogger(t)) | ||
mockProducer := new(producerMock) | ||
exp.producer = mockProducer | ||
require.NotNil(t, exp) | ||
|
||
mockProducer.On("put", mock.Anything, mock.AnythingOfType("string")).Return(fmt.Errorf("someerror")) | ||
|
||
_, err := exp.pushTraces(context.Background(), pdata.NewTraces()) | ||
require.Error(t, err) | ||
} | ||
|
||
func TestPushingMetricsToKinesisQueue(t *testing.T) { | ||
t.Parallel() | ||
cfg := createDefaultConfig().(*Config) | ||
require.NotNil(t, cfg) | ||
|
||
exp, _ := newExporter(cfg, zaptest.NewLogger(t)) | ||
mockProducer := new(producerMock) | ||
exp.producer = mockProducer | ||
require.NotNil(t, exp) | ||
|
||
mockProducer.On("put", mock.Anything, mock.AnythingOfType("string")).Return(nil) | ||
|
||
dropped, err := exp.pushMetrics(context.Background(), pdata.NewMetrics()) | ||
require.NoError(t, err) | ||
require.Equal(t, 0, dropped) | ||
} | ||
|
||
func TestErrorPushingMetricsToKinesisQueue(t *testing.T) { | ||
t.Parallel() | ||
cfg := createDefaultConfig().(*Config) | ||
require.NotNil(t, cfg) | ||
|
||
exp, _ := newExporter(cfg, zaptest.NewLogger(t)) | ||
mockProducer := new(producerMock) | ||
exp.producer = mockProducer | ||
require.NotNil(t, exp) | ||
|
||
mockProducer.On("put", mock.Anything, mock.AnythingOfType("string")).Return(fmt.Errorf("someerror")) | ||
|
||
_, err := exp.pushMetrics(context.Background(), pdata.NewMetrics()) | ||
require.Error(t, err) | ||
} |
Oops, something went wrong.