From b6a351d6ffd00b04aee2ddcb09e8ed17480c1d8e Mon Sep 17 00:00:00 2001 From: Dmitry Anoshin Date: Sun, 7 Apr 2024 21:09:18 -0700 Subject: [PATCH 1/2] [discovery] Apply entity events schema to the discovery receiver logs This change updates the schema of the logs emitted by the discovery receiver to represent entity events. No information is dropped, but it was reshuffled to adapt to the new schema: - Entity ID is made of one key/value pair moved from the `discovery.endpoint.id` resource attribute - All other resource attributes are moved to the entity state event attributes - Log body was moved to the `discovery.message` entity state attribute The Discovery config provider is updated to use the entity events. The keys used in the entity ID and entity attributes are not finalized and are expected to be changed in the following PRs. --- CHANGELOG.md | 1 + go.mod | 2 +- internal/common/discovery/discovery.go | 6 + .../confmapprovider/discovery/discoverer.go | 54 ++++-- internal/receiver/discoveryreceiver/README.md | 164 ++++++++---------- .../discoveryreceiver/metric_evaluator.go | 38 ++-- .../metric_evaluator_test.go | 32 ++-- .../discoveryreceiver/statement_evaluator.go | 51 +++--- .../statement_evaluator_test.go | 59 ++++--- .../statussources/statements.go | 25 +-- .../statussources/statements_test.go | 39 +---- ...ost_observer_simple_prometheus_config.yaml | 22 +-- ...t_observer_simple_prometheus_statuses.yaml | 79 +++++---- 13 files changed, 271 insertions(+), 301 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 13b50d1aa6..8f24fa24bf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ - Remove `severity_text` field from log evaluation statements ([#4583](https://github.com/signalfx/splunk-otel-collector/pull/4583)) - Remove `first_only` field from match struct. 
Events are always emitted only once for first matching metric or log statement ([#4593](https://github.com/signalfx/splunk-otel-collector/pull/4593)) - Combine matching conditions with different statuses in one list ([#4588](https://github.com/signalfx/splunk-otel-collector/pull/4588)) + - Apply entity events schema to the logs emitted by the receiver ([#4638](https://github.com/signalfx/splunk-otel-collector/pull/4638)) ## v0.97.0 diff --git a/go.mod b/go.mod index 21a5ea7cc0..e7300d790f 100644 --- a/go.mod +++ b/go.mod @@ -530,7 +530,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.97.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk v0.97.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchperresourceattr v0.97.0 // indirect - github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata v0.97.0 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata v0.97.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/signalfx v0.97.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.97.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/winperfcounters v0.97.0 // indirect diff --git a/internal/common/discovery/discovery.go b/internal/common/discovery/discovery.go index ccaf80c622..bf805f1fc4 100644 --- a/internal/common/discovery/discovery.go +++ b/internal/common/discovery/discovery.go @@ -27,6 +27,12 @@ const ( ReceiverNameAttr = "discovery.receiver.name" ReceiverTypeAttr = "discovery.receiver.type" StatusAttr = "discovery.status" + MessageAttr = "discovery.message" + + OtelEntityAttributesAttr = "otel.entity.attributes" + OtelEntityIDAttr = "otel.entity.id" + OtelEntityEventTypeAttr = "otel.entity.event.type" + OtelEntityEventTypeState = "entity_state" DiscoExtensionsKey = 
"extensions/splunk.discovery" DiscoReceiversKey = "receivers/splunk.discovery" diff --git a/internal/confmapprovider/discovery/discoverer.go b/internal/confmapprovider/discovery/discoverer.go index 1f7b93f634..8d3899e7e6 100644 --- a/internal/confmapprovider/discovery/discoverer.go +++ b/internal/confmapprovider/discovery/discoverer.go @@ -641,8 +641,8 @@ func (d *discoverer) Capabilities() consumer.Capabilities { return consumer.Capabilities{} } -// ConsumeLogs will walk through all discovery receiver-emitted logs and store all receiver and observer statuses, -// including reported receiver configs from their discovery.receiver.config attribute. It is a consumer.Logs method. +// ConsumeLogs will walk through all discovery receiver-emitted entity events and store all receiver and observer +// statuses, including reported receiver configs from their discovery.receiver.config attribute. It is a consumer.Logs method. func (d *discoverer) ConsumeLogs(_ context.Context, ld plog.Logs) error { if ld.LogRecordCount() == 0 { return nil @@ -660,12 +660,26 @@ func (d *discoverer) ConsumeLogs(_ context.Context, ld plog.Logs) error { observerID component.ID err error ) - rlog := rlogs.At(i) - rAttrs := rlog.Resource().Attributes() - if rName, ok := rAttrs.Get(discovery.ReceiverNameAttr); ok { + + // We assume that every resource log has a single log record as per the current implementation of the discovery receiver. + lr := rlogs.At(i).ScopeLogs().At(0).LogRecords().At(0) + + // Only interested in entity events that are of type entity_state. 
+ if entityEventType, ok := lr.Attributes().Get(discovery.OtelEntityEventTypeAttr); !ok || entityEventType.Str() != discovery.OtelEntityEventTypeState { + continue + } + + oea, ok := lr.Attributes().Get(discovery.OtelEntityAttributesAttr) + if !ok { + d.logger.Debug("invalid entity event without attributes", zap.Any("log record", lr)) + continue + } + entityAttrs := oea.Map() + + if rName, hasName := entityAttrs.Get(discovery.ReceiverNameAttr); hasName { receiverName = rName.Str() } - rType, ok := rAttrs.Get(discovery.ReceiverTypeAttr) + rType, ok := entityAttrs.Get(discovery.ReceiverTypeAttr) if !ok { // nothing we can do without this one continue @@ -675,10 +689,11 @@ func (d *discoverer) ConsumeLogs(_ context.Context, ld plog.Logs) error { d.logger.Debug("invalid receiver type", zap.Error(err)) continue } - if rConfig, ok := rAttrs.Get(discovery.ReceiverConfigAttr); ok { + if rConfig, hasConfig := entityAttrs.Get(discovery.ReceiverConfigAttr); hasConfig { receiverConfig = rConfig.Str() } - if rObsID, ok := rAttrs.Get(discovery.ObserverIDAttr); ok { + + if rObsID, hasObsID := entityAttrs.Get(discovery.ObserverIDAttr); hasObsID { obsID = rObsID.Str() } @@ -693,8 +708,14 @@ func (d *discoverer) ConsumeLogs(_ context.Context, ld plog.Logs) error { } } + entityIDAttr, ok := lr.Attributes().Get(discovery.OtelEntityIDAttr) + if !ok { + d.logger.Debug("invalid entity event without id", zap.Any("log record", lr)) + continue + } + endpointID := "unavailable" - if eid, k := rAttrs.Get(discovery.EndpointIDAttr); k { + if eid, k := entityIDAttr.Map().Get(discovery.EndpointIDAttr); k { endpointID = eid.AsString() } @@ -733,21 +754,24 @@ func (d *discoverer) ConsumeLogs(_ context.Context, ld plog.Logs) error { currentReceiverStatus := d.discoveredReceivers[receiverID] currentObserverStatus := d.discoveredObservers[observerID] - // We assume that every resource log has a single log record as per the current implementation of the discovery receiver. 
- lr := rlog.ScopeLogs().At(0).LogRecords().At(0) if currentReceiverStatus != discovery.Successful || currentObserverStatus != discovery.Successful { - if rStatusAttr, ok := lr.Attributes().Get(discovery.StatusAttr); ok { + if rStatusAttr, ok := entityAttrs.Get(discovery.StatusAttr); ok { rStatus := discovery.StatusType(rStatusAttr.Str()) if valid, e := discovery.IsValidStatus(rStatus); !valid { - d.logger.Debug("invalid status from log record", zap.Error(e), zap.Any("lr", lr.Body().AsRaw())) + d.logger.Debug("invalid status from log record", zap.Error(e), zap.Any("lr", lr)) continue } + var msg string + msgAttr, hasMsg := entityAttrs.Get(discovery.MessageAttr) + if hasMsg { + msg = msgAttr.AsString() + } receiverStatus := determineCurrentStatus(currentReceiverStatus, rStatus) switch receiverStatus { case discovery.Failed: - d.logger.Info(fmt.Sprintf("failed to discover %q using %q endpoint %q: %s", receiverID, observerID, endpointID, lr.Body().AsString())) + d.logger.Info(fmt.Sprintf("failed to discover %q using %q endpoint %q: %s", receiverID, observerID, endpointID, msg)) case discovery.Partial: - fmt.Fprintf(os.Stderr, "Partially discovered %q using %q endpoint %q: %s\n", receiverID, observerID, endpointID, lr.Body().AsString()) + fmt.Fprintf(os.Stderr, "Partially discovered %q using %q endpoint %q: %s\n", receiverID, observerID, endpointID, msg) case discovery.Successful: fmt.Fprintf(os.Stderr, "Successfully discovered %q using %q endpoint %q.\n", receiverID, observerID, endpointID) } diff --git a/internal/receiver/discoveryreceiver/README.md b/internal/receiver/discoveryreceiver/README.md index 2879589398..8484bff6ec 100644 --- a/internal/receiver/discoveryreceiver/README.md +++ b/internal/receiver/discoveryreceiver/README.md @@ -121,57 +121,61 @@ Flags: 0 ## Example Usage The following Collector configuration will create a Discovery receiver instance that receives -endpoints from a [Docker 
Observer](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/extension/observer/dockerobserver/README.md) -that reports log records denoting the status of a [Smart Agent receiver](https://github.com/signalfx/splunk-otel-collector/blob/main/pkg/receiver/smartagentreceiver/README.md) -configured to use the `collectd/redis` monitor. The `status` mapping comprises entries that signal the Smart Agent -receiver has been instantiated with a `successful`, `partial`, or `failed` status, based on reported `metrics` or -recorded application log `statements`. +endpoints from a [Kubernetes Observer](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/extension/observer/k8sobserver/README.md) +that reports log records denoting the status of a [MySQL receiver](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/receiver/mysqlreceiver/README.md). +The `status` mapping comprises entries that signal the receiver has been instantiated with a `successful`, `partial`, +or `failed` status, based on reported `metrics` or recorded application log `statements`. -The Redis receiver warrants the following log record with `discovery.status` from the Discovery receiver: +The following rules are defined for the `mysql` receiver: -* `successful` if it emits any metrics matching the `regexp: .*` pattern, denoting that metric gathering and the -Receiver are functional. -* `partial` if it internally logs a statement matching the `regexp: (WRONGPASS|NOAUTH|ERR AUTH)` pattern, suggesting -there is a Redis server but it's receiving incorrect credentials. -* `failed` if it internally logs a statement matching the `regexp: ConnectionRefusedError` pattern, suggesting that no -Redis server is available at the endpoint. +* `successful` if it emits any `mysql.locks` metrics, denoting that metric gathering and the Receiver are functional. 
+* `partial` if it internally logs a statement matching the `Access denied for user` pattern, suggesting +there is a MySQL server but it's receiving incorrect credentials. +* `failed` if it internally logs a statement matching the `Can't connect to MySQL server on .* [(]111[)]` pattern, +suggesting that no MySQL server is available at the endpoint. ```yaml extensions: - docker_observer: + k8s_observer: + auth_type: serviceAccount + node: ${K8S_NODE_NAME} receivers: discovery: - watch_observers: [docker_observer] + watch_observers: [k8s_observer] receivers: - smartagent/redis: - // Determine the functionality of the smartagent/redis receiver for any detected container - rule: type == "container" + mysql: + rule: type == "port" and port != 33060 and pod.name matches "(?i)mysql" config: - type: collectd/redis - // the metric or log statement + username: root + password: root status: metrics: - status: successful - regexp: '.*' + strict: mysql.locks log_record: - body: Successfully able to connect to Redis container. + body: Mysql receiver is working! statements: - - status: partial - regexp: (WRONGPASS|NOAUTH|ERR AUTH) - log_record: - body: Container appears to be accepting redis connections but the default auth setting is incorrect. - status: failed - regexp: ConnectionRefusedError + regexp: "Can't connect to MySQL server on .* [(]111[)]" + log_record: + append_pattern: true + body: The container cannot be reached by the Collector. The container is refusing MySQL connections. + - status: partial + regexp: 'Access denied for user' log_record: - body: Container appears to not be accepting redis connections. 
+ append_pattern: true + body: >- + Make sure your user credentials are correctly specified using the + `--set splunk.discovery.receivers.mysql.config.username=""` and + `--set splunk.discovery.receivers.mysql.config.password=""` command or the + `SPLUNK_DISCOVERY_RECEIVERS_mysql_CONFIG_username=""` and + `SPLUNK_DISCOVERY_RECEIVERS_mysql_CONFIG_password=""` environment variables. exporters: debug: verbosity: detailed - sampling_initial: 1 - sampling_thereafter: 1 service: extensions: - - docker_observer + - k8s_observer pipelines: logs: receivers: @@ -180,101 +184,81 @@ service: - logging ``` -Given this configuration, if the Discovery receiver's Docker observer instance reports an active Redis container, and -the Smart Agent receiver's associated `collectd/redis` monitor is able to generate metrics for the container, the -receiver will emit something similar to the following log records: +Given this configuration, if the Discovery receiver's Kubernetes observer instance reports an active MySQL container, and +the `mysql` receiver is able to generate metrics for the container, the receiver will emit the following entity event: ``` -2022-07-27T16:35:03.575Z info LogsExporter {"kind": "exporter", "data_type": "logs", "name": "logging", "#logs": 1} -2022-07-27T16:35:03.575Z info ResourceLog #0 +2024-04-08T06:08:58.204Z info LogsExporter {"kind": "exporter", "data_type": "logs", "name": "debug", "resource logs": 1, "log records": 1} +2024-04-08T06:08:58.204Z info ResourceLog #0 Resource SchemaURL: -Resource labels: - -> container.name: STRING(lucid_goldberg) - -> container.image.name: STRING(redis) - -> discovery.receiver.rule: STRING(type == "container" && image == "redis") - -> discovery.endpoint.id: STRING(2f0d88c1d93b2aafce4a725de370005f9e7e961144551a3df37a88b27ebed48f:6379) - -> discovery.receiver.name: STRING(smartagent/redis) - -> event.type: STRING(metric.match) ScopeLogs #0 ScopeLogs SchemaURL: InstrumentationScope +InstrumentationScope attributes: + -> 
otel.entity.event_as_log: Bool(true) LogRecord #0 -ObservedTimestamp: 2022-07-27 16:35:03.575234252 +0000 UTC -Timestamp: 2022-07-27 16:35:01.522543616 +0000 UTC -Severity: info -Body: Successfully able to connect to Redis container. +ObservedTimestamp: 1970-01-01 00:00:00 +0000 UTC +Timestamp: 2024-04-08 06:08:58.194666193 +0000 UTC +SeverityText: +SeverityNumber: Unspecified(0) +Body: Empty() Attributes: - -> metric.name: STRING(counter.expired_keys) - -> discovery.status: STRING(successful) + -> otel.entity.id: Map({"discovery.endpoint.id":"k8s_observer/05c6a212-730c-4295-8dd6-0c460c892034/mysql(3306)"}) + -> otel.entity.event.type: Str(entity_state) + -> otel.entity.attributes: Map({"discovery.event.type":"metric.match","discovery.message":"Mysql receiver is working!","discovery.observer.id":"k8s_observer","discovery.receiver.rule":"type == \"port\" and port != 33060 and pod.name matches \"(?i)mysql\"","discovery.receiver.type":"mysql","discovery.status":"successful","k8s.namespace.name":"default","k8s.pod.name":"mysql-0","k8s.pod.uid":"05c6a212-730c-4295-8dd6-0c460c892034","metric.name":"mysql.locks","mysql.instance.endpoint":"192.168.161.105:3306"}) Trace ID: Span ID: Flags: 0 ``` -Instead, if the Docker observer reports an active Redis container but the `collectd/redis` authentication information is +Instead, if the Docker observer reports an active MySQL container but the provided authentication information is incorrect, the Discovery receiver will emit something similar to the following log record: ``` -2022-07-27T17:11:27.271Z info LogsExporter {"kind": "exporter", "data_type": "logs", "name": "logging", "#logs": 1} -2022-07-27T17:11:27.271Z info ResourceLog #0 +2024-04-08T06:17:36.991Z info LogsExporter {"kind": "exporter", "data_type": "logs", "name": "debug", "resource logs": 1, "log records": 1} +2024-04-08T06:17:36.992Z info ResourceLog #0 Resource SchemaURL: -Resource labels: - -> event.type: STRING(statement.match) ScopeLogs #0 ScopeLogs SchemaURL: 
InstrumentationScope +InstrumentationScope attributes: + -> otel.entity.event_as_log: Bool(true) LogRecord #0 -ObservedTimestamp: 2022-07-27 17:11:27.27149884 +0000 UTC -Timestamp: 2022-07-27 17:11:27.271444488 +0000 UTC -Severity: warn -Body: Container appears to be accepting redis connections but the default auth is incorrect. +ObservedTimestamp: 1970-01-01 00:00:00 +0000 UTC +Timestamp: 2024-04-08 06:17:36.991618675 +0000 UTC +SeverityText: +SeverityNumber: Unspecified(0) +Body: Empty() Attributes: - -> runnerPID: STRING(119386) - -> monitorID: STRING(smartagentredisreceiver_creatordiscoveryendpoint1721702637958d3f3335daf83f7) - -> monitorType: STRING(collectd/redis) - -> caller: STRING(signalfx/handler.go:189) - -> name: STRING(smartagent/redis/receiver_creator/discovery{endpoint="172.17.0.2:6379"}(58d3f3335daf83f7)) - -> createdTime: STRING(1.658941887269115e+09) - -> lineno: STRING(201) - -> logger: STRING(root) - -> sourcePath: STRING(/usr/lib/splunk-otel-collector/agent-bundle/collectd-python/redis/redis_info.py) - -> level: STRING(error) - -> receiver.name: STRING(smartagent/redis) - -> discovery.status: STRING(partial) + -> otel.entity.id: Map({"discovery.endpoint.id":"k8s_observer/05c6a212-730c-4295-8dd6-0c460c892034/mysql(3306)"}) + -> otel.entity.event.type: Str(entity_state) + -> otel.entity.attributes: Map({"caller":"mysqlreceiver@v0.97.0/scraper.go:82","discovery.event.type":"statement.match","discovery.message":"Make sure your user credentials are correctly specified using the `--set splunk.discovery.receivers.mysql.config.username=\"\u003cusername\u003e\"` and `--set splunk.discovery.receivers.mysql.config.password=\"\u003cpassword\u003e\"` command or the `SPLUNK_DISCOVERY_RECEIVERS_mysql_CONFIG_username=\"\u003cusername\u003e\"` and `SPLUNK_DISCOVERY_RECEIVERS_mysql_CONFIG_password=\"\u003cpassword\u003e\"` environment variables. 
(evaluated \"{\\\"error\\\":\\\"Error 1045 (28000): Access denied for user 'root'@'192.168.174.232' (using password: YES)\\\",\\\"kind\\\":\\\"receiver\\\",\\\"message\\\":\\\"Failed to fetch InnoDB stats\\\"}\")","discovery.observer.id":"k8s_observer","discovery.receiver.name":"","discovery.receiver.rule":"type == \"port\" and port != 33060 and pod.name matches \"(?i)mysql\"","discovery.receiver.type":"mysql","discovery.status":"partial","error":"Error 1045 (28000): Access denied for user 'root'@'192.168.174.232' (using password: YES)","kind":"receiver","name":"mysql//receiver_creator/discovery/logs{endpoint=\"192.168.161.105:3306\"}/k8s_observer/05c6a212-730c-4295-8dd6-0c460c892034/mysql(3306)","stacktrace":"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/mysqlreceiver.(*mySQLScraper).scrape\n\tgithub.com/open-telemetry/opentelemetry-collector-contrib/receiver/mysqlreceiver@v0.97.0/scraper.go:82\ngo.opentelemetry.io/collector/receiver/scraperhelper.ScrapeFunc.Scrape\n\tgo.opentelemetry.io/collector/receiver@v0.97.0/scraperhelper/scraper.go:20\ngo.opentelemetry.io/collector/receiver/scraperhelper.(*controller).scrapeMetricsAndReport\n\tgo.opentelemetry.io/collector/receiver@v0.97.0/scraperhelper/scrapercontroller.go:194\ngo.opentelemetry.io/collector/receiver/scraperhelper.(*controller).startScraping.func1\n\tgo.opentelemetry.io/collector/receiver@v0.97.0/scraperhelper/scrapercontroller.go:169"}) Trace ID: Span ID: Flags: 0 ``` -If the Docker observer reports an unrelated container that isn't running Redis, the following log record would be emitted: +If the Kubernetes observer reports an unrelated container that isn't running MySQL, the following entity event would be emitted: ``` -2022-07-27T17:16:57.718Z info LogsExporter {"kind": "exporter", "data_type": "logs", "name": "logging", "#logs": 1} -2022-07-27T17:16:57.719Z info ResourceLog #0 +2024-04-08T07:06:49.502Z info LogsExporter {"kind": "exporter", "data_type": "logs", "name": "debug", 
"resource logs": 1, "log records": 1} +2024-04-08T07:06:49.502Z info ResourceLog #0 Resource SchemaURL: -Resource labels: - -> event.type: STRING(statement.match) ScopeLogs #0 ScopeLogs SchemaURL: InstrumentationScope +InstrumentationScope attributes: + -> otel.entity.event_as_log: Bool(true) LogRecord #0 -ObservedTimestamp: 2022-07-27 17:16:57.718720008 +0000 UTC -Timestamp: 2022-07-27 17:16:57.718670678 +0000 UTC -Severity: debug -Body: Container appears to not be accepting redis connections. +ObservedTimestamp: 1970-01-01 00:00:00 +0000 UTC +Timestamp: 2024-04-08 07:06:49.502297226 +0000 UTC +SeverityText: +SeverityNumber: Unspecified(0) +Body: Empty() Attributes: - -> name: STRING(smartagent/redis/receiver_creator/discovery{endpoint="172.17.0.2:80"}(54141aa1da2d2ad0)) - -> monitorType: STRING(collectd/redis) - -> logger: STRING(root) - -> createdTime: STRING(1.6589422177182763e+09) - -> level: STRING(error) - -> monitorID: STRING(smartagentredisreceiver_creatordiscoveryendpoint17217028054141aa1da2d2ad0) - -> runnerPID: STRING(119590) - -> sourcePath: STRING(/usr/lib/splunk-otel-collector/agent-bundle/collectd-python/redis/redis_info.py) - -> lineno: STRING(198) - -> caller: STRING(signalfx/handler.go:189) - -> receiver.name: STRING(smartagent/redis) - -> discovery.status: STRING(failed) + -> otel.entity.id: Map({"discovery.endpoint.id":"k8s_observer/05c6a212-730c-4295-8dd6-0c460c892034/mysql(3306)"}) + -> otel.entity.event.type: Str(entity_state) + -> otel.entity.attributes: Map({"caller":"receivercreator@v0.97.0/observerhandler.go:96","discovery.event.type":"statement.match","discovery.message":"The container cannot be reached by the Collector. The container is refusing MySQL connections. 
(evaluated \"{\\\"endpoint\\\":\\\"192.168.161.105:3306\\\",\\\"endpoint_id\\\":\\\"k8s_observer/05c6a212-730c-4295-8dd6-0c460c892034/mysql(3306)\\\",\\\"kind\\\":\\\"receiver\\\",\\\"message\\\":\\\"starting receiver\\\"}\")","discovery.observer.id":"k8s_observer","discovery.receiver.name":"","discovery.receiver.rule":"type == \"port\" and port != 33060 and pod.name matches \"(?i)mysql\"","discovery.receiver.type":"mysql","discovery.status":"failed","endpoint":"192.168.161.105:3306","endpoint_id":"k8s_observer/05c6a212-730c-4295-8dd6-0c460c892034/mysql(3306)","kind":"receiver","name":"mysql"}) Trace ID: Span ID: Flags: 0 diff --git a/internal/receiver/discoveryreceiver/metric_evaluator.go b/internal/receiver/discoveryreceiver/metric_evaluator.go index c1400fccf7..ede116fb44 100644 --- a/internal/receiver/discoveryreceiver/metric_evaluator.go +++ b/internal/receiver/discoveryreceiver/metric_evaluator.go @@ -17,8 +17,8 @@ package discoveryreceiver import ( "context" "fmt" - "time" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" @@ -122,38 +122,40 @@ func (m *metricEvaluator) evaluateMetrics(md pmetric.Metrics) plog.Logs { continue } - pLogs := plog.NewLogs() - rLog := pLogs.ResourceLogs().AppendEmpty() - rAttrs := rLog.Resource().Attributes() + entityEvents := experimentalmetricmetadata.NewEntityEventsSlice() + entityEvent := entityEvents.AppendEmpty() + entityEvent.ID().PutStr(discovery.EndpointIDAttr, string(endpointID)) + entityState := entityEvent.SetEntityState() + m.correlateResourceAttributes( - md.ResourceMetrics().At(0).Resource().Attributes(), rAttrs, + md.ResourceMetrics().At(0).Resource().Attributes(), entityState.Attributes(), m.correlations.GetOrCreate(receiverID, endpointID), ) - rAttrs.PutStr(eventTypeAttr, metricMatch) - rAttrs.PutStr(receiverRuleAttr, rEntry.Rule) + // Remove 
the endpoint ID from the attributes as it's set in the entity ID. + entityState.Attributes().Remove(discovery.EndpointIDAttr) - logRecords := rLog.ScopeLogs().AppendEmpty().LogRecords() + entityState.Attributes().PutStr(eventTypeAttr, metricMatch) + entityState.Attributes().PutStr(receiverRuleAttr, rEntry.Rule) - logRecord := logRecords.AppendEmpty() desiredRecord := match.Record if desiredRecord == nil { desiredRecord = &LogRecord{} } - var desiredBody string + var desiredMsg string if desiredRecord.Body != "" { - desiredBody = desiredRecord.Body + desiredMsg = desiredRecord.Body } - logRecord.Body().SetStr(desiredBody) + entityState.Attributes().PutStr(discovery.MessageAttr, desiredMsg) for k, v := range desiredRecord.Attributes { - logRecord.Attributes().PutStr(k, v) + entityState.Attributes().PutStr(k, v) } - logRecord.Attributes().PutStr(metricNameAttr, metricName) - logRecord.Attributes().PutStr(discovery.StatusAttr, string(match.Status)) + entityState.Attributes().PutStr(metricNameAttr, metricName) + entityState.Attributes().PutStr(discovery.StatusAttr, string(match.Status)) if ts := m.timestampFromMetric(metric); ts != nil { - logRecord.SetTimestamp(*ts) + entityEvent.SetTimestamp(*ts) } - logRecord.SetObservedTimestamp(pcommon.NewTimestampFromTime(time.Now())) - return pLogs + + return entityEvents.ConvertAndMoveToLogs() } } } diff --git a/internal/receiver/discoveryreceiver/metric_evaluator_test.go b/internal/receiver/discoveryreceiver/metric_evaluator_test.go index a3acf928a1..affb47db43 100644 --- a/internal/receiver/discoveryreceiver/metric_evaluator_test.go +++ b/internal/receiver/discoveryreceiver/metric_evaluator_test.go @@ -110,15 +110,7 @@ func TestMetricEvaluation(t *testing.T) { require.Equal(t, 1, emitted.LogRecordCount()) rl := emitted.ResourceLogs().At(0) - rAttrs = rl.Resource().Attributes() - require.Equal(t, map[string]any{ - "discovery.endpoint.id": "endpoint.id", - "discovery.event.type": "metric.match", - "discovery.observer.id": 
"an_observer/observer.name", - "discovery.receiver.name": "receiver.name", - "discovery.receiver.rule": "a.rule", - "discovery.receiver.type": "a_receiver", - }, rAttrs.AsRaw()) + require.Equal(t, 0, rl.Resource().Attributes().Len()) sLogs := rl.ScopeLogs() require.Equal(t, 1, sLogs.Len()) @@ -129,13 +121,23 @@ func TestMetricEvaluation(t *testing.T) { lrAttrs := lr.Attributes() require.Equal(t, map[string]any{ - "discovery.status": string(status), - "metric.name": "desired.name", - "one": "one.value", - "two": "two.value", + discovery.OtelEntityIDAttr: map[string]any{ + "discovery.endpoint.id": "endpoint.id", + }, + discovery.OtelEntityEventTypeAttr: discovery.OtelEntityEventTypeState, + discovery.OtelEntityAttributesAttr: map[string]any{ + "discovery.event.type": "metric.match", + "discovery.observer.id": "an_observer/observer.name", + "discovery.receiver.name": "receiver.name", + "discovery.receiver.rule": "a.rule", + "discovery.receiver.type": "a_receiver", + "discovery.status": string(status), + "discovery.message": "desired body content", + "metric.name": "desired.name", + "one": "one.value", + "two": "two.value", + }, }, lrAttrs.AsRaw()) - - require.Equal(t, "desired body content", lr.Body().AsString()) }) } }) diff --git a/internal/receiver/discoveryreceiver/statement_evaluator.go b/internal/receiver/discoveryreceiver/statement_evaluator.go index eb3110d8fa..da097d4b69 100644 --- a/internal/receiver/discoveryreceiver/statement_evaluator.go +++ b/internal/receiver/discoveryreceiver/statement_evaluator.go @@ -17,9 +17,9 @@ package discoveryreceiver import ( "encoding/json" "fmt" - "time" "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" @@ -147,8 +147,7 @@ func (se *statementEvaluator) Sync() error { func (se 
*statementEvaluator) evaluateStatement(statement *statussources.Statement) plog.Logs { se.logger.Debug("evaluating statement", zap.Any("statement", statement)) - statementLogRecord := statement.ToLogRecord() - receiverID, endpointID, rEntry, shouldEvaluate := se.receiverEntryFromLogRecord(statementLogRecord) + receiverID, endpointID, rEntry, shouldEvaluate := se.receiverEntryFromStatement(statement) if !shouldEvaluate { return plog.NewLogs() } @@ -184,35 +183,45 @@ func (se *statementEvaluator) evaluateStatement(statement *statussources.Stateme continue } - pLogs, logRecords := se.prepareMatchingLogs(rEntry, receiverID, endpointID) - logRecord := logRecords.AppendEmpty() + entityEvents := experimentalmetricmetadata.NewEntityEventsSlice() + entityEvent := entityEvents.AppendEmpty() + entityEvent.ID().PutStr(discovery.EndpointIDAttr, string(endpointID)) + entityState := entityEvent.SetEntityState() + _ = entityState.Attributes().FromRaw(statement.Fields) + entityState.Attributes().PutStr(discovery.MessageAttr, statement.Message) + + fromAttrs := pcommon.NewMap() + fromAttrs.PutStr(discovery.ReceiverTypeAttr, receiverID.Type().String()) + fromAttrs.PutStr(discovery.ReceiverNameAttr, receiverID.Name()) + se.correlateResourceAttributes(fromAttrs, entityState.Attributes(), se.correlations.GetOrCreate(receiverID, endpointID)) + entityState.Attributes().PutStr(eventTypeAttr, statementMatch) + entityState.Attributes().PutStr(receiverRuleAttr, rEntry.Rule) + var desiredRecord LogRecord if match.Record != nil { desiredRecord = *match.Record } - statementLogRecord.CopyTo(logRecord) if desiredRecord.Body != "" { body := desiredRecord.Body if desiredRecord.AppendPattern { body = fmt.Sprintf("%s (evaluated %q)", body, p) } - logRecord.Body().SetStr(body) + entityState.Attributes().PutStr(discovery.MessageAttr, body) } if len(desiredRecord.Attributes) > 0 { for k, v := range desiredRecord.Attributes { - logRecord.Attributes().PutStr(k, v) + entityState.Attributes().PutStr(k, v) } 
} - logRecord.Attributes().PutStr(discovery.StatusAttr, string(match.Status)) - logRecord.SetTimestamp(pcommon.NewTimestampFromTime(statement.Time)) - logRecord.SetObservedTimestamp(pcommon.NewTimestampFromTime(time.Now())) - return pLogs + entityState.Attributes().PutStr(discovery.StatusAttr, string(match.Status)) + entityEvent.SetTimestamp(pcommon.NewTimestampFromTime(statement.Time)) + return entityEvents.ConvertAndMoveToLogs() } return plog.NewLogs() } -func (se *statementEvaluator) receiverEntryFromLogRecord(record plog.LogRecord) (component.ID, observer.EndpointID, ReceiverEntry, bool) { - receiverID, endpointID := statussources.ReceiverNameToIDs(record) +func (se *statementEvaluator) receiverEntryFromStatement(statement *statussources.Statement) (component.ID, observer.EndpointID, ReceiverEntry, bool) { + receiverID, endpointID := statussources.ReceiverNameToIDs(statement) if receiverID == discovery.NoType || endpointID == "" { // statement evaluation requires both a populated receiver.ID and EndpointID se.logger.Debug("unable to evaluate statement from receiver", zap.String("receiver", receiverID.String())) @@ -231,17 +240,3 @@ func (se *statementEvaluator) receiverEntryFromLogRecord(record plog.LogRecord) return receiverID, endpointID, rEntry, true } - -func (se *statementEvaluator) prepareMatchingLogs(rEntry ReceiverEntry, receiverID component.ID, endpointID observer.EndpointID) (plog.Logs, plog.LogRecordSlice) { - stagePLogs := plog.NewLogs() - rLog := stagePLogs.ResourceLogs().AppendEmpty() - rAttrs := rLog.Resource().Attributes() - fromAttrs := pcommon.NewMap() - fromAttrs.PutStr(discovery.ReceiverTypeAttr, receiverID.Type().String()) - fromAttrs.PutStr(discovery.ReceiverNameAttr, receiverID.Name()) - fromAttrs.PutStr(discovery.EndpointIDAttr, string(endpointID)) - se.correlateResourceAttributes(fromAttrs, rAttrs, se.correlations.GetOrCreate(receiverID, endpointID)) - rAttrs.PutStr(eventTypeAttr, statementMatch) - rAttrs.PutStr(receiverRuleAttr, 
rEntry.Rule) - return stagePLogs, rLog.ScopeLogs().AppendEmpty().LogRecords() -} diff --git a/internal/receiver/discoveryreceiver/statement_evaluator_test.go b/internal/receiver/discoveryreceiver/statement_evaluator_test.go index 8e457b21cb..0a4b8a979e 100644 --- a/internal/receiver/discoveryreceiver/statement_evaluator_test.go +++ b/internal/receiver/discoveryreceiver/statement_evaluator_test.go @@ -112,15 +112,7 @@ func TestStatementEvaluation(t *testing.T) { require.Equal(t, 1, emitted.ResourceLogs().Len()) rl := emitted.ResourceLogs().At(0) - rAttrs := rl.Resource().Attributes() - require.Equal(t, map[string]any{ - "discovery.endpoint.id": "endpoint.id", - "discovery.event.type": "statement.match", - "discovery.observer.id": "an_observer/observer.name", - "discovery.receiver.name": "receiver.name", - "discovery.receiver.rule": "a.rule", - "discovery.receiver.type": "a_receiver", - }, rAttrs.AsRaw()) + require.Equal(t, 0, rl.Resource().Attributes().Len()) sLogs := rl.ScopeLogs() require.Equal(t, 1, sLogs.Len()) @@ -129,34 +121,49 @@ func TestStatementEvaluation(t *testing.T) { require.Equal(t, 1, lrs.Len()) lr := sl.LogRecords().At(0) - lrAttrs := lr.Attributes().AsRaw() + oea, ok := lr.Attributes().Get(discovery.OtelEntityAttributesAttr) + require.True(t, ok) + entityAttrs := oea.Map() - require.Contains(t, lrAttrs, "caller") + // Validate "caller" attribute + callerAttr, ok := entityAttrs.Get("caller") + require.True(t, ok) _, expectedFile, _, _ := runtime.Caller(0) // runtime doesn't use os.PathSeparator splitPath := strings.Split(expectedFile, "/") expectedCaller := splitPath[len(splitPath)-1] - require.Contains(t, lrAttrs["caller"], expectedCaller) - delete(lrAttrs, "caller") + require.Contains(t, callerAttr.Str(), expectedCaller) + entityAttrs.Remove("caller") - require.Equal(t, map[string]any{ - "discovery.status": string(status), - "name": `a_receiver/receiver.name/receiver_creator/rc.name/{endpoint=""}/endpoint.id`, - "attr.one": "attr.one.value", - 
"attr.two": "attr.two.value", - "field.one": "field.one.value", - "field_two": "field.two.value", - }, lrAttrs) - - expected := "desired body content" + // Validate the rest of the attributes + expectedMsg := "desired body content" if match.Record.AppendPattern { if match.Strict != "" { - expected = fmt.Sprintf("%s (evaluated \"desired.statement\")", expected) + expectedMsg = fmt.Sprintf("%s (evaluated \"desired.statement\")", expectedMsg) } else { - expected = fmt.Sprintf("%s (evaluated \"{\\\"field.one\\\":\\\"field.one.value\\\",\\\"field_two\\\":\\\"field.two.value\\\",\\\"message\\\":\\\"desired.statement\\\"}\")", expected) + expectedMsg = fmt.Sprintf("%s (evaluated \"{\\\"field.one\\\":\\\"field.one.value\\\",\\\"field_two\\\":\\\"field.two.value\\\",\\\"message\\\":\\\"desired.statement\\\"}\")", expectedMsg) } } - require.Equal(t, expected, lr.Body().AsString()) + require.Equal(t, map[string]any{ + discovery.OtelEntityIDAttr: map[string]any{ + "discovery.endpoint.id": "endpoint.id", + }, + discovery.OtelEntityEventTypeAttr: discovery.OtelEntityEventTypeState, + discovery.OtelEntityAttributesAttr: map[string]any{ + "discovery.event.type": "statement.match", + "discovery.observer.id": "an_observer/observer.name", + "discovery.receiver.name": "receiver.name", + "discovery.receiver.rule": "a.rule", + "discovery.receiver.type": "a_receiver", + "discovery.status": string(status), + "discovery.message": expectedMsg, + "name": `a_receiver/receiver.name/receiver_creator/rc.name/{endpoint=""}/endpoint.id`, + "attr.one": "attr.one.value", + "attr.two": "attr.two.value", + "field.one": "field.one.value", + "field_two": "field.two.value", + }, + }, lr.Attributes().AsRaw()) }) } }) diff --git a/internal/receiver/discoveryreceiver/statussources/statements.go b/internal/receiver/discoveryreceiver/statussources/statements.go index 18efeca708..8e2d4a74d5 100644 --- a/internal/receiver/discoveryreceiver/statussources/statements.go +++ 
b/internal/receiver/discoveryreceiver/statussources/statements.go @@ -23,8 +23,6 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/plog" "go.uber.org/zap" "go.uber.org/zap/buffer" "go.uber.org/zap/zapcore" @@ -83,38 +81,25 @@ func StatementFromZapCoreEntry(encoder zapcore.Encoder, entry zapcore.Entry, fie return statement, nil } -func (s *Statement) ToLogRecord() plog.LogRecord { - logRecord := plog.NewLogRecord() - if s == nil { - return logRecord - } - - logRecord.SetTimestamp(pcommon.NewTimestampFromTime(s.Time)) - logRecord.SetObservedTimestamp(pcommon.NewTimestampFromTime(time.Now())) - logRecord.Body().SetStr(s.Message) - logRecord.Attributes().FromRaw(s.Fields) - return logRecord -} - // ReceiverNameToIDs parses the zap "name" field value according to // outcome of https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/12670 // where receiver creator receiver names are of the form // `//receiver_creator/{endpoint=""}/`. // If receiverName argument is not of this form empty Component and Endpoint IDs are returned. -func ReceiverNameToIDs(record plog.LogRecord) (receiverID component.ID, endpointID observer.EndpointID) { +func ReceiverNameToIDs(statement *Statement) (component.ID, observer.EndpointID) { // The receiver creator sets dynamically created receiver names as the zap "name" field for their component logger. - nameAttr, ok := record.Attributes().Get("name") + nameField, ok := statement.Fields["name"] if !ok { // there is nothing we can do without a receiver name return discovery.NoType, "" } - receiverName := nameAttr.AsString() + receiverName := fmt.Sprintf("%s", nameField) // The receiver creator will log an initial start statement not from the underlying receiver's logger. 
// These statements have an "endpoint_id" field and the "name" field won't include the necessary "receiver_creator/" // and "{endpoint=}" separators. In this case we get the EndpointID, if any, and form a placeholder name of desired form. - if endpointIDAttr, hasEndpointID := record.Attributes().Get("endpoint_id"); hasEndpointID { - receiverName = fmt.Sprintf(`%s/receiver_creator//{endpoint="PLACEHOLDER"}/%s`, receiverName, endpointIDAttr.AsString()) + if endpointID, hasEndpointID := statement.Fields["endpoint_id"]; hasEndpointID { + receiverName = fmt.Sprintf(`%s/receiver_creator//{endpoint="PLACEHOLDER"}/%s`, receiverName, endpointID) } // receiver creator generated and altered initial endpoint handler message names must contain diff --git a/internal/receiver/discoveryreceiver/statussources/statements_test.go b/internal/receiver/discoveryreceiver/statussources/statements_test.go index 41fe1cab4d..7de7df2ed6 100644 --- a/internal/receiver/discoveryreceiver/statussources/statements_test.go +++ b/internal/receiver/discoveryreceiver/statussources/statements_test.go @@ -21,7 +21,6 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/pdata/plog" "go.uber.org/zap" "go.uber.org/zap/zapcore" "go.uber.org/zap/zaptest" @@ -66,34 +65,6 @@ func TestStatementFromZapCoreEntryUnsupportedEncoder(t *testing.T) { require.Nil(t, statement) } -func TestStatementToLogRecord(t *testing.T) { - logger := zaptest.NewLogger(t).Named("logger.name") - encoder := NewZapCoreEncoder() - ce := logger.Check(zap.DebugLevel, "a.message") - entry := ce.Entry - - now := time.Now() - entry.Time = now - statement, err := StatementFromZapCoreEntry(encoder, entry, []zapcore.Field{ - zap.String("field.one", "field.value"), zap.String("field.two", "another.field.value"), - zap.Int("int", 1), zap.Bool("bool", true), - }) - require.NoError(t, err) - 
require.NotNil(t, statement) - - t0 := time.Now() - lr := statement.ToLogRecord() - - require.Equal(t, "a.message", lr.Body().AsString()) - require.Equal(t, now.UTC(), lr.Timestamp().AsTime()) - require.GreaterOrEqual(t, lr.ObservedTimestamp().AsTime(), t0) - require.Equal(t, map[string]any{ - "field.one": "field.value", "field.two": "another.field.value", - "logger": "logger.name", "bool": true, "int": float64(1), // becomes a float in json unmarshalling - }, lr.Attributes().AsRaw()) - -} - func TestReceiverNameToIDs(t *testing.T) { for _, test := range []struct { name string @@ -143,9 +114,8 @@ func TestReceiverNameToIDs(t *testing.T) { }, } { t.Run(test.name, func(t *testing.T) { - lr := plog.NewLogRecord() - lr.Attributes().PutStr("name", test.receiverName) - receiverID, endpointID := ReceiverNameToIDs(lr) + statement := &Statement{Fields: map[string]any{"name": test.receiverName}} + receiverID, endpointID := ReceiverNameToIDs(statement) require.Equal(t, test.expectedReceiverID, receiverID) require.Equal(t, test.expectedEndpointID, endpointID) }) @@ -167,9 +137,8 @@ func FuzzReceiverNameToIDs(f *testing.F) { } f.Fuzz(func(t *testing.T, receiverName string) { require.NotPanics(t, func() { - lr := plog.NewLogRecord() - lr.Attributes().PutStr("name", receiverName) - receiverID, endpointID := ReceiverNameToIDs(lr) + statement := &Statement{Fields: map[string]any{"name": receiverName}} + receiverID, endpointID := ReceiverNameToIDs(statement) // if we can't find a receiver we should never return an EndpointID if receiverID == discovery.NoType { require.Equal(t, observer.EndpointID(""), endpointID) diff --git a/tests/receivers/discovery/testdata/host_observer_simple_prometheus_config.yaml b/tests/receivers/discovery/testdata/host_observer_simple_prometheus_config.yaml index 3f06fc36fb..4b7455c244 100644 --- a/tests/receivers/discovery/testdata/host_observer_simple_prometheus_config.yaml +++ 
b/tests/receivers/discovery/testdata/host_observer_simple_prometheus_config.yaml @@ -1,7 +1,5 @@ extensions: host_observer: - host_observer/with_name: - host_observer/with/another/name: receivers: # set up otlp receiver to use its endpoints as assertion material @@ -40,19 +38,15 @@ receivers: watch_observers: - host_observer - - host_observer/with_name - - host_observer/with/another/name # drop scrape_timestamp attributes until we can accept arbitrary values processors: - attributes: - actions: - - action: delete - key: scrape_timestamp - - action: delete - key: observed_timestamp - - action: delete - key: timestamp + transform: + error_mode: ignore + log_statements: + - context: log + statements: + - delete_key(attributes["otel.entity.attributes"], "scrape_timestamp") exporters: otlp: @@ -66,13 +60,11 @@ service: level: debug extensions: - host_observer - - host_observer/with_name - - host_observer/with/another/name pipelines: metrics: receivers: [otlp] exporters: [otlp] logs: receivers: [discovery] - processors: [attributes] + processors: [transform] exporters: [otlp] diff --git a/tests/receivers/discovery/testdata/resource_logs/host_observer_simple_prometheus_statuses.yaml b/tests/receivers/discovery/testdata/resource_logs/host_observer_simple_prometheus_statuses.yaml index 8f9d479038..3762405137 100644 --- a/tests/receivers/discovery/testdata/resource_logs/host_observer_simple_prometheus_statuses.yaml +++ b/tests/receivers/discovery/testdata/resource_logs/host_observer_simple_prometheus_statuses.yaml @@ -1,41 +1,44 @@ resource_logs: - - attributes: - discovery.endpoint.id: (host_observer)[::]-8888-TCP-1 - discovery.event.type: metric.match - discovery.observer.id: host_observer - discovery.receiver.config: 
cmVjZWl2ZXJzOgogIHByb21ldGhldXNfc2ltcGxlOgogICAgY29uZmlnOiB7fQogICAgcmVzb3VyY2VfYXR0cmlidXRlczoKICAgICAgb25lLmtleTogb25lLnZhbHVlCiAgICAgIHR3by5rZXk6IHR3by52YWx1ZQogICAgcnVsZTogdHlwZSA9PSAiaG9zdHBvcnQiIGFuZCBjb21tYW5kIGNvbnRhaW5zICJvdGVsY29sIgp3YXRjaF9vYnNlcnZlcnM6Ci0gaG9zdF9vYnNlcnZlcgo= - discovery.receiver.rule: type == "hostport" and command contains "otelcol" - discovery.receiver.type: prometheus_simple - http.scheme: http - net.host.port: "8888" - one.key: one.value - service.instance.id: '[::]:8888' - service.name: prometheus_simple/[::]:8888 - two.key: two.value - service_instance_id: - service_name: otelcol - service_version: - scope_logs: + - scope_logs: - logs: - - body: Successfully connected to prometheus server - attributes: - discovery.status: successful - metric.name: otelcol_process_uptime - - - attributes: - discovery.endpoint.id: (host_observer)[::]-4318-TCP-1 - discovery.event.type: statement.match - discovery.observer.id: host_observer - discovery.receiver.config: cmVjZWl2ZXJzOgogIHByb21ldGhldXNfc2ltcGxlOgogICAgY29uZmlnOiB7fQogICAgcmVzb3VyY2VfYXR0cmlidXRlczoKICAgICAgb25lLmtleTogb25lLnZhbHVlCiAgICAgIHR3by5rZXk6IHR3by52YWx1ZQogICAgcnVsZTogdHlwZSA9PSAiaG9zdHBvcnQiIGFuZCBjb21tYW5kIGNvbnRhaW5zICJvdGVsY29sIgp3YXRjaF9vYnNlcnZlcnM6Ci0gaG9zdF9vYnNlcnZlcgo= - discovery.receiver.name: "" - discovery.receiver.rule: type == "hostport" and command contains "otelcol" - discovery.receiver.type: prometheus_simple - scope_logs: + - attributes: + otel.entity.id: + discovery.endpoint.id: (host_observer)[::]-8888-TCP-1 + otel.entity.event.type: entity_state + otel.entity.attributes: + discovery.event.type: metric.match + discovery.observer.id: host_observer + discovery.receiver.config: 
cmVjZWl2ZXJzOgogIHByb21ldGhldXNfc2ltcGxlOgogICAgY29uZmlnOiB7fQogICAgcmVzb3VyY2VfYXR0cmlidXRlczoKICAgICAgb25lLmtleTogb25lLnZhbHVlCiAgICAgIHR3by5rZXk6IHR3by52YWx1ZQogICAgcnVsZTogdHlwZSA9PSAiaG9zdHBvcnQiIGFuZCBjb21tYW5kIGNvbnRhaW5zICJvdGVsY29sIgp3YXRjaF9vYnNlcnZlcnM6Ci0gaG9zdF9vYnNlcnZlcgo= + discovery.receiver.rule: type == "hostport" and command contains "otelcol" + discovery.receiver.type: prometheus_simple + http.scheme: http + net.host.port: "8888" + one.key: one.value + service.instance.id: '[::]:8888' + service.name: prometheus_simple/[::]:8888 + two.key: two.value + service_instance_id: + service_name: otelcol + service_version: + discovery.status: successful + discovery.message: Successfully connected to prometheus server + metric.name: otelcol_process_uptime + - scope_logs: - logs: - - body: (strict) Port appears to not be serving prometheus metrics - attributes: - caller: - discovery.status: failed - kind: receiver - name: prometheus_simple//receiver_creator/discovery{endpoint="[::]:4318"}/(host_observer)[::]-4318-TCP-1 - target_labels: '{__name__="up", instance="[::]:4318", job="prometheus_simple/[::]:4318"}' + - attributes: + otel.entity.id: + discovery.endpoint.id: (host_observer)[::]-4318-TCP-1 + otel.entity.event.type: entity_state + otel.entity.attributes: + discovery.event.type: statement.match + discovery.observer.id: host_observer + discovery.receiver.config: cmVjZWl2ZXJzOgogIHByb21ldGhldXNfc2ltcGxlOgogICAgY29uZmlnOiB7fQogICAgcmVzb3VyY2VfYXR0cmlidXRlczoKICAgICAgb25lLmtleTogb25lLnZhbHVlCiAgICAgIHR3by5rZXk6IHR3by52YWx1ZQogICAgcnVsZTogdHlwZSA9PSAiaG9zdHBvcnQiIGFuZCBjb21tYW5kIGNvbnRhaW5zICJvdGVsY29sIgp3YXRjaF9vYnNlcnZlcnM6Ci0gaG9zdF9vYnNlcnZlcgo= + discovery.receiver.name: "" + discovery.receiver.rule: type == "hostport" and command contains "otelcol" + discovery.receiver.type: prometheus_simple + caller: + discovery.status: failed + discovery.message: (strict) Port appears to not be serving prometheus metrics + kind: receiver + name: 
prometheus_simple//receiver_creator/discovery{endpoint="[::]:4318"}/(host_observer)[::]-4318-TCP-1 + target_labels: '{__name__="up", instance="[::]:4318", job="prometheus_simple/[::]:4318"}' From 214979898c7c4f47c016982718faaf88ba360761 Mon Sep 17 00:00:00 2001 From: Dmitry Anoshin Date: Wed, 10 Apr 2024 23:01:35 -0700 Subject: [PATCH 2/2] try to bring back the observers in the integration test --- .../testdata/host_observer_simple_prometheus_config.yaml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/receivers/discovery/testdata/host_observer_simple_prometheus_config.yaml b/tests/receivers/discovery/testdata/host_observer_simple_prometheus_config.yaml index 4b7455c244..fce7a27e89 100644 --- a/tests/receivers/discovery/testdata/host_observer_simple_prometheus_config.yaml +++ b/tests/receivers/discovery/testdata/host_observer_simple_prometheus_config.yaml @@ -1,5 +1,7 @@ extensions: host_observer: + host_observer/with_name: + host_observer/with/another/name: receivers: # set up otlp receiver to use its endpoints as assertion material @@ -38,6 +40,8 @@ receivers: watch_observers: - host_observer + - host_observer/with_name + - host_observer/with/another/name # drop scrape_timestamp attributes until we can accept arbitrary values processors: @@ -60,6 +64,8 @@ service: level: debug extensions: - host_observer + - host_observer/with_name + - host_observer/with/another/name pipelines: metrics: receivers: [otlp]