fixing tests
Paramadon committed Dec 11, 2024
1 parent cb1f3df commit 0cd05cb
Showing 7 changed files with 164 additions and 48 deletions.
110 changes: 97 additions & 13 deletions .github/workflows/check_links_config.json
@@ -1,17 +1,101 @@
{
"ignorePatterns": [
{
"pattern": "http(s)?://\\d+\\.\\d+\\.\\d+\\.\\d+"
"agent": {
"debug": true,
"aws_sdk_log_level": "LogDebugWithHTTPBody",
"region": "us-west-2"
},
"metrics": {
"append_dimensions": {
"AutoScalingGroupName": "${aws:AutoScalingGroupName}",
"ImageId": "${aws:ImageId}",
"InstanceId": "${aws:InstanceId}",
"InstanceType": "${aws:InstanceType}"
},
{
"pattern": "http(s)?://localhost"
},
{
"pattern": "http(s)?://example.com"
},
{
"pattern": "^#"
"metrics_collected": {
"cpu": {
"measurement": [
"cpu_usage_idle",
"cpu_usage_iowait",
"cpu_usage_user",
"cpu_usage_system"
],
"totalcpu": false,
"metrics_collection_interval": 10
},
"disk": {
"resources": [
"*"
],
"measurement": [
"used_percent",
"inodes_free"
],
"metrics_collection_interval": 60
},
"diskio": {
"resources": [
"*"
],
"measurement": [
"io_time",
"write_bytes",
"read_bytes",
"writes",
"reads"
],
"metrics_collection_interval": 60
},
"mem": {
"measurement": [
"mem_used_percent"
],
"metrics_collection_interval": 10
},
"netstat": {
"measurement": [
"tcp_established",
"tcp_time_wait"
],
"metrics_collection_interval": 60
},
"swap": {
"measurement": [
"swap_used_percent"
],
"metrics_collection_interval": 10
},
"ethtool": {
"interface_include": [
"eth0",
"eth1"
],
"metrics_include": [
"bw_in_allowance_exceeded",
"bw_out_allowance_exceeded",
"pps_allowance_exceeded",
"conntrack_allowance_exceeded",
"linklocal_allowance_exceeded"
]
}
}
],
"aliveStatusCodes": [429, 200]
},
"logs": {
"metrics_collected": {
"emf": {
"log_group_name": "/aws/containerinsights/TestCluster/performance",
"log_stream_name": "{container_name}",
"region": "<your-region>"
},
"kubernetes": {
"cluster_name": "TestCluster",
"metrics_collection_interval": 30,
"disable_metric_extraction": true,
"prefer_full_pod_name": true,
"accelerated_compute_metrics": false,
"kueue_container_insights": false
}
},
"force_flush_interval": 5,
"endpoint_override": "https://logs.<your-region>.amazonaws.com"
}
}
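
For orientation, the replacement file is a CloudWatch agent style configuration with agent, metrics, and logs sections. The following is a minimal Go sketch, not part of this commit, that parses such a file and prints a few of the nested keys; the struct models only the fields read below, and the file path simply reuses the one from this commit.

package main

import (
    "encoding/json"
    "fmt"
    "os"
)

// agentConfig models only the keys read below; the real schema has many more.
type agentConfig struct {
    Agent struct {
        Region string `json:"region"`
    } `json:"agent"`
    Metrics struct {
        MetricsCollected map[string]json.RawMessage `json:"metrics_collected"`
    } `json:"metrics"`
    Logs struct {
        ForceFlushInterval int `json:"force_flush_interval"`
    } `json:"logs"`
}

func main() {
    raw, err := os.ReadFile(".github/workflows/check_links_config.json") // path taken from this commit
    if err != nil {
        panic(err)
    }
    var cfg agentConfig
    if err := json.Unmarshal(raw, &cfg); err != nil {
        panic(err)
    }
    fmt.Println("region:", cfg.Agent.Region) // "us-west-2" in the file above
    for name := range cfg.Metrics.MetricsCollected {
        fmt.Println("metrics section:", name) // cpu, disk, diskio, mem, netstat, swap, ethtool
    }
    fmt.Println("log force_flush_interval:", cfg.Logs.ForceFlushInterval)
}
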
34 changes: 12 additions & 22 deletions exporter/awsemfexporter/emf_exporter.go
@@ -125,7 +125,10 @@ func (emf *emfExporter) pushMetricsData(_ context.Context, md pmetric.Metrics) e
fmt.Println(*putLogEvent.InputLogEvent.Message)
}
} else if strings.EqualFold(outputDestination, outputDestinationCloudWatch) {
emfPusher := emf.getPusher(putLogEvent.StreamKey)
emfPusher, err := emf.getPusher(putLogEvent.StreamKey)
if err != nil {
return fmt.Errorf("failed to get pusher: %w", err)
}
if emfPusher != nil {
returnError := emfPusher.AddLogEntry(putLogEvent)
if returnError != nil {
@@ -154,37 +157,24 @@ func (emf *emfExporter) pushMetricsData(_ context.Context, md pmetric.Metrics) e
return nil
}

func (emf *emfExporter) getPusher(key cwlogs.StreamKey) cwlogs.Pusher {
func (emf *emfExporter) getPusher(key cwlogs.StreamKey) (cwlogs.Pusher, error) {
emf.pusherMapLock.Lock()
defer emf.pusherMapLock.Unlock()

if emf.svcStructuredLog == nil {
// Initialize svcStructuredLog if it's nil
awsConfig, session, err := awsutil.GetAWSConfigSession(emf.config.logger, &awsutil.Conn{}, &emf.config.AWSSessionSettings)
if err != nil {
emf.set.Logger.Error("Failed to create AWS config and session", zap.Error(err))
return nil
}

// Create CWLogs client with aws session config
emf.svcStructuredLog = cwlogs.NewClient(emf.config.logger,
awsConfig,
emf.set.BuildInfo,
emf.config.LogGroupName,
emf.config.LogRetention,
emf.config.Tags,
session,
cwlogs.WithEnabledContainerInsights(emf.config.IsEnhancedContainerInsights()),
cwlogs.WithEnabledAppSignals(emf.config.IsAppSignalsEnabled()),
)
return nil, errors.New("CloudWatch Logs client not initialized")
}

pusher, exists := emf.pusherMap[key]
if !exists {
pusher = cwlogs.NewPusher(key, emf.retryCnt, *emf.svcStructuredLog, emf.set.Logger)
if emf.set.Logger != nil {
pusher = cwlogs.NewPusher(key, emf.retryCnt, *emf.svcStructuredLog, emf.set.Logger)
} else {
pusher = cwlogs.NewPusher(key, emf.retryCnt, *emf.svcStructuredLog, emf.config.logger)
}
emf.pusherMap[key] = pusher
}
return pusher
return pusher, nil
}

func (emf *emfExporter) listPushers() []cwlogs.Pusher {
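
With the lazy client creation removed from getPusher, the CloudWatch Logs client has to exist before pushMetricsData runs. The commit's actual start implementation is not visible in this diff; the following is only a sketch of what such a hook could look like, reusing the calls from the block removed above.

// Sketch only: the real start in this commit is not shown in the diff.
func (emf *emfExporter) start(_ context.Context, _ component.Host) error {
    // Build the AWS config and session once, instead of on the first push.
    awsConfig, session, err := awsutil.GetAWSConfigSession(emf.config.logger, &awsutil.Conn{}, &emf.config.AWSSessionSettings)
    if err != nil {
        return fmt.Errorf("failed to create AWS config and session: %w", err)
    }
    // Create the CloudWatch Logs client so getPusher can assume it is initialized.
    emf.svcStructuredLog = cwlogs.NewClient(emf.config.logger,
        awsConfig,
        emf.set.BuildInfo,
        emf.config.LogGroupName,
        emf.config.LogRetention,
        emf.config.Tags,
        session,
        cwlogs.WithEnabledContainerInsights(emf.config.IsEnhancedContainerInsights()),
        cwlogs.WithEnabledAppSignals(emf.config.IsAppSignalsEnabled()),
    )
    return nil
}
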
36 changes: 34 additions & 2 deletions exporter/awsemfexporter/emf_exporter_test.go
@@ -200,6 +200,9 @@ func TestConsumeMetricsWithLogGroupStreamConfig(t *testing.T) {
exp, err := newEmfExporter(expCfg, exportertest.NewNopSettings())
assert.NoError(t, err)
assert.NotNil(t, exp)
mockHost := &mockHost{}
err = exp.start(ctx, mockHost)
assert.NoError(t, err)

md := generateTestMetrics(testMetric{
metricNames: []string{"metric_1", "metric_2"},
@@ -227,6 +230,9 @@ func TestConsumeMetricsWithLogGroupStreamValidPlaceholder(t *testing.T) {
exp, err := newEmfExporter(expCfg, exportertest.NewNopSettings())
assert.NoError(t, err)
assert.NotNil(t, exp)
mockHost := &mockHost{}
err = exp.start(ctx, mockHost)
assert.NoError(t, err)

md := generateTestMetrics(testMetric{
metricNames: []string{"metric_1", "metric_2"},
@@ -258,6 +264,9 @@ func TestConsumeMetricsWithOnlyLogStreamPlaceholder(t *testing.T) {
exp, err := newEmfExporter(expCfg, exportertest.NewNopSettings())
assert.NoError(t, err)
assert.NotNil(t, exp)
mockHost := &mockHost{}
err = exp.start(ctx, mockHost)
assert.NoError(t, err)

md := generateTestMetrics(testMetric{
metricNames: []string{"metric_1", "metric_2"},
@@ -286,10 +295,26 @@ func TestConsumeMetricsWithWrongPlaceholder(t *testing.T) {
expCfg.MaxRetries = defaultRetryCount
expCfg.LogGroupName = "test-logGroupName"
expCfg.LogStreamName = "{WrongKey}"
exp, err := newEmfExporter(expCfg, exportertest.NewNopSettings())

// Create a logger
logger, _ := zap.NewProduction()

// Create exporter settings with the logger
settings := exportertest.NewNopCreateSettings()
settings.Logger = logger

exp, err := newEmfExporter(expCfg, settings)
assert.NoError(t, err)
assert.NotNil(t, exp)

exp.config.logger = logger
// Create a mock host
mockHost := &mockHost{}

// Call start
err = exp.start(ctx, mockHost)
assert.NoError(t, err)

md := generateTestMetrics(testMetric{
metricNames: []string{"metric_1", "metric_2"},
metricValues: [][]float64{{100}, {4}},
@@ -298,7 +323,7 @@ func TestConsumeMetricsWithWrongPlaceholder(t *testing.T) {
"aws.ecs.task.id": "test-task-id",
},
})
require.Error(t, exp.pushMetricsData(ctx, md))
require.NoError(t, exp.pushMetricsData(ctx, md))
require.NoError(t, exp.shutdown(ctx))
pusherMap, ok := exp.pusherMap[cwlogs.StreamKey{
LogGroupName: expCfg.LogGroupName,
Expand All @@ -321,6 +346,13 @@ func TestPushMetricsDataWithErr(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, exp)

// Create a mock host
mockHost := &mockHost{}

// Call start
err = exp.start(ctx, mockHost)
assert.NoError(t, err)

logPusher := new(mockPusher)
logPusher.On("AddLogEntry", nil).Return("some error").Once()
logPusher.On("AddLogEntry", nil).Return("").Twice()
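
The tests above pass a hand-rolled mockHost to start. Where no host behavior is needed, the collector's componenttest package (go.opentelemetry.io/collector/component/componenttest) provides a no-op host that should serve the same purpose; a sketch:

// Drop-in alternative to &mockHost{} when the test never inspects the host:
err = exp.start(ctx, componenttest.NewNopHost())
assert.NoError(t, err)
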
7 changes: 6 additions & 1 deletion exporter/awsemfexporter/factory.go
@@ -5,6 +5,7 @@ package awsemfexporter // import "github.com/open-telemetry/opentelemetry-collec

import (
"context"
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter"
@@ -60,12 +61,16 @@ func createMetricsExporter(ctx context.Context, params exporter.Settings, config
return nil, err
}

// Call start right after the constructor
if err := emfExp.start(ctx, nil); err != nil {
return nil, fmt.Errorf("failed to start EMF exporter: %w", err)
}

exporter, err := exporterhelper.NewMetricsExporter(
ctx,
params,
config,
emfExp.pushMetricsData,
exporterhelper.WithStart(emfExp.start),
exporterhelper.WithShutdown(emfExp.shutdown),
)
if err != nil {
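
With this change, start runs twice: once here with a nil host at creation time, and again through exporterhelper.WithStart when the pipeline starts. If double initialization ever matters, one option is to make start idempotent; the following is a sketch, not part of this commit, and doStart is a hypothetical helper holding the real initialization.

type emfExporter struct {
    // ...existing fields elided...
    startOnce sync.Once
}

func (emf *emfExporter) start(ctx context.Context, host component.Host) error {
    var err error
    // Run the real initialization only on the first call.
    emf.startOnce.Do(func() { err = emf.doStart(ctx, host) })
    return err
}
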
16 changes: 8 additions & 8 deletions internal/aws/cwlogs/cwlog_client.go
@@ -34,7 +34,7 @@ var (
// Possible exceptions are a combination of common errors (https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/CommonErrors.html)
// and API-specific errors (e.g. https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html#API_PutLogEvents_Errors)
type Client struct {
svc cloudwatchlogsiface.CloudWatchLogsAPI
Svc cloudwatchlogsiface.CloudWatchLogsAPI
logRetention int64
tags map[string]*string
logger *zap.Logger
@@ -60,7 +60,7 @@ func WithEnabledAppSignals(flag bool) UserAgentOption {

// Create a log client based on the actual cloudwatch logs client.
func newCloudWatchLogClient(svc cloudwatchlogsiface.CloudWatchLogsAPI, logRetention int64, tags map[string]*string, logger *zap.Logger) *Client {
logClient := &Client{svc: svc,
logClient := &Client{Svc: svc,
logRetention: logRetention,
tags: tags,
logger: logger}
@@ -87,7 +87,7 @@ func NewClient(logger *zap.Logger, awsConfig *aws.Config, buildInfo component.Bu
}

func (client *Client) Handlers() *request.Handlers {
return &client.svc.(*cloudwatchlogs.CloudWatchLogs).Handlers
return &client.Svc.(*cloudwatchlogs.CloudWatchLogs).Handlers
}

// PutLogEvents mainly handles the different possible errors that could be returned from the server side, and retries them
@@ -102,7 +102,7 @@ func (client *Client) PutLogEvents(input *cloudwatchlogs.PutLogEventsInput, retr
// Finally InvalidSequenceTokenException and DataAlreadyAcceptedException are
// never returned by the PutLogEvents action.
for i := 0; i <= retryCnt; i++ {
response, err = client.svc.PutLogEvents(input)
response, err = client.Svc.PutLogEvents(input)
if err != nil {
var awsErr awserr.Error
if !errors.As(err, &awsErr) {
@@ -167,7 +167,7 @@ func (client *Client) PutLogEvents(input *cloudwatchlogs.PutLogEventsInput, retr
// Prepare the readiness for the log group and log stream.
func (client *Client) CreateStream(logGroup, streamName *string) error {
// CreateLogStream / CreateLogGroup
_, err := client.svc.CreateLogStream(&cloudwatchlogs.CreateLogStreamInput{
_, err := client.Svc.CreateLogStream(&cloudwatchlogs.CreateLogStreamInput{
LogGroupName: logGroup,
LogStreamName: streamName,
})
@@ -176,14 +176,14 @@ func (client *Client) CreateStream(logGroup, streamName *string) error {
var awsErr awserr.Error
if errors.As(err, &awsErr) && awsErr.Code() == cloudwatchlogs.ErrCodeResourceNotFoundException {
// Create Log Group with tags if they exist and were specified in the config
_, err = client.svc.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{
_, err = client.Svc.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{
LogGroupName: logGroup,
Tags: client.tags,
})
if err == nil {
// For newly created log groups, set the log retention policy if specified and non-zero. Otherwise, set to Never Expire
if client.logRetention != 0 {
_, err = client.svc.PutRetentionPolicy(&cloudwatchlogs.PutRetentionPolicyInput{LogGroupName: logGroup, RetentionInDays: &client.logRetention})
_, err = client.Svc.PutRetentionPolicy(&cloudwatchlogs.PutRetentionPolicyInput{LogGroupName: logGroup, RetentionInDays: &client.logRetention})
if err != nil {
var awsErr awserr.Error
if errors.As(err, &awsErr) {
@@ -192,7 +192,7 @@ func (client *Client) CreateStream(logGroup, streamName *string) error {
}
}
}
_, err = client.svc.CreateLogStream(&cloudwatchlogs.CreateLogStreamInput{
_, err = client.Svc.CreateLogStream(&cloudwatchlogs.CreateLogStreamInput{
LogGroupName: logGroup,
LogStreamName: streamName,
})
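
Renaming svc to Svc exports the client's underlying CloudWatch Logs API handle (the pusher tests below are updated to the new name), which also lets code outside the package inject a test double. A sketch of that usage; stubCWL is hypothetical.

// stubCWL satisfies cloudwatchlogsiface.CloudWatchLogsAPI by embedding it, so
// only the methods a test exercises need to be overridden.
type stubCWL struct {
    cloudwatchlogsiface.CloudWatchLogsAPI
}

func (s *stubCWL) PutLogEvents(_ *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) {
    return &cloudwatchlogs.PutLogEventsOutput{}, nil
}

// After constructing a *cwlogs.Client named client:
//   client.Svc = &stubCWL{}
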
5 changes: 5 additions & 0 deletions internal/aws/cwlogs/pusher.go
@@ -255,6 +255,11 @@ func (p *logPusher) pushEventBatch(req any) error {
// The log events in the batch must be in chronological ordered by their
// timestamp (the time the event occurred, expressed as the number of milliseconds
// since Jan 1, 1970 00:00:00 UTC).
if p.logger == nil {
// If the logger is nil, return an error (falling back to a no-op logger would be the alternative)
return errors.New("logger is not initialized")
}

logEventBatch := req.(*eventBatch)
logEventBatch.sortLogEvents()
putLogEventsInput := logEventBatch.putLogEventsInput
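
The new guard fails the batch when the pusher has no logger. The other option mentioned in the comment, falling back to a no-op logger, would look like this; a sketch, not what this commit does.

if p.logger == nil {
    p.logger = zap.NewNop() // keep pushing the batch, just without log output
}
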
4 changes: 2 additions & 2 deletions internal/aws/cwlogs/pusher_test.go
@@ -194,7 +194,7 @@ func TestAddLogEventWithValidation(t *testing.T) {

func TestStreamManager(t *testing.T) {
svc := newAlwaysPassMockLogClient(func(_ mock.Arguments) {})
mockCwAPI := svc.svc.(*mockCloudWatchLogsClient)
mockCwAPI := svc.Svc.(*mockCloudWatchLogsClient)
manager := NewLogStreamManager(*svc)

// Verify that the stream is created in the first time
@@ -239,7 +239,7 @@ func TestMultiStreamPusher(t *testing.T) {
input := args.Get(0).(*cloudwatchlogs.PutLogEventsInput)
inputs = append(inputs, input)
})
mockCwAPI := svc.svc.(*mockCloudWatchLogsClient)
mockCwAPI := svc.Svc.(*mockCloudWatchLogsClient)
manager := NewLogStreamManager(*svc)
zap := zap.NewNop()
pusher := newMultiStreamPusher(manager, *svc, zap)