Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exporter/awscloudwatchlogsexporter] Enable logGroup and logStream templating #12179

Closed
wants to merge 44 commits into from
Closed
Show file tree
Hide file tree
Changes from 42 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
024de79
#10887
boostchicken Jul 8, 2022
2c073c0
Updating to replace.
boostchicken Jul 8, 2022
613f6b3
Mocking test
boostchicken Jul 8, 2022
67b4ef6
Fixing gocritic
boostchicken Jul 8, 2022
6599ff0
cleanup
boostchicken Jul 8, 2022
19a040b
Merge branch 'aws_pusher' of https://github.com/boostchicken/opentele…
boostchicken Jul 8, 2022
181c266
Fixing import
boostchicken Jul 8, 2022
81ef060
Merge branch 'main' into aws_pusher
boostchicken Jul 8, 2022
8cc69b6
Reducing duplicate code, light refactor around cwlogs subsitutions
boostchicken Jul 8, 2022
7192952
Format cleanup
boostchicken Jul 8, 2022
201c477
Addressing feedback
boostchicken Jul 12, 2022
899b454
Addressing feedback
boostchicken Jul 12, 2022
b7c6f38
Refactored to Pusher Cache
boostchicken Jul 12, 2022
254dd3a
Removing un-needed flush
boostchicken Jul 12, 2022
d481678
Merge branch 'main' into aws_pusher
boostchicken Jul 12, 2022
25dd1d6
Resolving conflicts.
boostchicken Jul 12, 2022
caac05d
Fixing nil reference.
boostchicken Jul 12, 2022
24734d4
Reducing duplicate code, light refactor around cwlogs subsitutions
boostchicken Jul 12, 2022
9399441
Mocks and providing initalizer interface
boostchicken Jul 12, 2022
190c09b
adding flush
boostchicken Jul 14, 2022
9986d18
fixing unit test
boostchicken Jul 14, 2022
41380b8
Cleaning up some code
boostchicken Jul 14, 2022
6f22e67
Fixing shutdown
boostchicken Jul 14, 2022
2eab639
Moving to jsoniter
boostchicken Jul 14, 2022
93912c7
Fixing unit test
boostchicken Jul 14, 2022
aad5565
format fix
boostchicken Jul 14, 2022
82d3cea
Merge branch 'main' into aws_pusher
boostchicken Jul 14, 2022
4eb24cf
Merge branch 'open-telemetry:main' into aws_pusher
boostchicken Jul 14, 2022
85b4a7b
Merge remote-tracking branch 'origin/aws_pusher' into aws_pusher
boostchicken Jul 14, 2022
a9c7d90
format fix
boostchicken Jul 15, 2022
a5ead18
Unit test fix
boostchicken Jul 15, 2022
de9ece4
removing export
boostchicken Jul 18, 2022
9411dcd
removing export
boostchicken Jul 18, 2022
c34d5e1
Merge branch 'open-telemetry:main' into aws_pusher
boostchicken Jul 18, 2022
8c634d9
performance fix for flush
boostchicken Jul 18, 2022
bb2a067
fixing interfaces
boostchicken Jul 18, 2022
44d4cca
Fixing unit tests and making public interface
boostchicken Jul 18, 2022
e4eaef4
Fixing unit tests and making public interface
boostchicken Jul 18, 2022
97a5ac1
Fixing unit tests and making public interface
boostchicken Jul 18, 2022
d92a4ae
removing export
boostchicken Jul 18, 2022
2e96194
fixing unit test
boostchicken Jul 18, 2022
f058913
removing jsoniter since its not deterministic
boostchicken Jul 18, 2022
e35fe5f
Merge branch 'main' into aws_pusher
boostchicken Aug 5, 2022
3b4a628
Merge branch 'main' into aws_pusher
boostchicken Aug 10, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions exporter/awscloudwatchlogsexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,20 @@

