log endpoint id with discovery status (open-telemetry#2964)
rmfitzpatrick authored Apr 14, 2023
1 parent c6ae5cd commit 7b7cb53
Showing 4 changed files with 56 additions and 18 deletions.
27 changes: 18 additions & 9 deletions internal/confmapprovider/discovery/discoverer.go
@@ -359,7 +359,7 @@ func (d *discoverer) createObserver(observerID component.ID, cfg *Config) (otelc
ce.Write(zap.String("config", fmt.Sprintf("%#v\n", observerConfig)))
}

observerSettings := d.createExtensionCreateSettings(observerID.String())
observerSettings := d.createExtensionCreateSettings(observerID)
observer, err := observerFactory.CreateExtension(context.Background(), observerSettings, observerConfig)
if err != nil {
return nil, fmt.Errorf("failed creating %q extension: %w", observerID, err)
@@ -481,10 +481,11 @@ func (d *discoverer) discoveryConfig(cfg *Config) (map[string]any, error) {
return sMap, nil
}

func (d *discoverer) createExtensionCreateSettings(kind string) otelcolextension.CreateSettings {
func (d *discoverer) createExtensionCreateSettings(observerID component.ID) otelcolextension.CreateSettings {
return otelcolextension.CreateSettings{
ID: observerID,
TelemetrySettings: component.TelemetrySettings{
Logger: zap.New(d.logger.Core()).With(zap.String("kind", kind)),
Logger: zap.New(d.logger.Core()).With(zap.String("kind", observerID.String())),
TracerProvider: trace.NewNoopTracerProvider(),
MeterProvider: metric.NewNoopMeterProvider(),
MetricsLevel: configtelemetry.LevelDetailed,
@@ -620,6 +621,11 @@ func (d *discoverer) ConsumeLogs(_ context.Context, ld plog.Logs) error {
}
}

endpointID := "unavailable"
if eid, k := rAttrs.Get(discovery.EndpointIDAttr); k {
endpointID = eid.AsString()
}

var rule string
var configSection map[string]any
receiverID := component.NewIDWithName(component.Type(receiverType), receiverName)
@@ -664,15 +670,18 @@ func (d *discoverer) ConsumeLogs(_ context.Context, ld plog.Logs) error {
if currentReceiverStatus != discovery.Successful || currentObserverStatus != discovery.Successful {
if rStatusAttr, ok := lr.Attributes().Get(discovery.StatusAttr); ok {
rStatus := discovery.StatusType(rStatusAttr.Str())
if ok, err = discovery.IsValidStatus(rStatus); !ok {
d.logger.Debug("invalid status from log record", zap.Error(err), zap.Any("lr", lr.Body().AsRaw()))
if valid, e := discovery.IsValidStatus(rStatus); !valid {
d.logger.Debug("invalid status from log record", zap.Error(e), zap.Any("lr", lr.Body().AsRaw()))
continue
}
receiverStatus := determineCurrentStatus(currentReceiverStatus, rStatus)
if receiverStatus == discovery.Partial {
fmt.Fprintf(os.Stderr, "Partially discovered %q using %q: %s\n", receiverID, observerID, lr.Body().AsString())
} else if receiverStatus == discovery.Successful {
fmt.Fprintf(os.Stderr, "Successfully discovered %q using %q.\n", receiverID, observerID)
switch receiverStatus {
case discovery.Failed:
d.logger.Info(fmt.Sprintf("failed to discover %q using %q endpoint %q: %s", receiverID, observerID, endpointID, lr.Body().AsString()))
case discovery.Partial:
fmt.Fprintf(os.Stderr, "Partially discovered %q using %q endpoint %q: %s\n", receiverID, observerID, endpointID, lr.Body().AsString())
case discovery.Successful:
fmt.Fprintf(os.Stderr, "Successfully discovered %q using %q endpoint %q.\n", receiverID, observerID, endpointID)
}
d.discoveredReceivers[receiverID] = receiverStatus
d.discoveredObservers[observerID] = determineCurrentStatus(currentObserverStatus, rStatus)
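The heart of the change is the lookup added to ConsumeLogs: the endpoint ID is read from the log record's resource attributes and falls back to "unavailable" when the observer didn't record one, and the Failed/Partial/Successful messages now interpolate it. A minimal, self-contained sketch of that lookup (the attribute key literal is assumed for illustration; the repository uses the discovery.EndpointIDAttr constant):

package main

import (
	"fmt"

	"go.opentelemetry.io/collector/pdata/pcommon"
)

// endpointIDAttr stands in for discovery.EndpointIDAttr from the internal
// discovery package; the literal value here is assumed for illustration only.
const endpointIDAttr = "discovery.endpoint.id"

// endpointIDOrDefault mirrors the lookup added in ConsumeLogs: use the
// endpoint ID recorded on the resource attributes, or "unavailable" when the
// observer didn't set one.
func endpointIDOrDefault(rAttrs pcommon.Map) string {
	endpointID := "unavailable"
	if eid, ok := rAttrs.Get(endpointIDAttr); ok {
		endpointID = eid.AsString()
	}
	return endpointID
}

func main() {
	attrs := pcommon.NewMap()
	attrs.PutStr(endpointIDAttr, "57ae3f4b9a2c:9090")
	fmt.Println(endpointIDOrDefault(attrs)) // 57ae3f4b9a2c:9090

	fmt.Println(endpointIDOrDefault(pcommon.NewMap())) // unavailable
}

The switch added above then reports that endpoint alongside the receiver and observer IDs for failed, partial, and successful discovery.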
11 changes: 9 additions & 2 deletions tests/general/discoverymode/docker_observer_discovery_test.go
@@ -85,10 +85,11 @@ func TestDockerObserver(t *testing.T) {
)
defer shutdown()

_, shutdownPrometheus := tc.Containers(
cntrs, shutdownPrometheus := tc.Containers(
testutils.NewContainer().WithImage("bitnami/prometheus").WithLabel("test.id", tc.ID).WillWaitForLogs("Server is ready to receive web requests."),
)
defer shutdownPrometheus()
prometheus := cntrs[0]

expectedResourceMetrics := tc.ResourceMetrics("docker-observer-internal-prometheus.yaml")
require.NoError(t, tc.OTLPReceiverSink.AssertAllMetricsReceived(t, *expectedResourceMetrics, 30*time.Second))
@@ -258,6 +259,12 @@ service:
address: ""
level: none
`, stdout)
require.Contains(t, stderr, "Discovering for next 20s...\nSuccessfully discovered \"prometheus_simple\" using \"docker_observer\".\nDiscovery complete.\n")
require.Contains(
t, stderr,
fmt.Sprintf(`Discovering for next 20s...
Successfully discovered "prometheus_simple" using "docker_observer" endpoint "%s:9090".
Discovery complete.
`, prometheus.GetContainerID()),
)
require.Zero(t, sc)
}
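Because the expected message now spans several lines and embeds a runtime value, the assertions move from escaped \n strings to raw string literals filled in with fmt.Sprintf. A hypothetical helper (not in the repository; the package and helper names are assumed) showing that pattern:

package discoverymode

import (
	"fmt"
	"testing"

	"github.com/stretchr/testify/require"
)

// assertDiscoveryMessage illustrates the assertion pattern used above: a raw
// string literal keeps the expected multi-line discovery output readable, and
// Sprintf injects the endpoint ID discovered at runtime (here the container
// ID and port reported by the docker_observer).
func assertDiscoveryMessage(t *testing.T, stderr, containerID string) {
	t.Helper()
	expected := fmt.Sprintf(`Discovering for next 20s...
Successfully discovered "prometheus_simple" using "docker_observer" endpoint "%s:9090".
Discovery complete.
`, containerID)
	require.Contains(t, stderr, expected)
}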
20 changes: 18 additions & 2 deletions tests/general/discoverymode/host_observer_discovery_test.go
@@ -22,6 +22,7 @@ import (
"io"
"path/filepath"
"runtime"
"strings"
"testing"
"time"

@@ -43,6 +44,9 @@ import (
// one and that the first collector process's initial and effective configs from the config server
// are as expected.
func TestHostObserver(t *testing.T) {
if testutils.CollectorImageIsForArm(t) {
t.Skip("host_observer missing process info on arm")
}
tc := testutils.NewTestcase(t)
defer tc.PrintLogsOnFailure()
defer tc.ShutdownOTLPReceiverSink()
@@ -105,6 +109,13 @@ func TestHostObserver(t *testing.T) {
require.NoError(t, err)
require.Zero(t, sc)

// get the pid of the collector for endpoint ID verification
sc, stdout, stderr := cc.Container.AssertExec(
t, 5*time.Second, "bash", "-c", "ps -C otelcol | tail -n 1 | grep -oE '^\\s*[0-9]+'",
)
promPid := strings.TrimSpace(stdout)
require.Zero(t, sc, stderr)

expectedResourceMetrics := tc.ResourceMetrics("host-observer-internal-prometheus.yaml")
require.NoError(t, tc.OTLPReceiverSink.AssertAllMetricsReceived(t, *expectedResourceMetrics, 30*time.Second))

@@ -225,7 +236,7 @@ }
}
require.Equal(t, expectedEffective, cc.EffectiveConfig(t, 55554))

sc, stdout, stderr := cc.Container.AssertExec(t, 15*time.Second,
sc, stdout, stderr = cc.Container.AssertExec(t, 15*time.Second,
"bash", "-c", `SPLUNK_DISCOVERY_LOG_LEVEL=error SPLUNK_DEBUG_CONFIG_SERVER=false \
REFRESH_INTERVAL=1s \
SPLUNK_DISCOVERY_DURATION=9s \
@@ -270,6 +281,11 @@ service:
address: ""
level: none
`, stdout, fmt.Sprintf("unexpected --dry-run: %s", stderr))
require.Contains(t, stderr, "Discovering for next 9s...\nSuccessfully discovered \"prometheus_simple\" using \"host_observer\".\nDiscovery complete.\n")
require.Contains(
t, stderr,
fmt.Sprintf(`Discovering for next 9s...
Successfully discovered "prometheus_simple" using "host_observer" endpoint "(host_observer)127.0.0.1-%d-TCP-%s".
Discovery complete.
`, promPort, promPid))
require.Zero(t, sc)
}
16 changes: 11 additions & 5 deletions tests/general/discoverymode/k8s_observer_discovery_test.go
@@ -55,7 +55,7 @@ func TestK8sObserver(t *testing.T) {
tc.Logger.Debug("applying ConfigMap", zap.String("stdout", sout.String()), zap.String("stderr", serr.String()))
require.NoError(t, err)

redis := createRedis(cluster, "target.redis", namespace, serviceAccount)
redisName, redisUID := createRedis(cluster, "target.redis", namespace, serviceAccount)

crManifest, crbManifest := clusterRoleAndBindingManifests(t, namespace, serviceAccount)
sout, serr, err = cluster.Apply(crManifest)
@@ -74,7 +74,7 @@
require.Eventually(t, func() bool {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
rPod, err := cluster.Clientset.CoreV1().Pods(namespace).Get(ctx, redis, metav1.GetOptions{})
rPod, err := cluster.Clientset.CoreV1().Pods(namespace).Get(ctx, redisName, metav1.GetOptions{})
require.NoError(t, err)
tc.Logger.Debug(fmt.Sprintf("redis is: %s\n", rPod.Status.Phase))
return rPod.Status.Phase == corev1.PodRunning
@@ -146,10 +146,16 @@ service:
address: ""
level: none
`, stdout.String())
require.Contains(t, stderr.String(), "Discovering for next 10s...\nSuccessfully discovered \"smartagent\" using \"k8s_observer\".\nDiscovery complete.\n")
require.Contains(
t, stderr.String(),
fmt.Sprintf(`Discovering for next 10s...
Successfully discovered "smartagent" using "k8s_observer" endpoint "k8s_observer/%s/(6379)".
Discovery complete.
`, redisUID),
)
}

func createRedis(cluster *kubeutils.KindCluster, name, namespace, serviceAccount string) string {
func createRedis(cluster *kubeutils.KindCluster, name, namespace, serviceAccount string) (string, string) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
redis, err := cluster.Clientset.CoreV1().Pods(namespace).Create(
@@ -178,7 +184,7 @@ func createRedis(cluster *kubeutils.KindCluster, name, namespace, serviceAccount
}, metav1.CreateOptions{},
)
require.NoError(cluster.Testcase, err)
return redis.Name
return redis.Name, string(redis.UID)
}

func createNamespaceAndServiceAccount(cluster *kubeutils.KindCluster) (string, string) {
