diff --git a/.github/workflows/add-codeowners-to-pr.yml b/.github/workflows/add-codeowners-to-pr.yml deleted file mode 100644 index 50a3208b495f..000000000000 --- a/.github/workflows/add-codeowners-to-pr.yml +++ /dev/null @@ -1,20 +0,0 @@ -name: 'Add code owners to a PR' -on: - pull_request_target: - types: - - opened - - synchronize - -jobs: - add-owners-to-pr: - runs-on: ubuntu-latest - if: ${{ github.actor != 'dependabot[bot]' && github.repository_owner == 'open-telemetry' }} - steps: - - uses: actions/checkout@v3 - - - name: Run add-codeowners-to-pr.sh - run: ./.github/workflows/scripts/add-codeowners-to-pr.sh - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - REPO: ${{ github.repository }} - PR: ${{ github.event.number }} diff --git a/.github/workflows/auto-assign-owners.yml b/.github/workflows/auto-assign-owners.yml deleted file mode 100644 index df6d2207d87b..000000000000 --- a/.github/workflows/auto-assign-owners.yml +++ /dev/null @@ -1,19 +0,0 @@ -name: 'Auto Assign' -on: - pull_request_target: - types: [opened, ready_for_review] - -concurrency: - group: ${{ github.workflow }}-${{ github.head_ref }} - cancel-in-progress: true - -jobs: - add-owner: - runs-on: ubuntu-latest - if: ${{ github.actor != 'dependabot[bot]' }} - steps: - - name: run - uses: kentaro-m/auto-assign-action@v1.2.5 - with: - configuration-path: ".github/auto_assign.yml" - repo-token: '${{ secrets.GITHUB_TOKEN }}' diff --git a/.github/workflows/changelog.yml b/.github/workflows/changelog.yml deleted file mode 100644 index 1fab9a502ff9..000000000000 --- a/.github/workflows/changelog.yml +++ /dev/null @@ -1,88 +0,0 @@ -# This action requires that any PR targeting the main branch should add a -# yaml file to the ./.chloggen/ directory. If a CHANGELOG entry is not required, -# or if performing maintance on the Changelog, add either \"[chore]\" to the title of -# the pull request or add the \"Skip Changelog\" label to disable this action. - -name: changelog - -on: - pull_request: - types: [opened, synchronize, reopened, labeled, unlabeled] - branches: - - main - -env: - # See: https://github.com/actions/cache/issues/810#issuecomment-1222550359 - # Cache downloads for this workflow consistently run in under 1 minute - SEGMENT_DOWNLOAD_TIMEOUT_MINS: 5 - -concurrency: - group: ${{ github.workflow }}-${{ github.head_ref }} - cancel-in-progress: true - -jobs: - changelog: - runs-on: ubuntu-latest - if: ${{ !contains(github.event.pull_request.labels.*.name, 'dependencies') && !contains(github.event.pull_request.labels.*.name, 'Skip Changelog') && !contains(github.event.pull_request.title, '[chore]')}} - - steps: - - name: Checkout Repo - uses: actions/checkout@v3 - with: - fetch-depth: 0 - - name: Setup Go - uses: actions/setup-go@v3 - with: - go-version: ~1.19.7 - - name: Cache Go - id: go-cache - uses: actions/cache@v3 - with: - path: | - ~/go/bin - ~/go/pkg/mod - key: changelog-${{ runner.os }}-go-${{ hashFiles('**/go.sum') }} - - - name: Ensure no changes to the CHANGELOG - run: | - if [[ $(git diff --name-only $(git merge-base origin/main ${{ github.event.pull_request.head.sha }}) ${{ github.event.pull_request.head.sha }} ./CHANGELOG.md) ]] - then - echo "The CHANGELOG should not be directly modified." - echo "Please add a .yaml file to the ./.chloggen/ directory." - echo "See CONTRIBUTING.md for more details." - echo "Alternately, add either \"[chore]\" to the title of the pull request or add the \"Skip Changelog\" label if this job should be skipped." - false - else - echo "The CHANGELOG was not modified." 
- fi - - - name: Ensure ./.chloggen/*.yaml addition(s) - run: | - if [[ 1 -gt $(git diff --diff-filter=A --name-only $(git merge-base origin/main ${{ github.event.pull_request.head.sha }}) ${{ github.event.pull_request.head.sha }} ./.chloggen | grep -c \\.yaml) ]] - then - echo "No changelog entry was added to the ./.chloggen/ directory." - echo "Please add a .yaml file to the ./.chloggen/ directory." - echo "See CONTRIBUTING.md for more details." - echo "Alternately, add either \"[chore]\" to the title of the pull request or add the \"Skip Changelog\" label if this job should be skipped." - false - else - echo "A changelog entry was added to the ./.chloggen/ directory." - fi - - - name: Validate ./.chloggen/*.yaml changes - run: | - make chlog-validate \ - || { echo "New ./.chloggen/*.yaml file failed validation."; exit 1; } - - # In order to validate any links in the yaml file, render the config to markdown - - name: Render .chloggen changelog entries - run: make chlog-preview > changelog_preview.md - - name: Install markdown-link-check - run: npm install -g markdown-link-check - - name: Run markdown-link-check - run: | - markdown-link-check \ - --verbose \ - --config .github/workflows/check_links_config.json \ - changelog_preview.md \ - || { echo "Check that anchor links are lowercase"; exit 1; } diff --git a/.github/workflows/ping-codeowners-prs.yml b/.github/workflows/ping-codeowners-prs.yml deleted file mode 100644 index 51335c8ab264..000000000000 --- a/.github/workflows/ping-codeowners-prs.yml +++ /dev/null @@ -1,20 +0,0 @@ -name: 'Ping code owners on PRs' -on: - pull_request_target: - types: [labeled] - -jobs: - ping-owners: - runs-on: ubuntu-latest - if: ${{ github.actor != 'dependabot[bot]' && github.repository_owner == 'open-telemetry' }} - steps: - - uses: actions/checkout@v3 - - - name: Run ping-codeowners-prs.sh - run: ./.github/workflows/scripts/ping-codeowners-prs.sh - env: - REPO: ${{ github.repository }} - AUTHOR: ${{ github.event.pull_request.user.login }} - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - PR: ${{ github.event.number }} - COMPONENT: ${{ github.event.label.name }} diff --git a/exporter/awsemfexporter/README.md b/exporter/awsemfexporter/README.md index 3bbf632f8e00..02e9e4d77db7 100644 --- a/exporter/awsemfexporter/README.md +++ b/exporter/awsemfexporter/README.md @@ -32,12 +32,13 @@ The following exporter configuration parameters are supported. | `max_retries` | Maximum number of retries before abandoning an attempt to post data. | 1 | | `dimension_rollup_option` | DimensionRollupOption is the option for metrics dimension rollup. Three options are available: `NoDimensionRollup`, `SingleDimensionRollupOnly` and `ZeroAndSingleDimensionRollup` | "ZeroAndSingleDimensionRollup" (Enable both zero dimension rollup and single dimension rollup) | | `resource_to_telemetry_conversion` | "resource_to_telemetry_conversion" is the option for converting resource attributes to telemetry attributes. It has only one config onption- `enabled`. For metrics, if `enabled=true`, all the resource attributes will be converted to metric labels by default. See `Resource Attributes to Metric Labels` section below for examples. | `enabled=false` | -| `output_destination` | "output_destination" is an option to specify the EMFExporter output. Currently, two options are available. 
"cloudwatch" or "stdout" | `cloudwatch` | -| `detailed_metrics` | Retain detailed datapoint values in exported metrics (e.g instead of exporting a quantile as a statistical value, preserve the quantile's population) | `false` | +| `output_destination` | Specify the EMFExporter output. Currently, two options are available. "cloudwatch" or "stdout" | `cloudwatch` | +| `detailed_metrics` | Retain detailed datapoint values in exported metrics (e.g instead of exporting a quantile as a statistical value, preserve the quantile's population) | `false` | +| `version` | Send metrics to CloudWatchLogs with Embedded Metric Format in selected version [(e.g version 1 with _aws)](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Specification.html#CloudWatch_Embedded_Metric_Format_Specification_structure), version 0 without _aws) | `1` | | `parse_json_encoded_attr_values` | List of attribute keys whose corresponding values are JSON-encoded strings and will be converted to JSON structures in emf logs. For example, the attribute string value "{\\"x\\":5,\\"y\\":6}" will be converted to a json object: ```{"x": 5, "y": 6}``` | [ ] | | [`metric_declarations`](#metric_declaration) | List of rules for filtering exported metrics and their dimensions. | [ ] | | [`metric_descriptors`](#metric_descriptor) | List of rules for inserting or updating metric descriptors. | [ ] | -| `retain_initial_value_of_delta_metric` | This option specifies how the first value of a metric is handled. AWS EMF expects metric values to only contain deltas to the previous value. In the default case the first received value is therefor not sent to AWS but only used as a baseline for follow up changes to this metric. This is fine for high throughput metrics with stable labels (e.g. `requests{code=200}`). In this case it does not matter if the first value of this metric is discarded. However when your metric describes infrequent events or events with high label cardinality, then the exporter in default configuration would still drop the first occurrence of this metric. With this configuration value set to `true` the first value of all metrics will instead be send to AWS. | false | +| `retain_initial_value_of_delta_metric` | Specify how the first value of a metric is handled. AWS EMF expects metric values to only contain deltas to the previous value. In the default case the first received value is therefor not sent to AWS but only used as a baseline for follow up changes to this metric. This is fine for high throughput metrics with stable labels (e.g. `requests{code=200}`). In this case it does not matter if the first value of this metric is discarded. However when your metric describes infrequent events or events with high label cardinality, then the exporter in default configuration would still drop the first occurrence of this metric. With this configuration value set to `true` the first value of all metrics will instead be send to AWS. | false | ### metric_declaration A metric_declaration section characterizes a rule to be used to set dimensions for exported metrics, filtered by the incoming metrics' labels and metric names. 
diff --git a/exporter/awsemfexporter/config.go b/exporter/awsemfexporter/config.go
index 24ff3b8ebc1f..be1ac32bb50f 100644
--- a/exporter/awsemfexporter/config.go
+++ b/exporter/awsemfexporter/config.go
@@ -17,6 +17,7 @@ package awsemfexporter // import "github.com/open-telemetry/opentelemetry-collec
 import (
 	"errors"

+	"go.opentelemetry.io/collector/component"
 	"go.uber.org/zap"

 	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/awsutil"
@@ -79,15 +80,20 @@ type Config struct {
 	// Note that at the moment in order to use this feature the value "kubernetes" must also be added to the ParseJSONEncodedAttributeValues array in order to be used
 	EKSFargateContainerInsightsEnabled bool `mapstructure:"eks_fargate_container_insights_enabled"`

-	// ResourceToTelemetrySettings is the option for converting resource attrihutes to telemetry attributes.
+	// ResourceToTelemetrySettings is an option for converting resource attributes to telemetry attributes.
 	// "Enabled" - A boolean field to enable/disable this option. Default is `false`.
 	// If enabled, all the resource attributes will be converted to metric labels by default.
 	ResourceToTelemetrySettings resourcetotelemetry.Settings `mapstructure:"resource_to_telemetry_conversion"`

-	// DetailedMetrics is the options for retaining detailed datapoint values in exported metrics (e.g instead of exporting a quantile as a statistical value,
+	// DetailedMetrics is an option for retaining detailed datapoint values in exported metrics (e.g. instead of exporting a quantile as a statistical value,
 	// preserve the quantile's population)
 	DetailedMetrics bool `mapstructure:"detailed_metrics"`

+	// Version is an option for sending metrics to CloudWatch Logs in the selected Embedded Metric Format version: version 1 nests metric metadata under "_aws", see
+	// https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Specification.html#CloudWatch_Embedded_Metric_Format_Specification_structure
+	// Otherwise, metrics are sent as Embedded Metric Format version 0 (without "_aws").
+	Version string `mapstructure:"version"`
+
 	// logger is the Logger used for writing error/warning logs
 	logger *zap.Logger
 }
@@ -102,6 +108,8 @@ type MetricDescriptor struct {
 	Overwrite bool `mapstructure:"overwrite"`
 }

+var _ component.Config = (*Config)(nil)
+
 // Validate filters out invalid metricDeclarations and metricDescriptors
 func (config *Config) Validate() error {
 	var validDeclarations []*MetricDeclaration
diff --git a/exporter/awsemfexporter/config_test.go b/exporter/awsemfexporter/config_test.go
index dc8475c9b8cc..3ddccdc88e39 100644
--- a/exporter/awsemfexporter/config_test.go
+++ b/exporter/awsemfexporter/config_test.go
@@ -59,6 +59,7 @@ func TestLoadConfig(t *testing.T) {
 				LogStreamName:         "",
 				DimensionRollupOption: "ZeroAndSingleDimensionRollup",
 				OutputDestination:     "cloudwatch",
+				Version:               "1",
 				logger:                zap.NewNop(),
 			},
 		},
@@ -79,6 +80,7 @@
 				LogStreamName:               "",
 				DimensionRollupOption:       "ZeroAndSingleDimensionRollup",
 				OutputDestination:           "cloudwatch",
+				Version:                     "1",
 				ResourceToTelemetrySettings: resourcetotelemetry.Settings{Enabled: true},
 				logger:                      zap.NewNop(),
 			},
@@ -100,6 +102,7 @@
 				LogStreamName:         "",
 				DimensionRollupOption: "ZeroAndSingleDimensionRollup",
 				OutputDestination:     "cloudwatch",
+				Version:               "1",
 				MetricDescriptors: []MetricDescriptor{{
 					MetricName: "memcached_current_items",
 					Unit:       "Count",
diff --git a/exporter/awsemfexporter/emf_exporter.go b/exporter/awsemfexporter/emf_exporter.go index 
aeaa03ad32bb..0ecb5079438d 100644 --- a/exporter/awsemfexporter/emf_exporter.go +++ b/exporter/awsemfexporter/emf_exporter.go @@ -23,18 +23,14 @@ import ( "github.com/aws/aws-sdk-go/aws/awserr" "github.com/google/uuid" - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/exporter/exporterhelper" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/awsutil" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourcetotelemetry" ) const ( @@ -46,8 +42,7 @@ const ( type emfExporter struct { pusherMap map[cwlogs.PusherKey]cwlogs.Pusher svcStructuredLog *cwlogs.Client - config component.Config - logger *zap.Logger + config *Config metricTranslator metricTranslator @@ -56,66 +51,40 @@ type emfExporter struct { collectorID string } -// newEmfPusher func creates an EMF Exporter instance with data push callback func -func newEmfPusher( - config component.Config, - params exporter.CreateSettings, -) (*emfExporter, error) { +// newEmfExporter creates a new exporter using exporterhelper +func newEmfExporter(config *Config, set exporter.CreateSettings) (*emfExporter, error) { if config == nil { return nil, errors.New("emf exporter config is nil") } - logger := params.Logger - expConfig := config.(*Config) - expConfig.logger = logger + config.logger = set.Logger // create AWS session - awsConfig, session, err := awsutil.GetAWSConfigSession(logger, &awsutil.Conn{}, &expConfig.AWSSessionSettings) + awsConfig, session, err := awsutil.GetAWSConfigSession(set.Logger, &awsutil.Conn{}, &config.AWSSessionSettings) if err != nil { return nil, err } // create CWLogs client with aws session config - svcStructuredLog := cwlogs.NewClient(logger, awsConfig, params.BuildInfo, expConfig.LogGroupName, expConfig.LogRetention, session) - collectorIdentifier, _ := uuid.NewRandom() + svcStructuredLog := cwlogs.NewClient(set.Logger, awsConfig, set.BuildInfo, config.LogGroupName, config.LogRetention, session) + collectorIdentifier, err := uuid.NewRandom() + + if err != nil { + return nil, err + } emfExporter := &emfExporter{ svcStructuredLog: svcStructuredLog, config: config, - metricTranslator: newMetricTranslator(*expConfig), + metricTranslator: newMetricTranslator(*config), retryCnt: *awsConfig.MaxRetries, - logger: logger, collectorID: collectorIdentifier.String(), + pusherMap: map[cwlogs.PusherKey]cwlogs.Pusher{}, } - emfExporter.pusherMap = map[cwlogs.PusherKey]cwlogs.Pusher{} return emfExporter, nil } -// newEmfExporter creates a new exporter using exporterhelper -func newEmfExporter( - config component.Config, - set exporter.CreateSettings, -) (exporter.Metrics, error) { - emfPusher, err := newEmfPusher(config, set) - if err != nil { - return nil, err - } - - exporter, err := exporterhelper.NewMetricsExporter( - context.TODO(), - set, - config, - emfPusher.pushMetricsData, - exporterhelper.WithShutdown(emfPusher.shutdown), - exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), - ) - if err != nil { - return nil, err - } - return resourcetotelemetry.WrapMetricsExporter(config.(*Config).ResourceToTelemetrySettings, exporter), nil -} - func (emf *emfExporter) pushMetricsData(_ context.Context, md pmetric.Metrics) error 
{ rms := md.ResourceMetrics() labels := map[string]string{} @@ -129,23 +98,22 @@ func (emf *emfExporter) pushMetricsData(_ context.Context, md pmetric.Metrics) e }) } } - emf.logger.Info("Start processing resource metrics", zap.Any("labels", labels)) + emf.config.logger.Info("Start processing resource metrics", zap.Any("labels", labels)) groupedMetrics := make(map[interface{}]*groupedMetric) - expConfig := emf.config.(*Config) defaultLogStream := fmt.Sprintf("otel-stream-%s", emf.collectorID) - outputDestination := expConfig.OutputDestination + outputDestination := emf.config.OutputDestination for i := 0; i < rms.Len(); i++ { - err := emf.metricTranslator.translateOTelToGroupedMetric(rms.At(i), groupedMetrics, expConfig) + err := emf.metricTranslator.translateOTelToGroupedMetric(rms.At(i), groupedMetrics, emf.config) if err != nil { return err } } for _, groupedMetric := range groupedMetrics { - cWMetric := translateGroupedMetricToCWMetric(groupedMetric, expConfig) - putLogEvent := translateCWMetricToEMF(cWMetric, expConfig) + cWMetric := translateGroupedMetricToCWMetric(groupedMetric, emf.config) + putLogEvent := translateCWMetricToEMF(cWMetric, emf.config) // Currently we only support two options for "OutputDestination". if strings.EqualFold(outputDestination, outputDestinationStdout) { fmt.Println(*putLogEvent.InputLogEvent.Message) @@ -176,14 +144,14 @@ func (emf *emfExporter) pushMetricsData(_ context.Context, md pmetric.Metrics) e // TODO now we only have one logPusher, so it's ok to return after first error occurred err := wrapErrorIfBadRequest(returnError) if err != nil { - emf.logger.Error("Error force flushing logs. Skipping to next logPusher.", zap.Error(err)) + emf.config.logger.Error("Error force flushing logs. Skipping to next logPusher.", zap.Error(err)) } return err } } } - emf.logger.Info("Finish processing resource metrics", zap.Any("labels", labels)) + emf.config.logger.Info("Finish processing resource metrics", zap.Any("labels", labels)) return nil } @@ -192,7 +160,7 @@ func (emf *emfExporter) getPusher(key cwlogs.PusherKey) cwlogs.Pusher { var ok bool if _, ok = emf.pusherMap[key]; !ok { - emf.pusherMap[key] = cwlogs.NewPusher(key, emf.retryCnt, *emf.svcStructuredLog, emf.logger) + emf.pusherMap[key] = cwlogs.NewPusher(key, emf.retryCnt, *emf.svcStructuredLog, emf.config.logger) } return emf.pusherMap[key] } @@ -215,7 +183,7 @@ func (emf *emfExporter) shutdown(ctx context.Context) error { if returnError != nil { err := wrapErrorIfBadRequest(returnError) if err != nil { - emf.logger.Error("Error when gracefully shutting down emf_exporter. Skipping to next logPusher.", zap.Error(err)) + emf.config.logger.Error("Error when gracefully shutting down emf_exporter. 
Skipping to next logPusher.", zap.Error(err)) } } } diff --git a/exporter/awsemfexporter/emf_exporter_test.go b/exporter/awsemfexporter/emf_exporter_test.go index 2674ffade881..8759d44b8eec 100644 --- a/exporter/awsemfexporter/emf_exporter_test.go +++ b/exporter/awsemfexporter/emf_exporter_test.go @@ -75,7 +75,7 @@ func TestConsumeMetrics(t *testing.T) { expCfg := factory.CreateDefaultConfig().(*Config) expCfg.Region = "us-west-2" expCfg.MaxRetries = 0 - exp, err := newEmfPusher(expCfg, exportertest.NewNopCreateSettings()) + exp, err := newEmfExporter(expCfg, exportertest.NewNopCreateSettings()) assert.Nil(t, err) assert.NotNil(t, exp) @@ -137,7 +137,7 @@ func TestConsumeMetricsWithOutputDestination(t *testing.T) { expCfg.Region = "us-west-2" expCfg.MaxRetries = 0 expCfg.OutputDestination = "stdout" - exp, err := newEmfPusher(expCfg, exportertest.NewNopCreateSettings()) + exp, err := newEmfExporter(expCfg, exportertest.NewNopCreateSettings()) assert.Nil(t, err) assert.NotNil(t, exp) @@ -198,7 +198,7 @@ func TestConsumeMetricsWithLogGroupStreamConfig(t *testing.T) { expCfg.MaxRetries = defaultRetryCount expCfg.LogGroupName = "test-logGroupName" expCfg.LogStreamName = "test-logStreamName" - exp, err := newEmfPusher(expCfg, exportertest.NewNopCreateSettings()) + exp, err := newEmfExporter(expCfg, exportertest.NewNopCreateSettings()) assert.Nil(t, err) assert.NotNil(t, exp) @@ -268,7 +268,7 @@ func TestConsumeMetricsWithLogGroupStreamValidPlaceholder(t *testing.T) { expCfg.MaxRetries = defaultRetryCount expCfg.LogGroupName = "/aws/ecs/containerinsights/{ClusterName}/performance" expCfg.LogStreamName = "{TaskId}" - exp, err := newEmfPusher(expCfg, exportertest.NewNopCreateSettings()) + exp, err := newEmfExporter(expCfg, exportertest.NewNopCreateSettings()) assert.Nil(t, err) assert.NotNil(t, exp) @@ -338,7 +338,7 @@ func TestConsumeMetricsWithOnlyLogStreamPlaceholder(t *testing.T) { expCfg.MaxRetries = defaultRetryCount expCfg.LogGroupName = "test-logGroupName" expCfg.LogStreamName = "{TaskId}" - exp, err := newEmfPusher(expCfg, exportertest.NewNopCreateSettings()) + exp, err := newEmfExporter(expCfg, exportertest.NewNopCreateSettings()) assert.Nil(t, err) assert.NotNil(t, exp) @@ -408,7 +408,7 @@ func TestConsumeMetricsWithWrongPlaceholder(t *testing.T) { expCfg.MaxRetries = defaultRetryCount expCfg.LogGroupName = "test-logGroupName" expCfg.LogStreamName = "{WrongKey}" - exp, err := newEmfPusher(expCfg, exportertest.NewNopCreateSettings()) + exp, err := newEmfExporter(expCfg, exportertest.NewNopCreateSettings()) assert.Nil(t, err) assert.NotNil(t, exp) @@ -478,7 +478,7 @@ func TestPushMetricsDataWithErr(t *testing.T) { expCfg.MaxRetries = 0 expCfg.LogGroupName = "test-logGroupName" expCfg.LogStreamName = "test-logStreamName" - exp, err := newEmfPusher(expCfg, exportertest.NewNopCreateSettings()) + exp, err := newEmfExporter(expCfg, exportertest.NewNopCreateSettings()) assert.Nil(t, err) assert.NotNil(t, exp) @@ -550,7 +550,7 @@ func TestNewExporterWithoutConfig(t *testing.T) { settings := exportertest.NewNopCreateSettings() t.Setenv("AWS_STS_REGIONAL_ENDPOINTS", "fake") - exp, err := newEmfPusher(expCfg, settings) + exp, err := newEmfExporter(expCfg, settings) assert.NotNil(t, err) assert.Nil(t, exp) assert.Equal(t, settings.Logger, expCfg.logger) @@ -587,17 +587,16 @@ func TestNewExporterWithMetricDeclarations(t *testing.T) { params := exportertest.NewNopCreateSettings() params.Logger = zap.New(obs) - exp, err := newEmfPusher(expCfg, params) + exp, err := newEmfExporter(expCfg, params) 
assert.Nil(t, err) assert.NotNil(t, exp) err = expCfg.Validate() assert.Nil(t, err) - config := exp.config.(*Config) // Invalid metric declaration should be filtered out - assert.Equal(t, 3, len(config.MetricDeclarations)) + assert.Equal(t, 3, len(exp.config.MetricDeclarations)) // Invalid dimensions (> 10 dims) should be filtered out - assert.Equal(t, 1, len(config.MetricDeclarations[2].Dimensions)) + assert.Equal(t, 1, len(exp.config.MetricDeclarations[2].Dimensions)) // Test output warning logs expectedLogs := []observer.LoggedEntry{ @@ -615,7 +614,7 @@ func TestNewExporterWithMetricDeclarations(t *testing.T) { } func TestNewExporterWithoutSession(t *testing.T) { - exp, err := newEmfPusher(nil, exportertest.NewNopCreateSettings()) + exp, err := newEmfExporter(nil, exportertest.NewNopCreateSettings()) assert.NotNil(t, err) assert.Nil(t, exp) } @@ -629,7 +628,7 @@ func TestWrapErrorIfBadRequest(t *testing.T) { assert.False(t, consumererror.IsPermanent(err)) } -// This test verifies that if func newEmfPusher() returns an error then newEmfExporter() +// This test verifies that if func newEmfExporter() returns an error then newEmfExporter() // will do so. func TestNewEmfExporterWithoutConfig(t *testing.T) { factory := NewFactory() diff --git a/exporter/awsemfexporter/factory.go b/exporter/awsemfexporter/factory.go index e73eab57e592..fbcfdd0f3b55 100644 --- a/exporter/awsemfexporter/factory.go +++ b/exporter/awsemfexporter/factory.go @@ -19,9 +19,11 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/exporterhelper" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/awsutil" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourcetotelemetry" ) const ( @@ -47,6 +49,7 @@ func createDefaultConfig() component.Config { LogStreamName: "", Namespace: "", DimensionRollupOption: "ZeroAndSingleDimensionRollup", + Version: "1", RetainInitialValueOfDeltaMetric: false, OutputDestination: "cloudwatch", logger: zap.NewNop(), @@ -54,11 +57,24 @@ func createDefaultConfig() component.Config { } // createMetricsExporter creates a metrics exporter based on this config. -func createMetricsExporter(_ context.Context, - params exporter.CreateSettings, - config component.Config) (exporter.Metrics, error) { - +func createMetricsExporter(ctx context.Context, params exporter.CreateSettings, config component.Config) (exporter.Metrics, error) { expCfg := config.(*Config) - return newEmfExporter(expCfg, params) + emfExp, err := newEmfExporter(expCfg, params) + if err != nil { + return nil, err + } + + exporter, err := exporterhelper.NewMetricsExporter( + ctx, + params, + config, + emfExp.pushMetricsData, + exporterhelper.WithShutdown(emfExp.shutdown), + ) + if err != nil { + return nil, err + } + + return resourcetotelemetry.WrapMetricsExporter(expCfg.ResourceToTelemetrySettings, exporter), nil } diff --git a/exporter/awsemfexporter/metric_declaration.go b/exporter/awsemfexporter/metric_declaration.go index af3343e48173..76ec8bdfd27b 100644 --- a/exporter/awsemfexporter/metric_declaration.go +++ b/exporter/awsemfexporter/metric_declaration.go @@ -186,8 +186,8 @@ func (lm *LabelMatcher) init() (err error) { if len(lm.Separator) == 0 { lm.Separator = ";" } - lm.compiledRegex = regexp.MustCompile(lm.Regex) - return + lm.compiledRegex, err = regexp.Compile(lm.Regex) + return err } // Matches returns true if given set of labels matches the LabelMatcher's rules. 
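A usage sketch of the metric_declaration.go change above (not part of this diff): switching from `regexp.MustCompile` to `regexp.Compile` means an invalid user-supplied pattern is reported as an error from `init()` instead of panicking the collector. `Regex` and `Separator` appear in the hunk above; the `LabelNames` field name is assumed here.

```go
package awsemfexporter

import "log"

func exampleInvalidLabelMatcher() {
	lm := &LabelMatcher{
		LabelNames: []string{"label1"}, // assumed field name, included only so init() reaches the regex compilation
		Regex:      "([",               // not a valid regular expression
	}
	if err := lm.init(); err != nil {
		// Previously regexp.MustCompile would panic here; now the error can be
		// surfaced through configuration validation.
		log.Printf("invalid label_matcher: %v", err)
	}
}
```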
diff --git a/exporter/awsemfexporter/metric_translator.go b/exporter/awsemfexporter/metric_translator.go index af705d607add..840961396ff2 100644 --- a/exporter/awsemfexporter/metric_translator.go +++ b/exporter/awsemfexporter/metric_translator.go @@ -338,7 +338,6 @@ func groupedMetricToCWMeasurementsWithFilters(groupedMetric *groupedMetric, conf // translateCWMetricToEMF converts CloudWatch Metric format to EMF. func translateCWMetricToEMF(cWMetric *cWMetrics, config *Config) *cwlogs.Event { // convert CWMetric into map format for compatible with PLE input - cWMetricMap := make(map[string]interface{}) fieldMap := cWMetric.fields // restore the json objects that are stored as string in attributes @@ -369,12 +368,48 @@ func translateCWMetricToEMF(cWMetric *cWMetrics, config *Config) *cwlogs.Event { } } - // Create `_aws` section only if there are measurements + // Create EMF metrics if there are measurements + // https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Specification.html#CloudWatch_Embedded_Metric_Format_Specification_structure if len(cWMetric.measurements) > 0 { - // Create `_aws` section only if there are measurements - cWMetricMap["CloudWatchMetrics"] = cWMetric.measurements - cWMetricMap["Timestamp"] = cWMetric.timestampMs - fieldMap["_aws"] = cWMetricMap + if config.Version == "1" { + /* EMF V1 + "Version": 1, + "_aws": { + "CloudWatchMetrics": [ + { + "Namespace": "ECS", + "Dimensions": [ ["ClusterName"] ], + "Metrics": [{"Name": "memcached_commands_total"}] + } + ], + "Timestamp": 1668387032641 + } + */ + fieldMap["Version"] = "1" + fieldMap["_aws"] = map[string]interface{}{ + "CloudWatchMetrics": cWMetric.measurements, + "Timestamp": cWMetric.timestampMs, + } + + } else { + /* EMF V0 + { + "Version": 0, + "CloudWatchMetrics": [ + { + "Namespace": "ECS", + "Dimensions": [ ["ClusterName"] ], + "Metrics": [{"Name": "memcached_commands_total"}] + } + ], + "Timestamp": 1668387032641 + } + */ + fieldMap["Version"] = "0" + fieldMap["Timestamp"] = fmt.Sprint(cWMetric.timestampMs) + fieldMap["CloudWatchMetrics"] = cWMetric.measurements + } + } pleMsg, err := json.Marshal(fieldMap) diff --git a/exporter/awsemfexporter/metric_translator_test.go b/exporter/awsemfexporter/metric_translator_test.go index 2d47f1a0240b..8020add54ab9 100644 --- a/exporter/awsemfexporter/metric_translator_test.go +++ b/exporter/awsemfexporter/metric_translator_test.go @@ -16,7 +16,6 @@ package awsemfexporter import ( "fmt" - "os" "reflect" "sort" "testing" @@ -39,15 +38,6 @@ import ( internaldata "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/opencensus" ) -func readFromFile(filename string) string { - data, err := os.ReadFile(filename) - if err != nil { - panic(err) - } - str := string(data) - return str -} - func createMetricTestData() *agentmetricspb.ExportMetricsServiceRequest { request := &agentmetricspb.ExportMetricsServiceRequest{ Node: &commonpb.Node{ @@ -576,37 +566,72 @@ func TestTranslateOtToGroupedMetric(t *testing.T) { } func TestTranslateCWMetricToEMF(t *testing.T) { - cwMeasurement := cWMeasurement{ - Namespace: "test-emf", - Dimensions: [][]string{{oTellibDimensionKey}, {oTellibDimensionKey, "spanName"}}, - Metrics: []map[string]string{{ - "Name": "spanCounter", - "Unit": "Count", - }}, + testCases := map[string]struct { + emfVersion string + measurements []cWMeasurement + expectedEMFLogEvent string + }{ + "WithMeasurementAndEMFV1": { + emfVersion: "1", + measurements: []cWMeasurement{{ + Namespace: "test-emf", + 
Dimensions: [][]string{{oTellibDimensionKey}, {oTellibDimensionKey, "spanName"}}, + Metrics: []map[string]string{{ + "Name": "spanCounter", + "Unit": "Count", + }}, + }}, + expectedEMFLogEvent: "{\"OTelLib\":\"cloudwatch-otel\",\"Sources\":[\"cadvisor\",\"pod\",\"calculated\"],\"Version\":1,\"_aws\":{\"CloudWatchMetrics\":[{\"Namespace\":\"test-emf\",\"Dimensions\":[[\"OTelLib\"],[\"OTelLib\",\"spanName\"]],\"Metrics\":[{\"Name\":\"spanCounter\",\"Unit\":\"Count\"}]}],\"Timestamp\":1596151098037},\"kubernetes\":{\"container_name\":\"cloudwatch-agent\",\"docker\":{\"container_id\":\"fc1b0a4c3faaa1808e187486a3a90cbea883dccaf2e2c46d4069d663b032a1ca\"},\"host\":\"ip-192-168-58-245.ec2.internal\",\"labels\":{\"controller-revision-hash\":\"5bdbf497dc\",\"name\":\"cloudwatch-agent\",\"pod-template-generation\":\"1\"},\"namespace_name\":\"amazon-cloudwatch\",\"pod_id\":\"e23f3413-af2e-4a98-89e0-5df2251e7f05\",\"pod_name\":\"cloudwatch-agent-26bl6\",\"pod_owners\":[{\"owner_kind\":\"DaemonSet\",\"owner_name\":\"cloudwatch-agent\"}]},\"spanCounter\":0,\"spanName\":\"test\"}", + }, + "WithMeasurementAndEMFV0": { + emfVersion: "0", + measurements: []cWMeasurement{{ + Namespace: "test-emf", + Dimensions: [][]string{{oTellibDimensionKey}, {oTellibDimensionKey, "spanName"}}, + Metrics: []map[string]string{{ + "Name": "spanCounter", + "Unit": "Count", + }}, + }}, + expectedEMFLogEvent: "{\"CloudWatchMetrics\":[{\"Namespace\":\"test-emf\",\"Dimensions\":[[\"OTelLib\"],[\"OTelLib\",\"spanName\"]],\"Metrics\":[{\"Name\":\"spanCounter\",\"Unit\":\"Count\"}]}],\"OTelLib\":\"cloudwatch-otel\",\"Sources\":[\"cadvisor\",\"pod\",\"calculated\"],\"Timestamp\":1596151098037,\"Version\":0,\"kubernetes\":{\"container_name\":\"cloudwatch-agent\",\"docker\":{\"container_id\":\"fc1b0a4c3faaa1808e187486a3a90cbea883dccaf2e2c46d4069d663b032a1ca\"},\"host\":\"ip-192-168-58-245.ec2.internal\",\"labels\":{\"controller-revision-hash\":\"5bdbf497dc\",\"name\":\"cloudwatch-agent\",\"pod-template-generation\":\"1\"},\"namespace_name\":\"amazon-cloudwatch\",\"pod_id\":\"e23f3413-af2e-4a98-89e0-5df2251e7f05\",\"pod_name\":\"cloudwatch-agent-26bl6\",\"pod_owners\":[{\"owner_kind\":\"DaemonSet\",\"owner_name\":\"cloudwatch-agent\"}]},\"spanCounter\":0,\"spanName\":\"test\"}", + }, + "WithNoMeasurement": { + emfVersion: "1", + measurements: nil, + expectedEMFLogEvent: "{\"OTelLib\":\"cloudwatch-otel\",\"Sources\":[\"cadvisor\",\"pod\",\"calculated\"],\"kubernetes\":{\"container_name\":\"cloudwatch-agent\",\"docker\":{\"container_id\":\"fc1b0a4c3faaa1808e187486a3a90cbea883dccaf2e2c46d4069d663b032a1ca\"},\"host\":\"ip-192-168-58-245.ec2.internal\",\"labels\":{\"controller-revision-hash\":\"5bdbf497dc\",\"name\":\"cloudwatch-agent\",\"pod-template-generation\":\"1\"},\"namespace_name\":\"amazon-cloudwatch\",\"pod_id\":\"e23f3413-af2e-4a98-89e0-5df2251e7f05\",\"pod_name\":\"cloudwatch-agent-26bl6\",\"pod_owners\":[{\"owner_kind\":\"DaemonSet\",\"owner_name\":\"cloudwatch-agent\"}]},\"spanCounter\":0,\"spanName\":\"test\"}", + }, } - timestamp := int64(1596151098037) - fields := make(map[string]interface{}) - fields[oTellibDimensionKey] = "cloudwatch-otel" - fields["spanName"] = "test" - fields["spanCounter"] = 0 - // add stringified json as attribute values - fields["kubernetes"] = 
"{\"container_name\":\"cloudwatch-agent\",\"docker\":{\"container_id\":\"fc1b0a4c3faaa1808e187486a3a90cbea883dccaf2e2c46d4069d663b032a1ca\"},\"host\":\"ip-192-168-58-245.ec2.internal\",\"labels\":{\"controller-revision-hash\":\"5bdbf497dc\",\"name\":\"cloudwatch-agent\",\"pod-template-generation\":\"1\"},\"namespace_name\":\"amazon-cloudwatch\",\"pod_id\":\"e23f3413-af2e-4a98-89e0-5df2251e7f05\",\"pod_name\":\"cloudwatch-agent-26bl6\",\"pod_owners\":[{\"owner_kind\":\"DaemonSet\",\"owner_name\":\"cloudwatch-agent\"}]}" - fields["Sources"] = "[\"cadvisor\",\"pod\",\"calculated\"]" - config := &Config{ - // include valid json string, a non-existing key, and keys whose value are not json/string - ParseJSONEncodedAttributeValues: []string{"kubernetes", "Sources", "NonExistingAttributeKey", "spanName", "spanCounter"}, - logger: zap.NewNop(), - } + for name, tc := range testCases { + t.Run(name, func(_ *testing.T) { + config := &Config{ - met := &cWMetrics{ - timestampMs: timestamp, - fields: fields, - measurements: []cWMeasurement{cwMeasurement}, + // include valid json string, a non-existing key, and keys whose value are not json/string + ParseJSONEncodedAttributeValues: []string{"kubernetes", "Sources", "NonExistingAttributeKey", "spanName", "spanCounter"}, + Version: tc.emfVersion, + logger: zap.NewNop(), + } + + fields := map[string]interface{}{ + oTellibDimensionKey: "cloudwatch-otel", + "spanName": "test", + "spanCounter": 0, + "kubernetes": "{\"container_name\":\"cloudwatch-agent\",\"docker\":{\"container_id\":\"fc1b0a4c3faaa1808e187486a3a90cbea883dccaf2e2c46d4069d663b032a1ca\"},\"host\":\"ip-192-168-58-245.ec2.internal\",\"labels\":{\"controller-revision-hash\":\"5bdbf497dc\",\"name\":\"cloudwatch-agent\",\"pod-template-generation\":\"1\"},\"namespace_name\":\"amazon-cloudwatch\",\"pod_id\":\"e23f3413-af2e-4a98-89e0-5df2251e7f05\",\"pod_name\":\"cloudwatch-agent-26bl6\",\"pod_owners\":[{\"owner_kind\":\"DaemonSet\",\"owner_name\":\"cloudwatch-agent\"}]}", + "Sources": "[\"cadvisor\",\"pod\",\"calculated\"]", + } + + cloudwatchMetric := &cWMetrics{ + timestampMs: int64(1596151098037), + fields: fields, + measurements: tc.measurements, + } + + emfLogEvent := translateCWMetricToEMF(cloudwatchMetric, config) + + assert.Equal(t, tc.expectedEMFLogEvent, *emfLogEvent.InputLogEvent.Message) + }) } - inputLogEvent := translateCWMetricToEMF(met, config) - assert.Equal(t, readFromFile("testdata/testTranslateCWMetricToEMF.json"), *inputLogEvent.InputLogEvent.Message, "Expect to be equal") } func TestTranslateGroupedMetricToCWMetric(t *testing.T) { @@ -2114,24 +2139,6 @@ func TestGroupedMetricToCWMeasurementsWithFilters(t *testing.T) { } } -func TestTranslateCWMetricToEMFNoMeasurements(t *testing.T) { - timestamp := int64(1596151098037) - fields := make(map[string]interface{}) - fields[oTellibDimensionKey] = "cloudwatch-otel" - fields["spanName"] = "test" - fields["spanCounter"] = 0 - - met := &cWMetrics{ - timestampMs: timestamp, - fields: fields, - measurements: nil, - } - inputLogEvent := translateCWMetricToEMF(met, &Config{}) - expected := "{\"OTelLib\":\"cloudwatch-otel\",\"spanCounter\":0,\"spanName\":\"test\"}" - - assert.Equal(t, expected, *inputLogEvent.InputLogEvent.Message) -} - func BenchmarkTranslateOtToGroupedMetricWithInstrLibrary(b *testing.B) { oc := createMetricTestData() rm := internaldata.OCToMetrics(oc.Node, oc.Resource, oc.Metrics).ResourceMetrics().At(0) diff --git a/exporter/awsemfexporter/testdata/config.yaml b/exporter/awsemfexporter/testdata/config.yaml index 
e132056e0f34..ffbfc759007e 100644 --- a/exporter/awsemfexporter/testdata/config.yaml +++ b/exporter/awsemfexporter/testdata/config.yaml @@ -2,6 +2,9 @@ awsemf: awsemf/1: region: 'us-west-2' role_arn: "arn:aws:iam::123456789:role/monitoring-EKS-NodeInstanceRole" + detailed_metrics: false + version: "1" + awsemf/resource_attr_to_label: resource_to_telemetry_conversion: enabled: true diff --git a/exporter/awsemfexporter/testdata/testTranslateCWMetricToEMF.json b/exporter/awsemfexporter/testdata/testTranslateCWMetricToEMF.json deleted file mode 100644 index c3f4fb9e32fb..000000000000 --- a/exporter/awsemfexporter/testdata/testTranslateCWMetricToEMF.json +++ /dev/null @@ -1 +0,0 @@ -{"OTelLib":"cloudwatch-otel","Sources":["cadvisor","pod","calculated"],"_aws":{"CloudWatchMetrics":[{"Namespace":"test-emf","Dimensions":[["OTelLib"],["OTelLib","spanName"]],"Metrics":[{"Name":"spanCounter","Unit":"Count"}]}],"Timestamp":1596151098037},"kubernetes":{"container_name":"cloudwatch-agent","docker":{"container_id":"fc1b0a4c3faaa1808e187486a3a90cbea883dccaf2e2c46d4069d663b032a1ca"},"host":"ip-192-168-58-245.ec2.internal","labels":{"controller-revision-hash":"5bdbf497dc","name":"cloudwatch-agent","pod-template-generation":"1"},"namespace_name":"amazon-cloudwatch","pod_id":"e23f3413-af2e-4a98-89e0-5df2251e7f05","pod_name":"cloudwatch-agent-26bl6","pod_owners":[{"owner_kind":"DaemonSet","owner_name":"cloudwatch-agent"}]},"spanCounter":0,"spanName":"test"} \ No newline at end of file
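For reference, a condensed sketch (not part of this diff) of the envelope selection that `translateCWMetricToEMF` performs in metric_translator.go above, which the rewritten table-driven test and the removed testdata file reflect: EMF version 1 nests `CloudWatchMetrics` and `Timestamp` under `_aws`, while version 0 keeps them at the top level with a stringified timestamp.

```go
package awsemfexporter

import "fmt"

// emfEnvelope mirrors the version handling added to translateCWMetricToEMF;
// the helper itself is illustrative and not part of this change.
func emfEnvelope(version string, fields map[string]interface{}, measurements []cWMeasurement, timestampMs int64) map[string]interface{} {
	if len(measurements) == 0 {
		return fields // no CloudWatchMetrics/_aws section without measurements
	}
	if version == "1" {
		fields["Version"] = "1"
		fields["_aws"] = map[string]interface{}{
			"CloudWatchMetrics": measurements,
			"Timestamp":         timestampMs,
		}
	} else {
		fields["Version"] = "0"
		fields["Timestamp"] = fmt.Sprint(timestampMs)
		fields["CloudWatchMetrics"] = measurements
	}
	return fields
}
```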