diff --git a/internal/confmapprovider/discovery/discoverer.go b/internal/confmapprovider/discovery/discoverer.go
index a4a4f8d4bbd..ed7b1c10960 100644
--- a/internal/confmapprovider/discovery/discoverer.go
+++ b/internal/confmapprovider/discovery/discoverer.go
@@ -359,7 +359,7 @@ func (d *discoverer) createObserver(observerID component.ID, cfg *Config) (otelc
 		ce.Write(zap.String("config", fmt.Sprintf("%#v\n", observerConfig)))
 	}
 
-	observerSettings := d.createExtensionCreateSettings(observerID.String())
+	observerSettings := d.createExtensionCreateSettings(observerID)
 	observer, err := observerFactory.CreateExtension(context.Background(), observerSettings, observerConfig)
 	if err != nil {
 		return nil, fmt.Errorf("failed creating %q extension: %w", observerID, err)
@@ -481,10 +481,11 @@ func (d *discoverer) discoveryConfig(cfg *Config) (map[string]any, error) {
 	return sMap, nil
 }
 
-func (d *discoverer) createExtensionCreateSettings(kind string) otelcolextension.CreateSettings {
+func (d *discoverer) createExtensionCreateSettings(observerID component.ID) otelcolextension.CreateSettings {
 	return otelcolextension.CreateSettings{
+		ID: observerID,
 		TelemetrySettings: component.TelemetrySettings{
-			Logger:         zap.New(d.logger.Core()).With(zap.String("kind", kind)),
+			Logger:         zap.New(d.logger.Core()).With(zap.String("kind", observerID.String())),
 			TracerProvider: trace.NewNoopTracerProvider(),
 			MeterProvider:  metric.NewNoopMeterProvider(),
 			MetricsLevel:   configtelemetry.LevelDetailed,
@@ -620,6 +621,11 @@ func (d *discoverer) ConsumeLogs(_ context.Context, ld plog.Logs) error {
 			}
 		}
 
+		endpointID := "unavailable"
+		if eid, k := rAttrs.Get(discovery.EndpointIDAttr); k {
+			endpointID = eid.AsString()
+		}
+
 		var rule string
 		var configSection map[string]any
 		receiverID := component.NewIDWithName(component.Type(receiverType), receiverName)
@@ -664,15 +670,18 @@ func (d *discoverer) ConsumeLogs(_ context.Context, ld plog.Logs) error {
 			if currentReceiverStatus != discovery.Successful || currentObserverStatus != discovery.Successful {
 				if rStatusAttr, ok := lr.Attributes().Get(discovery.StatusAttr); ok {
 					rStatus := discovery.StatusType(rStatusAttr.Str())
-					if ok, err = discovery.IsValidStatus(rStatus); !ok {
-						d.logger.Debug("invalid status from log record", zap.Error(err), zap.Any("lr", lr.Body().AsRaw()))
+					if valid, e := discovery.IsValidStatus(rStatus); !valid {
+						d.logger.Debug("invalid status from log record", zap.Error(e), zap.Any("lr", lr.Body().AsRaw()))
 						continue
 					}
 					receiverStatus := determineCurrentStatus(currentReceiverStatus, rStatus)
-					if receiverStatus == discovery.Partial {
-						fmt.Fprintf(os.Stderr, "Partially discovered %q using %q: %s\n", receiverID, observerID, lr.Body().AsString())
-					} else if receiverStatus == discovery.Successful {
-						fmt.Fprintf(os.Stderr, "Successfully discovered %q using %q.\n", receiverID, observerID)
+					switch receiverStatus {
+					case discovery.Failed:
+						d.logger.Info(fmt.Sprintf("failed to discover %q using %q endpoint %q: %s", receiverID, observerID, endpointID, lr.Body().AsString()))
+					case discovery.Partial:
+						fmt.Fprintf(os.Stderr, "Partially discovered %q using %q endpoint %q: %s\n", receiverID, observerID, endpointID, lr.Body().AsString())
+					case discovery.Successful:
+						fmt.Fprintf(os.Stderr, "Successfully discovered %q using %q endpoint %q.\n", receiverID, observerID, endpointID)
 					}
 					d.discoveredReceivers[receiverID] = receiverStatus
 					d.discoveredObservers[observerID] = determineCurrentStatus(currentObserverStatus, rStatus)
diff --git a/tests/general/discoverymode/docker_observer_discovery_test.go b/tests/general/discoverymode/docker_observer_discovery_test.go
index 0d8da3b0ede..9aacd38a2d6 100644
--- a/tests/general/discoverymode/docker_observer_discovery_test.go
+++ b/tests/general/discoverymode/docker_observer_discovery_test.go
@@ -85,10 +85,11 @@ func TestDockerObserver(t *testing.T) {
 	)
 	defer shutdown()
 
-	_, shutdownPrometheus := tc.Containers(
+	cntrs, shutdownPrometheus := tc.Containers(
 		testutils.NewContainer().WithImage("bitnami/prometheus").WithLabel("test.id", tc.ID).WillWaitForLogs("Server is ready to receive web requests."),
 	)
 	defer shutdownPrometheus()
+	prometheus := cntrs[0]
 
 	expectedResourceMetrics := tc.ResourceMetrics("docker-observer-internal-prometheus.yaml")
 	require.NoError(t, tc.OTLPReceiverSink.AssertAllMetricsReceived(t, *expectedResourceMetrics, 30*time.Second))
@@ -258,6 +259,12 @@ service:
     address: ""
     level: none
 `, stdout)
-	require.Contains(t, stderr, "Discovering for next 20s...\nSuccessfully discovered \"prometheus_simple\" using \"docker_observer\".\nDiscovery complete.\n")
+	require.Contains(
+		t, stderr,
+		fmt.Sprintf(`Discovering for next 20s...
+Successfully discovered "prometheus_simple" using "docker_observer" endpoint "%s:9090".
+Discovery complete.
+`, prometheus.GetContainerID()),
+	)
 	require.Zero(t, sc)
 }
diff --git a/tests/general/discoverymode/host_observer_discovery_test.go b/tests/general/discoverymode/host_observer_discovery_test.go
index 685bc14d0e3..22ec4bb27eb 100644
--- a/tests/general/discoverymode/host_observer_discovery_test.go
+++ b/tests/general/discoverymode/host_observer_discovery_test.go
@@ -22,6 +22,7 @@ import (
 	"io"
 	"path/filepath"
 	"runtime"
+	"strings"
 	"testing"
 	"time"
 
@@ -43,6 +44,9 @@ import (
 // one and that the first collector process's initial and effective configs from the config server
 // are as expected.
 func TestHostObserver(t *testing.T) {
+	if testutils.CollectorImageIsForArm(t) {
+		t.Skip("host_observer missing process info on arm")
+	}
 	tc := testutils.NewTestcase(t)
 	defer tc.PrintLogsOnFailure()
 	defer tc.ShutdownOTLPReceiverSink()
@@ -105,6 +109,13 @@ func TestHostObserver(t *testing.T) {
 	require.NoError(t, err)
 	require.Zero(t, sc)
 
+	// get the pid of the collector for endpoint ID verification
+	sc, stdout, stderr := cc.Container.AssertExec(
+		t, 5*time.Second, "bash", "-c", "ps -C otelcol | tail -n 1 | grep -oE '^\\s*[0-9]+'",
+	)
+	promPid := strings.TrimSpace(stdout)
+	require.Zero(t, sc, stderr)
+
 	expectedResourceMetrics := tc.ResourceMetrics("host-observer-internal-prometheus.yaml")
 	require.NoError(t, tc.OTLPReceiverSink.AssertAllMetricsReceived(t, *expectedResourceMetrics, 30*time.Second))
 
@@ -225,7 +236,7 @@ func TestHostObserver(t *testing.T) {
 	}
 	require.Equal(t, expectedEffective, cc.EffectiveConfig(t, 55554))
 
-	sc, stdout, stderr := cc.Container.AssertExec(t, 15*time.Second,
+	sc, stdout, stderr = cc.Container.AssertExec(t, 15*time.Second,
 		"bash", "-c", `SPLUNK_DISCOVERY_LOG_LEVEL=error SPLUNK_DEBUG_CONFIG_SERVER=false \
 REFRESH_INTERVAL=1s \
 SPLUNK_DISCOVERY_DURATION=9s \
@@ -270,6 +281,11 @@ service:
     address: ""
     level: none
 `, stdout, fmt.Sprintf("unexpected --dry-run: %s", stderr))
-	require.Contains(t, stderr, "Discovering for next 9s...\nSuccessfully discovered \"prometheus_simple\" using \"host_observer\".\nDiscovery complete.\n")
+	require.Contains(
+		t, stderr,
+		fmt.Sprintf(`Discovering for next 9s...
+Successfully discovered "prometheus_simple" using "host_observer" endpoint "(host_observer)127.0.0.1-%d-TCP-%s".
+Discovery complete.
+`, promPort, promPid))
 	require.Zero(t, sc)
 }
diff --git a/tests/general/discoverymode/k8s_observer_discovery_test.go b/tests/general/discoverymode/k8s_observer_discovery_test.go
index 4e27aedba0c..3c559781d35 100644
--- a/tests/general/discoverymode/k8s_observer_discovery_test.go
+++ b/tests/general/discoverymode/k8s_observer_discovery_test.go
@@ -55,7 +55,7 @@ func TestK8sObserver(t *testing.T) {
 	tc.Logger.Debug("applying ConfigMap", zap.String("stdout", sout.String()), zap.String("stderr", serr.String()))
 	require.NoError(t, err)
 
-	redis := createRedis(cluster, "target.redis", namespace, serviceAccount)
+	redisName, redisUID := createRedis(cluster, "target.redis", namespace, serviceAccount)
 
 	crManifest, crbManifest := clusterRoleAndBindingManifests(t, namespace, serviceAccount)
 	sout, serr, err = cluster.Apply(crManifest)
@@ -74,7 +74,7 @@ func TestK8sObserver(t *testing.T) {
 	require.Eventually(t, func() bool {
 		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 		defer cancel()
-		rPod, err := cluster.Clientset.CoreV1().Pods(namespace).Get(ctx, redis, metav1.GetOptions{})
+		rPod, err := cluster.Clientset.CoreV1().Pods(namespace).Get(ctx, redisName, metav1.GetOptions{})
 		require.NoError(t, err)
 		tc.Logger.Debug(fmt.Sprintf("redis is: %s\n", rPod.Status.Phase))
 		return rPod.Status.Phase == corev1.PodRunning
@@ -146,10 +146,16 @@ service:
     address: ""
     level: none
 `, stdout.String())
-	require.Contains(t, stderr.String(), "Discovering for next 10s...\nSuccessfully discovered \"smartagent\" using \"k8s_observer\".\nDiscovery complete.\n")
+	require.Contains(
+		t, stderr.String(),
+		fmt.Sprintf(`Discovering for next 10s...
+Successfully discovered "smartagent" using "k8s_observer" endpoint "k8s_observer/%s/(6379)".
+Discovery complete.
+`, redisUID),
+	)
 }
 
-func createRedis(cluster *kubeutils.KindCluster, name, namespace, serviceAccount string) string {
+func createRedis(cluster *kubeutils.KindCluster, name, namespace, serviceAccount string) (string, string) {
 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 	defer cancel()
 	redis, err := cluster.Clientset.CoreV1().Pods(namespace).Create(
@@ -178,7 +184,7 @@ func createRedis(cluster *kubeutils.KindCluster, name, namespace, serviceAccount
 		}, metav1.CreateOptions{},
 	)
 	require.NoError(cluster.Testcase, err)
-	return redis.Name
+	return redis.Name, string(redis.UID)
 }
 
 func createNamespaceAndServiceAccount(cluster *kubeutils.KindCluster) (string, string) {
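
Review note: the core behavioral change in discoverer.go is that ConsumeLogs now resolves an endpoint ID from the resource attributes of the evaluated log record (falling back to "unavailable") and threads it through the discovery status messages. Below is a minimal standalone sketch of that lookup pattern against the upstream pdata API; the attribute key literal and the sample endpoint value are stand-ins for this sketch, not the repository's actual `discovery.EndpointIDAttr` constant.

```go
package main

import (
	"fmt"

	"go.opentelemetry.io/collector/pdata/plog"
)

// endpointIDAttr stands in for discovery.EndpointIDAttr from the
// splunk-otel-collector discovery package (assumed key for this sketch).
const endpointIDAttr = "discovery.endpoint.id"

// endpointIDFromResource mirrors the fallback pattern used in ConsumeLogs:
// report "unavailable" when the observer did not stamp an endpoint ID
// onto the resource of the evaluated log record.
func endpointIDFromResource(rl plog.ResourceLogs) string {
	endpointID := "unavailable"
	if eid, ok := rl.Resource().Attributes().Get(endpointIDAttr); ok {
		endpointID = eid.AsString()
	}
	return endpointID
}

func main() {
	ld := plog.NewLogs()
	rl := ld.ResourceLogs().AppendEmpty()
	rl.Resource().Attributes().PutStr(endpointIDAttr, "k8s_observer/some-pod-uid/(6379)")
	fmt.Println(endpointIDFromResource(rl)) // k8s_observer/some-pod-uid/(6379)
}
```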
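The updated tests pin observer-specific endpoint ID shapes in the expected stderr output. As a quick reference, the sketch below reproduces those formats with fmt.Sprintf using placeholder values; the real values come from the Prometheus container ID (docker_observer), the collector process PID plus the scraped port (host_observer), and the Redis pod UID (k8s_observer).

```go
package main

import "fmt"

func main() {
	// Placeholder inputs; in the tests these are read from the running targets.
	containerID := "3f2c9a81b6d4"  // docker_observer test: prometheus.GetContainerID()
	promPort := 9090               // host_observer test: scraped Prometheus port
	collectorPid := "12345"        // host_observer test: promPid from ps output
	redisUID := "0e6f8c7d-example" // k8s_observer test: string(redis.UID)

	fmt.Printf("docker_observer: %s:9090\n", containerID)
	fmt.Printf("host_observer:   (host_observer)127.0.0.1-%d-TCP-%s\n", promPort, collectorPid)
	fmt.Printf("k8s_observer:    k8s_observer/%s/(6379)\n", redisUID)
}
```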