AWS CloudWatch Logs Exporter sends logs data to AWS [CloudWatch Logs](https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/WhatIsCloudWatchLogs.html).
AWS credentials are retrieved from the [default credential chain](https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/configuring-sdk.html#specifying-credentials).
Region must be configured in the configuration if not set in the default credential chain.

NOTE: OpenTelemetry Logging support is experimental, hence this exporter is subject to change.

## Configuration

The following settings are required:

- `log_group_name`: The group name of the CloudWatch logs.
- `log_stream_name`: The stream name of the CloudWatch logs.
| Name | Description | Default |
|:---------------------------------------------| :--------------------------------------------------------------------- | ------- |
| `log_group_name` | Customized log group name which supports `{ClusterName}` and `{TaskId}` placeholders. One valid example is `/aws/metrics/{ClusterName}`. It will search for `ClusterName` (or `aws.ecs.cluster.name`) resource attribute in the metrics data and replace with the actual cluster name. If none of them are found in the resource attribute map, `{ClusterName}` will be replaced by `undefined`. Similar way, for the `{TaskId}`, it searches for `TaskId` (or `aws.ecs.task.id`) key in the resource attribute map. For `{NodeName}`, it searches for `NodeName` (or `k8s.node.name`) |
| `log_stream_name` | Customized log stream name which supports `{TaskId}`, `{ClusterName}`, `{NodeName}`, `{ContainerInstanceId}`, and `{TaskDefinitionFamily}` placeholders. One valid example is `{TaskId}`. It will search for `TaskId` (or `aws.ecs.task.id`) resource attribute in the metrics data and replace with the actual task id. If none of them are found in the resource attribute map, `{TaskId}` will be replaced by `undefined`. Similarly, for the `{TaskDefinitionFamily}`, it searches for `TaskDefinitionFamily` (or `aws.ecs.task.family`). For the `{ClusterName}`, it searches for `ClusterName` (or `aws.ecs.cluster.name`). For `{NodeName}`, it searches for `NodeName` (or `k8s.node.name`). For `{ContainerInstanceId}`, it searches for `ContainerInstanceId` (or `aws.ecs.container.instance.id`). (Note: ContainerInstanceId (or `aws.ecs.container.instance.id`) only works for AWS ECS EC2 launch type. |

The following settings can be optionally configured:

- `region`: The AWS region where the log stream is in.
Optional:
- `region`: The AWS region where the log stream is in. Region must be configured in the configuration if not set in the default credential chain.
- `endpoint`: The CloudWatch Logs service endpoint which the requests are forwarded to. [See the CloudWatch Logs endpoints](https://docs.aws.amazon.com/general/latest/gr/cwl_region.html) for a list.

### Examples
Expand All @@ -31,17 +31,17 @@ Simplest configuration:
```yaml
exporters:
awscloudwatchlogs:
log_group_name: "testing-logs"
log_stream_name: "testing-integrations-stream"
log_group_name: "testing-logs-{ClusterName}"
log_stream_name: "testing-integrations-stream-{TaskId}"
```

All configuration options:

```yaml
exporters:
awscloudwatchlogs:
log_group_name: "testing-logs"
log_stream_name: "testing-integrations-stream"
log_group_name: "testing-logs-{ClusterName}"
log_stream_name: "testing-integrations-stream-{TaskId}"
region: "us-east-1"
endpoint: "logs.us-east-1.amazonaws.com"
sending_queue:
Expand All @@ -52,4 +52,4 @@ exporters:
```

[beta]:https://github.com/open-telemetry/opentelemetry-collector#beta
[contrib]:https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
[contrib]:https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
2 changes: 0 additions & 2 deletions exporter/awscloudwatchlogsexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,5 +82,3 @@ func (config *Config) enforcedQueueSettings() exporterhelper.QueueSettings {
QueueSize: config.QueueSettings.QueueSize,
}
}

// TODO(jbd): Add ARN role to config.
37 changes: 21 additions & 16 deletions exporter/awscloudwatchlogsexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type exporter struct {
retryCount int
collectorID string
svcStructuredLog *cwlogs.Client
pusher cwlogs.Pusher
pusherCache cwlogs.PusherCache
}

func newCwLogsPusher(expConfig *Config, params component.ExporterCreateSettings) (component.LogsExporter, error) {
Expand All @@ -65,15 +65,13 @@ func newCwLogsPusher(expConfig *Config, params component.ExporterCreateSettings)
return nil, err
}

pusher := cwlogs.NewPusher(aws.String(expConfig.LogGroupName), aws.String(expConfig.LogStreamName), *awsConfig.MaxRetries, *svcStructuredLog, params.Logger)

logsExporter := &exporter{
svcStructuredLog: svcStructuredLog,
Config: expConfig,
logger: params.Logger,
retryCount: *awsConfig.MaxRetries,
collectorID: collectorIdentifier.String(),
pusher: pusher,
boostchicken marked this conversation as resolved.
Show resolved Hide resolved
pusherCache: cwlogs.NewDefaultPusherCache(params.Logger),
}
return logsExporter, nil
}
Expand All @@ -88,14 +86,15 @@ func newCwLogsExporter(config config.Exporter, params component.ExporterCreateSe
config,
params,
logsExporter.ConsumeLogs,
exporterhelper.WithShutdown(logsExporter.Shutdown),
exporterhelper.WithQueue(expConfig.enforcedQueueSettings()),
exporterhelper.WithRetry(expConfig.RetrySettings),
)

}

