From 33ca9fe181898fe9a556aaf415cb5f566311ec6f Mon Sep 17 00:00:00 2001 From: matianjun1 Date: Fri, 31 Mar 2023 21:15:17 +0800 Subject: [PATCH] Add support for AWS Embedded Metric Format Version 0 --- ...tanza-flatten-resource-and-attribtues.yaml | 16 +++ exporter/awsemfexporter/README.md | 7 +- exporter/awsemfexporter/config.go | 12 +- exporter/awsemfexporter/emf_exporter.go | 76 ++++-------- exporter/awsemfexporter/factory.go | 26 +++- exporter/awsemfexporter/metric_declaration.go | 4 +- exporter/awsemfexporter/metric_translator.go | 47 ++++++- .../awsemfexporter/metric_translator_test.go | 105 +++++++++------- .../testdata/testTranslateCWMetricToEMF.json | 1 - .../transformer/flatten/config_test.go | 47 +++++-- .../operator/transformer/flatten/flatten.go | 46 ++++--- .../transformer/flatten/flatten_test.go | 117 +++++++++++++----- .../transformer/flatten/testdata/config.yaml | 19 ++- 13 files changed, 346 insertions(+), 177 deletions(-) create mode 100644 .chloggen/add-stanza-flatten-resource-and-attribtues.yaml delete mode 100644 exporter/awsemfexporter/testdata/testTranslateCWMetricToEMF.json diff --git a/.chloggen/add-stanza-flatten-resource-and-attribtues.yaml b/.chloggen/add-stanza-flatten-resource-and-attribtues.yaml new file mode 100644 index 000000000000..41fe0621836e --- /dev/null +++ b/.chloggen/add-stanza-flatten-resource-and-attribtues.yaml @@ -0,0 +1,16 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: stanza + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Enhance pkg/stanza/flatten to support resource and attribute fields + +# One or more tracking issues related to the change +issues: [20448] + +# (Optional) One or more lines of additional information to render under the primary note. 
+# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: diff --git a/exporter/awsemfexporter/README.md b/exporter/awsemfexporter/README.md index 3bbf632f8e00..663108e50f4b 100644 --- a/exporter/awsemfexporter/README.md +++ b/exporter/awsemfexporter/README.md @@ -32,12 +32,13 @@ The following exporter configuration parameters are supported. | `max_retries` | Maximum number of retries before abandoning an attempt to post data. | 1 | | `dimension_rollup_option` | DimensionRollupOption is the option for metrics dimension rollup. Three options are available: `NoDimensionRollup`, `SingleDimensionRollupOnly` and `ZeroAndSingleDimensionRollup` | "ZeroAndSingleDimensionRollup" (Enable both zero dimension rollup and single dimension rollup) | | `resource_to_telemetry_conversion` | "resource_to_telemetry_conversion" is the option for converting resource attributes to telemetry attributes. It has only one config onption- `enabled`. For metrics, if `enabled=true`, all the resource attributes will be converted to metric labels by default. See `Resource Attributes to Metric Labels` section below for examples. | `enabled=false` | -| `output_destination` | "output_destination" is an option to specify the EMFExporter output. Currently, two options are available. "cloudwatch" or "stdout" | `cloudwatch` | -| `detailed_metrics` | Retain detailed datapoint values in exported metrics (e.g instead of exporting a quantile as a statistical value, preserve the quantile's population) | `false` | +| `output_destination` | Specify the EMFExporter output. Currently, two options are available. 
"cloudwatch" or "stdout" | `cloudwatch` | +| `detailed_metrics` | Retain detailed datapoint values in exported metrics (e.g., instead of exporting a quantile as a statistical value, preserve the quantile's population) | `false` | +| `enable_emf_version_1` | Send metrics to CloudWatchLogs with [Embedded Metric Format version 1 (with "_aws")](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Specification.html#CloudWatch_Embedded_Metric_Format_Specification_structure). Otherwise, metrics are sent as Embedded Metric Format version 0 (without "_aws"). | `true` | | `parse_json_encoded_attr_values` | List of attribute keys whose corresponding values are JSON-encoded strings and will be converted to JSON structures in emf logs. For example, the attribute string value "{\\"x\\":5,\\"y\\":6}" will be converted to a json object: ```{"x": 5, "y": 6}``` | [ ] | | [`metric_declarations`](#metric_declaration) | List of rules for filtering exported metrics and their dimensions. | [ ] | | [`metric_descriptors`](#metric_descriptor) | List of rules for inserting or updating metric descriptors. | [ ] | -| `retain_initial_value_of_delta_metric` | This option specifies how the first value of a metric is handled. AWS EMF expects metric values to only contain deltas to the previous value. In the default case the first received value is therefor not sent to AWS but only used as a baseline for follow up changes to this metric. This is fine for high throughput metrics with stable labels (e.g. `requests{code=200}`). In this case it does not matter if the first value of this metric is discarded. However when your metric describes infrequent events or events with high label cardinality, then the exporter in default configuration would still drop the first occurrence of this metric. With this configuration value set to `true` the first value of all metrics will instead be send to AWS. 
| false | +| `retain_initial_value_of_delta_metric` | Specify how the first value of a metric is handled. AWS EMF expects metric values to only contain deltas to the previous value. In the default case the first received value is therefore not sent to AWS but only used as a baseline for follow-up changes to this metric. This is fine for high throughput metrics with stable labels (e.g. `requests{code=200}`). In this case it does not matter if the first value of this metric is discarded. However, when your metric describes infrequent events or events with high label cardinality, then the exporter in default configuration would still drop the first occurrence of this metric. With this configuration value set to `true` the first value of all metrics will instead be sent to AWS. | false | ### metric_declaration A metric_declaration section characterizes a rule to be used to set dimensions for exported metrics, filtered by the incoming metrics' labels and metric names. diff --git a/exporter/awsemfexporter/config.go b/exporter/awsemfexporter/config.go index 24ff3b8ebc1f..676489224fd2 100644 --- a/exporter/awsemfexporter/config.go +++ b/exporter/awsemfexporter/config.go @@ -17,6 +17,7 @@ package awsemfexporter // import "github.com/open-telemetry/opentelemetry-collec import ( "errors" + "go.opentelemetry.io/collector/component" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/awsutil" @@ -79,15 +80,20 @@ type Config struct { // Note that at the moment in order to use this feature the value "kubernetes" must also be added to the ParseJSONEncodedAttributeValues array in order to be used EKSFargateContainerInsightsEnabled bool `mapstructure:"eks_fargate_container_insights_enabled"` - // ResourceToTelemetrySettings is the option for converting resource attrihutes to telemetry attributes. + // ResourceToTelemetrySettings is an option for converting resource attributes to telemetry attributes. 
// "Enabled" - A boolean field to enable/disable this option. Default is `false`. // If enabled, all the resource attributes will be converted to metric labels by default. ResourceToTelemetrySettings resourcetotelemetry.Settings `mapstructure:"resource_to_telemetry_conversion"` - // DetailedMetrics is the options for retaining detailed datapoint values in exported metrics (e.g instead of exporting a quantile as a statistical value, + // DetailedMetrics is an option for retaining detailed datapoint values in exported metrics (e.g instead of exporting a quantile as a statistical value, // preserve the quantile's population) DetailedMetrics bool `mapstructure:"detailed_metrics"` + // EnableEMFVersion1 is an option for sending metrics to CloudWatchLogs with Embedded Metric Format version 1 (with "_aws") + // https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Specification.html#CloudWatch_Embedded_Metric_Format_Specification_structure + // Otherwise, sending metrics as Embedded Metric Format version 0 (without "_aws") + EnableEMFVersion1 bool `mapstructure:"enable_emf_version_1"` + // logger is the Logger used for writing error/warning logs logger *zap.Logger } @@ -102,6 +108,8 @@ type MetricDescriptor struct { Overwrite bool `mapstructure:"overwrite"` } +var _ component.Config = (*Config)(nil) + // Validate filters out invalid metricDeclarations and metricDescriptors func (config *Config) Validate() error { var validDeclarations []*MetricDeclaration diff --git a/exporter/awsemfexporter/emf_exporter.go b/exporter/awsemfexporter/emf_exporter.go index aeaa03ad32bb..0ecb5079438d 100644 --- a/exporter/awsemfexporter/emf_exporter.go +++ b/exporter/awsemfexporter/emf_exporter.go @@ -23,18 +23,14 @@ import ( "github.com/aws/aws-sdk-go/aws/awserr" "github.com/google/uuid" - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" 
"go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/exporter/exporterhelper" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/awsutil" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourcetotelemetry" ) const ( @@ -46,8 +42,7 @@ const ( type emfExporter struct { pusherMap map[cwlogs.PusherKey]cwlogs.Pusher svcStructuredLog *cwlogs.Client - config component.Config - logger *zap.Logger + config *Config metricTranslator metricTranslator @@ -56,66 +51,40 @@ type emfExporter struct { collectorID string } -// newEmfPusher func creates an EMF Exporter instance with data push callback func -func newEmfPusher( - config component.Config, - params exporter.CreateSettings, -) (*emfExporter, error) { +// newEmfExporter creates a new exporter using exporterhelper +func newEmfExporter(config *Config, set exporter.CreateSettings) (*emfExporter, error) { if config == nil { return nil, errors.New("emf exporter config is nil") } - logger := params.Logger - expConfig := config.(*Config) - expConfig.logger = logger + config.logger = set.Logger // create AWS session - awsConfig, session, err := awsutil.GetAWSConfigSession(logger, &awsutil.Conn{}, &expConfig.AWSSessionSettings) + awsConfig, session, err := awsutil.GetAWSConfigSession(set.Logger, &awsutil.Conn{}, &config.AWSSessionSettings) if err != nil { return nil, err } // create CWLogs client with aws session config - svcStructuredLog := cwlogs.NewClient(logger, awsConfig, params.BuildInfo, expConfig.LogGroupName, expConfig.LogRetention, session) - collectorIdentifier, _ := uuid.NewRandom() + svcStructuredLog := cwlogs.NewClient(set.Logger, awsConfig, set.BuildInfo, config.LogGroupName, config.LogRetention, session) + collectorIdentifier, err := uuid.NewRandom() + + if err != nil { + 
return nil, err + } emfExporter := &emfExporter{ svcStructuredLog: svcStructuredLog, config: config, - metricTranslator: newMetricTranslator(*expConfig), + metricTranslator: newMetricTranslator(*config), retryCnt: *awsConfig.MaxRetries, - logger: logger, collectorID: collectorIdentifier.String(), + pusherMap: map[cwlogs.PusherKey]cwlogs.Pusher{}, } - emfExporter.pusherMap = map[cwlogs.PusherKey]cwlogs.Pusher{} return emfExporter, nil } -// newEmfExporter creates a new exporter using exporterhelper -func newEmfExporter( - config component.Config, - set exporter.CreateSettings, -) (exporter.Metrics, error) { - emfPusher, err := newEmfPusher(config, set) - if err != nil { - return nil, err - } - - exporter, err := exporterhelper.NewMetricsExporter( - context.TODO(), - set, - config, - emfPusher.pushMetricsData, - exporterhelper.WithShutdown(emfPusher.shutdown), - exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), - ) - if err != nil { - return nil, err - } - return resourcetotelemetry.WrapMetricsExporter(config.(*Config).ResourceToTelemetrySettings, exporter), nil -} - func (emf *emfExporter) pushMetricsData(_ context.Context, md pmetric.Metrics) error { rms := md.ResourceMetrics() labels := map[string]string{} @@ -129,23 +98,22 @@ func (emf *emfExporter) pushMetricsData(_ context.Context, md pmetric.Metrics) e }) } } - emf.logger.Info("Start processing resource metrics", zap.Any("labels", labels)) + emf.config.logger.Info("Start processing resource metrics", zap.Any("labels", labels)) groupedMetrics := make(map[interface{}]*groupedMetric) - expConfig := emf.config.(*Config) defaultLogStream := fmt.Sprintf("otel-stream-%s", emf.collectorID) - outputDestination := expConfig.OutputDestination + outputDestination := emf.config.OutputDestination for i := 0; i < rms.Len(); i++ { - err := emf.metricTranslator.translateOTelToGroupedMetric(rms.At(i), groupedMetrics, expConfig) + err := emf.metricTranslator.translateOTelToGroupedMetric(rms.At(i), 
groupedMetrics, emf.config) if err != nil { return err } } for _, groupedMetric := range groupedMetrics { - cWMetric := translateGroupedMetricToCWMetric(groupedMetric, expConfig) - putLogEvent := translateCWMetricToEMF(cWMetric, expConfig) + cWMetric := translateGroupedMetricToCWMetric(groupedMetric, emf.config) + putLogEvent := translateCWMetricToEMF(cWMetric, emf.config) // Currently we only support two options for "OutputDestination". if strings.EqualFold(outputDestination, outputDestinationStdout) { fmt.Println(*putLogEvent.InputLogEvent.Message) @@ -176,14 +144,14 @@ func (emf *emfExporter) pushMetricsData(_ context.Context, md pmetric.Metrics) e // TODO now we only have one logPusher, so it's ok to return after first error occurred err := wrapErrorIfBadRequest(returnError) if err != nil { - emf.logger.Error("Error force flushing logs. Skipping to next logPusher.", zap.Error(err)) + emf.config.logger.Error("Error force flushing logs. Skipping to next logPusher.", zap.Error(err)) } return err } } } - emf.logger.Info("Finish processing resource metrics", zap.Any("labels", labels)) + emf.config.logger.Info("Finish processing resource metrics", zap.Any("labels", labels)) return nil } @@ -192,7 +160,7 @@ func (emf *emfExporter) getPusher(key cwlogs.PusherKey) cwlogs.Pusher { var ok bool if _, ok = emf.pusherMap[key]; !ok { - emf.pusherMap[key] = cwlogs.NewPusher(key, emf.retryCnt, *emf.svcStructuredLog, emf.logger) + emf.pusherMap[key] = cwlogs.NewPusher(key, emf.retryCnt, *emf.svcStructuredLog, emf.config.logger) } return emf.pusherMap[key] } @@ -215,7 +183,7 @@ func (emf *emfExporter) shutdown(ctx context.Context) error { if returnError != nil { err := wrapErrorIfBadRequest(returnError) if err != nil { - emf.logger.Error("Error when gracefully shutting down emf_exporter. Skipping to next logPusher.", zap.Error(err)) + emf.config.logger.Error("Error when gracefully shutting down emf_exporter. 
Skipping to next logPusher.", zap.Error(err)) } } } diff --git a/exporter/awsemfexporter/factory.go b/exporter/awsemfexporter/factory.go index e73eab57e592..ee070fb8f068 100644 --- a/exporter/awsemfexporter/factory.go +++ b/exporter/awsemfexporter/factory.go @@ -19,9 +19,11 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/exporterhelper" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/awsutil" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourcetotelemetry" ) const ( @@ -47,6 +49,7 @@ func createDefaultConfig() component.Config { LogStreamName: "", Namespace: "", DimensionRollupOption: "ZeroAndSingleDimensionRollup", + EnableEMFVersion1: true, RetainInitialValueOfDeltaMetric: false, OutputDestination: "cloudwatch", logger: zap.NewNop(), @@ -54,11 +57,24 @@ func createDefaultConfig() component.Config { } // createMetricsExporter creates a metrics exporter based on this config. 
-func createMetricsExporter(_ context.Context, - params exporter.CreateSettings, - config component.Config) (exporter.Metrics, error) { - +func createMetricsExporter(ctx context.Context, params exporter.CreateSettings, config component.Config) (exporter.Metrics, error) { expCfg := config.(*Config) - return newEmfExporter(expCfg, params) + emfExp, err := newEmfExporter(expCfg, params) + if err != nil { + return nil, err + } + + exporter, err := exporterhelper.NewMetricsExporter( + ctx, + params, + config, + emfExp.pushMetricsData, + exporterhelper.WithShutdown(emfExp.shutdown), + ) + if err != nil { + return nil, err + } + + return resourcetotelemetry.WrapMetricsExporter(expCfg.ResourceToTelemetrySettings, exporter), nil } diff --git a/exporter/awsemfexporter/metric_declaration.go b/exporter/awsemfexporter/metric_declaration.go index af3343e48173..76ec8bdfd27b 100644 --- a/exporter/awsemfexporter/metric_declaration.go +++ b/exporter/awsemfexporter/metric_declaration.go @@ -186,8 +186,8 @@ func (lm *LabelMatcher) init() (err error) { if len(lm.Separator) == 0 { lm.Separator = ";" } - lm.compiledRegex = regexp.MustCompile(lm.Regex) - return + lm.compiledRegex, err = regexp.Compile(lm.Regex) + return err } // Matches returns true if given set of labels matches the LabelMatcher's rules. diff --git a/exporter/awsemfexporter/metric_translator.go b/exporter/awsemfexporter/metric_translator.go index af705d607add..6d658d7de3e6 100644 --- a/exporter/awsemfexporter/metric_translator.go +++ b/exporter/awsemfexporter/metric_translator.go @@ -338,7 +338,6 @@ func groupedMetricToCWMeasurementsWithFilters(groupedMetric *groupedMetric, conf // translateCWMetricToEMF converts CloudWatch Metric format to EMF. 
func translateCWMetricToEMF(cWMetric *cWMetrics, config *Config) *cwlogs.Event { // convert CWMetric into map format for compatible with PLE input - cWMetricMap := make(map[string]interface{}) fieldMap := cWMetric.fields // restore the json objects that are stored as string in attributes @@ -369,12 +368,48 @@ func translateCWMetricToEMF(cWMetric *cWMetrics, config *Config) *cwlogs.Event { } } - // Create `_aws` section only if there are measurements + // Create EMF metrics if there are measurements + // https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Specification.html#CloudWatch_Embedded_Metric_Format_Specification_structure if len(cWMetric.measurements) > 0 { - // Create `_aws` section only if there are measurements - cWMetricMap["CloudWatchMetrics"] = cWMetric.measurements - cWMetricMap["Timestamp"] = cWMetric.timestampMs - fieldMap["_aws"] = cWMetricMap + if config.EnableEMFVersion1 { + /* EMF V1 + "Version": 1, + "_aws": { + "CloudWatchMetrics": [ + { + "Namespace": "ECS", + "Dimensions": [ ["ClusterName"] ], + "Metrics": [{"Name": "memcached_commands_total"}] + } + ], + "Timestamp": 1668387032641 + } + */ + fieldMap["Version"] = 1 + fieldMap["_aws"] = map[string]interface{}{ + "CloudWatchMetrics": cWMetric.measurements, + "Timestamp": cWMetric.timestampMs, + } + + } else { + /* EMF V0 + { + "Version": 0, + "CloudWatchMetrics": [ + { + "Namespace": "ECS", + "Dimensions": [ ["ClusterName"] ], + "Metrics": [{"Name": "memcached_commands_total"}] + } + ], + "Timestamp": 1668387032641 + } + */ + fieldMap["Version"] = 0 + fieldMap["Timestamp"] = cWMetric.timestampMs + fieldMap["CloudWatchMetrics"] = cWMetric.measurements + } + } pleMsg, err := json.Marshal(fieldMap) diff --git a/exporter/awsemfexporter/metric_translator_test.go b/exporter/awsemfexporter/metric_translator_test.go index 2d47f1a0240b..89e50b12b609 100644 --- a/exporter/awsemfexporter/metric_translator_test.go +++ 
b/exporter/awsemfexporter/metric_translator_test.go @@ -576,37 +576,72 @@ func TestTranslateOtToGroupedMetric(t *testing.T) { } func TestTranslateCWMetricToEMF(t *testing.T) { - cwMeasurement := cWMeasurement{ - Namespace: "test-emf", - Dimensions: [][]string{{oTellibDimensionKey}, {oTellibDimensionKey, "spanName"}}, - Metrics: []map[string]string{{ - "Name": "spanCounter", - "Unit": "Count", - }}, + testCases := map[string]struct { + enableEMFV1 bool + measurements []cWMeasurement + expectedEMFLogEvent string + }{ + "WithMeasurementAndEMFV1": { + enableEMFV1: true, + measurements: []cWMeasurement{{ + Namespace: "test-emf", + Dimensions: [][]string{{oTellibDimensionKey}, {oTellibDimensionKey, "spanName"}}, + Metrics: []map[string]string{{ + "Name": "spanCounter", + "Unit": "Count", + }}, + }}, + expectedEMFLogEvent: "{\"OTelLib\":\"cloudwatch-otel\",\"Sources\":[\"cadvisor\",\"pod\",\"calculated\"],\"Version\":1,\"_aws\":{\"CloudWatchMetrics\":[{\"Namespace\":\"test-emf\",\"Dimensions\":[[\"OTelLib\"],[\"OTelLib\",\"spanName\"]],\"Metrics\":[{\"Name\":\"spanCounter\",\"Unit\":\"Count\"}]}],\"Timestamp\":1596151098037},\"kubernetes\":{\"container_name\":\"cloudwatch-agent\",\"docker\":{\"container_id\":\"fc1b0a4c3faaa1808e187486a3a90cbea883dccaf2e2c46d4069d663b032a1ca\"},\"host\":\"ip-192-168-58-245.ec2.internal\",\"labels\":{\"controller-revision-hash\":\"5bdbf497dc\",\"name\":\"cloudwatch-agent\",\"pod-template-generation\":\"1\"},\"namespace_name\":\"amazon-cloudwatch\",\"pod_id\":\"e23f3413-af2e-4a98-89e0-5df2251e7f05\",\"pod_name\":\"cloudwatch-agent-26bl6\",\"pod_owners\":[{\"owner_kind\":\"DaemonSet\",\"owner_name\":\"cloudwatch-agent\"}]},\"spanCounter\":0,\"spanName\":\"test\"}", + }, + "WithMeasurementAndEMFV0": { + enableEMFV1: false, + measurements: []cWMeasurement{{ + Namespace: "test-emf", + Dimensions: [][]string{{oTellibDimensionKey}, {oTellibDimensionKey, "spanName"}}, + Metrics: []map[string]string{{ + "Name": "spanCounter", + "Unit": "Count", + 
}}, + }}, + expectedEMFLogEvent: "{\"CloudWatchMetrics\":[{\"Namespace\":\"test-emf\",\"Dimensions\":[[\"OTelLib\"],[\"OTelLib\",\"spanName\"]],\"Metrics\":[{\"Name\":\"spanCounter\",\"Unit\":\"Count\"}]}],\"OTelLib\":\"cloudwatch-otel\",\"Sources\":[\"cadvisor\",\"pod\",\"calculated\"],\"Timestamp\":1596151098037,\"Version\":0,\"kubernetes\":{\"container_name\":\"cloudwatch-agent\",\"docker\":{\"container_id\":\"fc1b0a4c3faaa1808e187486a3a90cbea883dccaf2e2c46d4069d663b032a1ca\"},\"host\":\"ip-192-168-58-245.ec2.internal\",\"labels\":{\"controller-revision-hash\":\"5bdbf497dc\",\"name\":\"cloudwatch-agent\",\"pod-template-generation\":\"1\"},\"namespace_name\":\"amazon-cloudwatch\",\"pod_id\":\"e23f3413-af2e-4a98-89e0-5df2251e7f05\",\"pod_name\":\"cloudwatch-agent-26bl6\",\"pod_owners\":[{\"owner_kind\":\"DaemonSet\",\"owner_name\":\"cloudwatch-agent\"}]},\"spanCounter\":0,\"spanName\":\"test\"}", + }, + "WithNoMeasurement": { + enableEMFV1: true, + measurements: nil, + expectedEMFLogEvent: "{\"OTelLib\":\"cloudwatch-otel\",\"Sources\":[\"cadvisor\",\"pod\",\"calculated\"],\"kubernetes\":{\"container_name\":\"cloudwatch-agent\",\"docker\":{\"container_id\":\"fc1b0a4c3faaa1808e187486a3a90cbea883dccaf2e2c46d4069d663b032a1ca\"},\"host\":\"ip-192-168-58-245.ec2.internal\",\"labels\":{\"controller-revision-hash\":\"5bdbf497dc\",\"name\":\"cloudwatch-agent\",\"pod-template-generation\":\"1\"},\"namespace_name\":\"amazon-cloudwatch\",\"pod_id\":\"e23f3413-af2e-4a98-89e0-5df2251e7f05\",\"pod_name\":\"cloudwatch-agent-26bl6\",\"pod_owners\":[{\"owner_kind\":\"DaemonSet\",\"owner_name\":\"cloudwatch-agent\"}]},\"spanCounter\":0,\"spanName\":\"test\"}", + }, } - timestamp := int64(1596151098037) - fields := make(map[string]interface{}) - fields[oTellibDimensionKey] = "cloudwatch-otel" - fields["spanName"] = "test" - fields["spanCounter"] = 0 - // add stringified json as attribute values - fields["kubernetes"] = 
"{\"container_name\":\"cloudwatch-agent\",\"docker\":{\"container_id\":\"fc1b0a4c3faaa1808e187486a3a90cbea883dccaf2e2c46d4069d663b032a1ca\"},\"host\":\"ip-192-168-58-245.ec2.internal\",\"labels\":{\"controller-revision-hash\":\"5bdbf497dc\",\"name\":\"cloudwatch-agent\",\"pod-template-generation\":\"1\"},\"namespace_name\":\"amazon-cloudwatch\",\"pod_id\":\"e23f3413-af2e-4a98-89e0-5df2251e7f05\",\"pod_name\":\"cloudwatch-agent-26bl6\",\"pod_owners\":[{\"owner_kind\":\"DaemonSet\",\"owner_name\":\"cloudwatch-agent\"}]}" - fields["Sources"] = "[\"cadvisor\",\"pod\",\"calculated\"]" - config := &Config{ - // include valid json string, a non-existing key, and keys whose value are not json/string - ParseJSONEncodedAttributeValues: []string{"kubernetes", "Sources", "NonExistingAttributeKey", "spanName", "spanCounter"}, - logger: zap.NewNop(), - } + for name, tc := range testCases { + t.Run(name, func(_ *testing.T) { + config := &Config{ - met := &cWMetrics{ - timestampMs: timestamp, - fields: fields, - measurements: []cWMeasurement{cwMeasurement}, + // include valid json string, a non-existing key, and keys whose value are not json/string + ParseJSONEncodedAttributeValues: []string{"kubernetes", "Sources", "NonExistingAttributeKey", "spanName", "spanCounter"}, + EnableEMFVersion1: tc.enableEMFV1, + logger: zap.NewNop(), + } + + fields := map[string]interface{}{ + oTellibDimensionKey: "cloudwatch-otel", + "spanName": "test", + "spanCounter": 0, + "kubernetes": 
"{\"container_name\":\"cloudwatch-agent\",\"docker\":{\"container_id\":\"fc1b0a4c3faaa1808e187486a3a90cbea883dccaf2e2c46d4069d663b032a1ca\"},\"host\":\"ip-192-168-58-245.ec2.internal\",\"labels\":{\"controller-revision-hash\":\"5bdbf497dc\",\"name\":\"cloudwatch-agent\",\"pod-template-generation\":\"1\"},\"namespace_name\":\"amazon-cloudwatch\",\"pod_id\":\"e23f3413-af2e-4a98-89e0-5df2251e7f05\",\"pod_name\":\"cloudwatch-agent-26bl6\",\"pod_owners\":[{\"owner_kind\":\"DaemonSet\",\"owner_name\":\"cloudwatch-agent\"}]}", + "Sources": "[\"cadvisor\",\"pod\",\"calculated\"]", + } + + cloudwatchMetric := &cWMetrics{ + timestampMs: int64(1596151098037), + fields: fields, + measurements: tc.measurements, + } + + emfLogEvent := translateCWMetricToEMF(cloudwatchMetric, config) + + assert.Equal(t, tc.expectedEMFLogEvent, *emfLogEvent.InputLogEvent.Message) + }) } - inputLogEvent := translateCWMetricToEMF(met, config) - assert.Equal(t, readFromFile("testdata/testTranslateCWMetricToEMF.json"), *inputLogEvent.InputLogEvent.Message, "Expect to be equal") } func TestTranslateGroupedMetricToCWMetric(t *testing.T) { @@ -2114,24 +2149,6 @@ func TestGroupedMetricToCWMeasurementsWithFilters(t *testing.T) { } } -func TestTranslateCWMetricToEMFNoMeasurements(t *testing.T) { - timestamp := int64(1596151098037) - fields := make(map[string]interface{}) - fields[oTellibDimensionKey] = "cloudwatch-otel" - fields["spanName"] = "test" - fields["spanCounter"] = 0 - - met := &cWMetrics{ - timestampMs: timestamp, - fields: fields, - measurements: nil, - } - inputLogEvent := translateCWMetricToEMF(met, &Config{}) - expected := "{\"OTelLib\":\"cloudwatch-otel\",\"spanCounter\":0,\"spanName\":\"test\"}" - - assert.Equal(t, expected, *inputLogEvent.InputLogEvent.Message) -} - func BenchmarkTranslateOtToGroupedMetricWithInstrLibrary(b *testing.B) { oc := createMetricTestData() rm := internaldata.OCToMetrics(oc.Node, oc.Resource, oc.Metrics).ResourceMetrics().At(0) diff --git 
a/exporter/awsemfexporter/testdata/testTranslateCWMetricToEMF.json b/exporter/awsemfexporter/testdata/testTranslateCWMetricToEMF.json deleted file mode 100644 index c3f4fb9e32fb..000000000000 --- a/exporter/awsemfexporter/testdata/testTranslateCWMetricToEMF.json +++ /dev/null @@ -1 +0,0 @@ -{"OTelLib":"cloudwatch-otel","Sources":["cadvisor","pod","calculated"],"_aws":{"CloudWatchMetrics":[{"Namespace":"test-emf","Dimensions":[["OTelLib"],["OTelLib","spanName"]],"Metrics":[{"Name":"spanCounter","Unit":"Count"}]}],"Timestamp":1596151098037},"kubernetes":{"container_name":"cloudwatch-agent","docker":{"container_id":"fc1b0a4c3faaa1808e187486a3a90cbea883dccaf2e2c46d4069d663b032a1ca"},"host":"ip-192-168-58-245.ec2.internal","labels":{"controller-revision-hash":"5bdbf497dc","name":"cloudwatch-agent","pod-template-generation":"1"},"namespace_name":"amazon-cloudwatch","pod_id":"e23f3413-af2e-4a98-89e0-5df2251e7f05","pod_name":"cloudwatch-agent-26bl6","pod_owners":[{"owner_kind":"DaemonSet","owner_name":"cloudwatch-agent"}]},"spanCounter":0,"spanName":"test"} \ No newline at end of file diff --git a/pkg/stanza/operator/transformer/flatten/config_test.go b/pkg/stanza/operator/transformer/flatten/config_test.go index 45ff14632486..f9d5194ebc9d 100644 --- a/pkg/stanza/operator/transformer/flatten/config_test.go +++ b/pkg/stanza/operator/transformer/flatten/config_test.go @@ -28,37 +28,58 @@ func TestUnmarshal(t *testing.T) { TestsFile: filepath.Join(".", "testdata", "config.yaml"), Tests: []operatortest.ConfigUnmarshalTest{ { - Name: "flatten_one_level", + Name: "flatten_body_one_level", Expect: func() *Config { cfg := NewConfig() - cfg.Field = entry.BodyField{ - Keys: []string{"nested"}, - } + cfg.Field = entry.NewBodyField("nested") return cfg }(), ExpectErr: false, }, { - Name: "flatten_second_level", + Name: "flatten_body_second_level", Expect: func() *Config { cfg := NewConfig() - cfg.Field = entry.BodyField{ - Keys: []string{"nested", "secondlevel"}, - } + cfg.Field = 
entry.NewBodyField("nested", "secondlevel") return cfg }(), ExpectErr: false, }, { - Name: "flatten_attributes", + Name: "flatten_resource_one_level", Expect: func() *Config { cfg := NewConfig() - cfg.Field = entry.BodyField{ - Keys: []string{"attributes", "errField"}, - } + cfg.Field = entry.NewResourceField("nested") return cfg }(), - ExpectErr: true, + ExpectErr: false, + }, + { + Name: "flatten_resource_second_level", + Expect: func() *Config { + cfg := NewConfig() + cfg.Field = entry.NewResourceField("nested", "secondlevel") + return cfg + }(), + ExpectErr: false, + }, + { + Name: "flatten_attributes_one_level", + Expect: func() *Config { + cfg := NewConfig() + cfg.Field = entry.NewAttributeField("nested") + return cfg + }(), + ExpectErr: false, + }, + { + Name: "flatten_attributes_second_level", + Expect: func() *Config { + cfg := NewConfig() + cfg.Field = entry.NewAttributeField("nested", "secondlevel") + return cfg + }(), + ExpectErr: false, }, }, }.Run(t) diff --git a/pkg/stanza/operator/transformer/flatten/flatten.go b/pkg/stanza/operator/transformer/flatten/flatten.go index 749845fdc6f8..12b820c1c9ee 100644 --- a/pkg/stanza/operator/transformer/flatten/flatten.go +++ b/pkg/stanza/operator/transformer/flatten/flatten.go @@ -17,7 +17,6 @@ package flatten // import "github.com/open-telemetry/opentelemetry-collector-con import ( "context" "fmt" - "strings" "go.uber.org/zap" @@ -48,7 +47,7 @@ func NewConfigWithID(operatorID string) *Config { // Config is the configuration of a flatten operator type Config struct { helper.TransformerConfig `mapstructure:",squash"` - Field entry.BodyField `mapstructure:"field"` + Field entry.Field `mapstructure:"field"` } // Build will build a Flatten operator from the supplied configuration @@ -58,34 +57,53 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) { return nil, err } - if strings.Contains(c.Field.String(), "attributes") || strings.Contains(c.Field.String(), "resource") { - return nil, 
fmt.Errorf("flatten: field cannot be a resource or attribute") + if e, ok := c.Field.FieldInterface.(entry.BodyField); ok { + return &Transformer[entry.BodyField]{ + TransformerOperator: transformerOperator, + Field: e, + }, nil } - return &Transformer{ - TransformerOperator: transformerOperator, - Field: c.Field, - }, nil + if e, ok := c.Field.FieldInterface.(entry.ResourceField); ok { + return &Transformer[entry.ResourceField]{ + TransformerOperator: transformerOperator, + Field: e, + }, nil + } + + if e, ok := c.Field.FieldInterface.(entry.AttributeField); ok { + return &Transformer[entry.AttributeField]{ + TransformerOperator: transformerOperator, + Field: e, + }, nil + } + + return nil, fmt.Errorf("invalid field type: %T", c.Field.FieldInterface) } -// Transformer flattens an object in the body field -type Transformer struct { +// Transformer flattens an object in the entry field +type Transformer[T interface { + entry.BodyField | entry.ResourceField | entry.AttributeField + entry.FieldInterface + Parent() T + Child(string) T +}] struct { helper.TransformerOperator - Field entry.BodyField + Field T } // Process will process an entry with a flatten transformation. 
-func (p *Transformer) Process(ctx context.Context, entry *entry.Entry) error { +func (p *Transformer[T]) Process(ctx context.Context, entry *entry.Entry) error { return p.ProcessWith(ctx, entry, p.Transform) } // Transform will apply the flatten operation to an entry -func (p *Transformer) Transform(entry *entry.Entry) error { +func (p *Transformer[T]) Transform(entry *entry.Entry) error { parent := p.Field.Parent() val, ok := entry.Delete(p.Field) if !ok { // The field doesn't exist, so ignore it - return fmt.Errorf("apply flatten: field %s does not exist on body", p.Field) + return fmt.Errorf("apply flatten: field %s does not exist on entry", p.Field) } valMap, ok := val.(map[string]interface{}) diff --git a/pkg/stanza/operator/transformer/flatten/flatten_test.go b/pkg/stanza/operator/transformer/flatten/flatten_test.go index 8d0211aef7ae..bf748ef6e37e 100644 --- a/pkg/stanza/operator/transformer/flatten/flatten_test.go +++ b/pkg/stanza/operator/transformer/flatten/flatten_test.go @@ -46,6 +46,18 @@ func TestBuildAndProcess(t *testing.T) { "nestedkey": "nestedval", }, } + e.Resource = map[string]interface{}{ + "key": "val", + "nested": map[string]interface{}{ + "nestedkey": "nestedval", + }, + } + e.Attributes = map[string]interface{}{ + "key": "val", + "nested": map[string]interface{}{ + "nestedkey": "nestedval", + }, + } return e } cases := []testCase{ @@ -54,9 +66,7 @@ func TestBuildAndProcess(t *testing.T) { false, func() *Config { cfg := NewConfig() - cfg.Field = entry.BodyField{ - Keys: []string{"nested"}, - } + cfg.Field = entry.NewBodyField("nested") return cfg }(), newTestEntry, @@ -74,9 +84,7 @@ func TestBuildAndProcess(t *testing.T) { false, func() *Config { cfg := NewConfig() - cfg.Field = entry.BodyField{ - Keys: []string{"nested"}, - } + cfg.Field = entry.NewBodyField("nested") return cfg }(), func() *entry.Entry { @@ -109,9 +117,7 @@ func TestBuildAndProcess(t *testing.T) { false, func() *Config { cfg := NewConfig() - cfg.Field = entry.BodyField{ 
- Keys: []string{"nested", "secondlevel"}, - } + cfg.Field = entry.NewBodyField("nested", "secondlevel") return cfg }(), func() *entry.Entry { @@ -142,9 +148,7 @@ func TestBuildAndProcess(t *testing.T) { false, func() *Config { cfg := NewConfig() - cfg.Field = entry.BodyField{ - Keys: []string{"nested", "secondlevel"}, - } + cfg.Field = entry.NewBodyField("nested", "secondlevel") return cfg }(), func() *entry.Entry { @@ -181,9 +185,7 @@ func TestBuildAndProcess(t *testing.T) { false, func() *Config { cfg := NewConfig() - cfg.Field = entry.BodyField{ - Keys: []string{"nested"}, - } + cfg.Field = entry.NewBodyField("nested") return cfg }(), func() *entry.Entry { @@ -214,9 +216,7 @@ func TestBuildAndProcess(t *testing.T) { false, func() *Config { cfg := NewConfig() - cfg.Field = entry.BodyField{ - Keys: []string{"nested"}, - } + cfg.Field = entry.NewBodyField("nested") return cfg }(), func() *entry.Entry { @@ -242,9 +242,7 @@ func TestBuildAndProcess(t *testing.T) { true, func() *Config { cfg := NewConfig() - cfg.Field = entry.BodyField{ - Keys: []string{"invalid"}, - } + cfg.Field = entry.NewBodyField("invalid") return cfg }(), newTestEntry, @@ -252,16 +250,77 @@ func TestBuildAndProcess(t *testing.T) { }, { "flatten_resource", - true, + false, func() *Config { cfg := NewConfig() - cfg.Field = entry.BodyField{ - Keys: []string{"resource", "invalid"}, + cfg.Field = entry.NewResourceField("nested", "secondlevel") + return cfg + }(), + func() *entry.Entry { + e := newTestEntry() + e.Resource = map[string]interface{}{ + "key": "val", + "nested": map[string]interface{}{ + "secondlevel": map[string]interface{}{ + "nestedkey1": "nestedval", + "nestedkey2": "nestedval", + "nestedkey3": "nestedval", + "nestedkey4": "nestedval", + }, + }, } + return e + }, + func() *entry.Entry { + e := newTestEntry() + e.Resource = map[string]interface{}{ + "key": "val", + "nested": map[string]interface{}{ + "nestedkey1": "nestedval", + "nestedkey2": "nestedval", + "nestedkey3": "nestedval", 
+ "nestedkey4": "nestedval", + }, + } + return e + }, + }, + { + "flatten_attributes", + false, + func() *Config { + cfg := NewConfig() + cfg.Field = entry.NewAttributeField("nested", "secondlevel") return cfg }(), - newTestEntry, - nil, + func() *entry.Entry { + e := newTestEntry() + e.Attributes = map[string]interface{}{ + "key": "val", + "nested": map[string]interface{}{ + "secondlevel": map[string]interface{}{ + "nestedkey1": "nestedval", + "nestedkey2": "nestedval", + "nestedkey3": "nestedval", + "nestedkey4": "nestedval", + }, + }, + } + return e + }, + func() *entry.Entry { + e := newTestEntry() + e.Attributes = map[string]interface{}{ + "key": "val", + "nested": map[string]interface{}{ + "nestedkey1": "nestedval", + "nestedkey2": "nestedval", + "nestedkey3": "nestedval", + "nestedkey4": "nestedval", + }, + } + return e + }, }, } @@ -278,9 +337,11 @@ func TestBuildAndProcess(t *testing.T) { } require.NoError(t, err) - flatten := op.(*Transformer) + flatten := op.(interface { + Process(ctx context.Context, entry *entry.Entry) error + }) fake := testutil.NewFakeOutput(t) - require.NoError(t, flatten.SetOutputs([]operator.Operator{fake})) + require.NoError(t, op.SetOutputs([]operator.Operator{fake})) val := tc.input() err = flatten.Process(context.Background(), val) diff --git a/pkg/stanza/operator/transformer/flatten/testdata/config.yaml b/pkg/stanza/operator/transformer/flatten/testdata/config.yaml index 169014b47262..3f474e92e39c 100644 --- a/pkg/stanza/operator/transformer/flatten/testdata/config.yaml +++ b/pkg/stanza/operator/transformer/flatten/testdata/config.yaml @@ -1,9 +1,18 @@ -flatten_attributes: - type: flatten - field: attributes.errField -flatten_one_level: +flatten_body_one_level: type: flatten field: body.nested -flatten_second_level: +flatten_body_second_level: type: flatten field: body.nested.secondlevel +flatten_resource_one_level: + type: flatten + field: resource.nested +flatten_resource_second_level: + type: flatten + field: 
resource.nested.secondlevel +flatten_attributes_one_level: + type: flatten + field: attributes.nested +flatten_attributes_second_level: + type: flatten + field: attributes.nested.secondlevel