func (e *exporter) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
boostchicken marked this conversation as resolved.
Show resolved Hide resolved
cwLogsPusher := e.pusher

logEvents, _ := logsToCWLogs(e.logger, ld)
if len(logEvents) == 0 {
return nil
Expand All @@ -106,17 +105,25 @@ func (e *exporter) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
InputLogEvent: logEvent,
GeneratedTime: time.Now(),
}
body := &cwLogBody{}
err := json.Unmarshal([]byte(*logEvent.InputLogEvent.Message), body)
if err != nil {
e.logger.Error("Unable to unmarshal log message", zap.Error(err))
continue
}
logGroup, logStream, _ := getLogInfo(body, e.Config)
boostchicken marked this conversation as resolved.
Show resolved Hide resolved
cwLogsPusher := e.pusherCache.GetPusher(logGroup, logStream, *e.svcStructuredLog, e.retryCount)

e.logger.Debug("Adding log event", zap.Any("event", logEvent))
err := cwLogsPusher.AddLogEntry(logEvent)
err = cwLogsPusher.AddLogEntry(logEvent)
if err != nil {
e.logger.Error("Failed ", zap.Int("num_of_events", len(logEvents)))
}
e.logger.Debug("Log events are successfully put")
}
e.logger.Debug("Log events are successfully put")
flushErr := cwLogsPusher.ForceFlush()
if flushErr != nil {
e.logger.Error("Error force flushing logs. Skipping to next logPusher.", zap.Error(flushErr))
return flushErr
err := e.pusherCache.Flush()
if err != nil {
e.logger.Error("Failed flushing pusher cache", zap.Error(err))
}
return nil
}
Expand All @@ -125,11 +132,9 @@ func (e *exporter) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}

func (e *exporter) Shutdown(ctx context.Context) error {
if e.pusher != nil {
e.pusher.ForceFlush()
}
return nil
func (e *exporter) Shutdown(ctx context.Context) (errs error) {
return e.pusherCache.Shutdown(ctx)

}

func (e *exporter) Start(ctx context.Context, host component.Host) error {
Expand Down
46 changes: 40 additions & 6 deletions exporter/awscloudwatchlogsexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ package awscloudwatchlogsexporter

import (
"context"
"strconv"
"testing"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
Expand All @@ -31,6 +33,30 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs"
)

type mockCache struct {
mock.Mock
}

func (p *mockCache) GetPusher(logGroup, logStream string, client cwlogs.Client, retries int) cwlogs.Pusher {
args := p.Called(logGroup, logStream, client, retries)
return args.Get(0).(cwlogs.Pusher)
}

func (p *mockCache) ListPushers() []cwlogs.Pusher {
args := p.Called(nil)
return []cwlogs.Pusher{args.Get(0).(cwlogs.Pusher)}
}

func (p *mockCache) Flush() error {
args := p.Called(nil)
return args.Error(0)
}

func (p *mockCache) Shutdown(ctx context.Context) error {
args := p.Called(nil)
return args.Error(0)
}

type mockPusher struct {
mock.Mock
}
Expand Down Expand Up @@ -239,24 +265,32 @@ func TestConsumeLogs(t *testing.T) {
factory := NewFactory()
expCfg := factory.CreateDefaultConfig().(*Config)
expCfg.Region = "us-west-2"
expCfg.LogGroupName = "testGroup"
expCfg.LogStreamName = "testStream"
expCfg.LogGroupName = "{PodName}"
expCfg.LogStreamName = "{PodName}"
expCfg.MaxRetries = 0
exp, err := newCwLogsPusher(expCfg, componenttest.NewNopExporterCreateSettings())
logPusher := new(mockPusher)
logPusher.On("AddLogEntry", nil).Return("").Once()
logPusher.On("ForceFlush", nil).Return("").Twice()

cache := new(mockCache)
cache.On("GetPusher", mock.Anything, mock.Anything, mock.Anything, 0).Return(logPusher, nil).Once()
cache.On("Shutdown", mock.Anything).Return(nil).Once()
cache.On("Flush", mock.Anything).Return(nil).Once()
exp.(*exporter).pusherCache = cache

assert.Nil(t, err)
assert.NotNil(t, exp)
ld := plog.NewLogs()
r := ld.ResourceLogs().AppendEmpty()
r.Resource().Attributes().UpsertString("hello", "test")
timestamp := strconv.FormatInt(time.Now().UTC().UnixNano(), 10)
r.Resource().Attributes().UpsertString("pod", timestamp)
logRecords := r.ScopeLogs().AppendEmpty().LogRecords()
logRecords.EnsureCapacity(5)
logRecords.AppendEmpty()
assert.Equal(t, 1, ld.LogRecordCount())

logPusher := new(mockPusher)
logPusher.On("AddLogEntry", nil).Return("").Once()
logPusher.On("ForceFlush", nil).Return("").Twice()
exp.(*exporter).pusher = logPusher
require.NoError(t, exp.(*exporter).ConsumeLogs(ctx, ld))
require.NoError(t, exp.Shutdown(ctx))
}
Expand Down
46 changes: 46 additions & 0 deletions exporter/awscloudwatchlogsexporter/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package awscloudwatchlogsexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awscloudwatchlogsexporter"

import (
"fmt"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs"
)

func attrsString(attrs map[string]interface{}) map[string]string {
out := make(map[string]string, len(attrs))
for k, s := range attrs {
out[k] = fmt.Sprint(s)
}
return out
}

func getLogInfo(rm *cwLogBody, config *Config) (logGroup, logStream string, replaced bool) {
groupReplaced := true
streamReplaced := true

strAttributeMap := attrsString(rm.Resource)

// Override log group/stream if specified in config. However, in this case, customer won't have correlation experience
if len(config.LogGroupName) > 0 {
logGroup, groupReplaced = cwlogs.ReplacePatterns(config.LogGroupName, strAttributeMap, config.logger)
}
if len(config.LogStreamName) > 0 {
logStream, streamReplaced = cwlogs.ReplacePatterns(config.LogStreamName, strAttributeMap, config.logger)
}
replaced = groupReplaced && streamReplaced
return
}
Loading