Skip to content

Commit

Permalink
Add service name and version attributes
Browse files Browse the repository at this point in the history
  • Loading branch information
dragosvictor committed Feb 6, 2024
1 parent 7a99baf commit ef6f148
Show file tree
Hide file tree
Showing 8 changed files with 107 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,25 @@
import io.opentelemetry.api.metrics.Meter;
import java.io.Closeable;
import lombok.Getter;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.common.stats.OpenTelemetryService;

public class PulsarBrokerOpenTelemetry implements Closeable {

public static final String SERVICE_NAME = "pulsar-broker";
private static final String INSTRUMENTATION_SCOPE_NAME = "org.apache.pulsar.broker";
private final OpenTelemetryService openTelemetryService;

@Getter
private final Meter meter;

public PulsarBrokerOpenTelemetry(ServiceConfiguration config) {
openTelemetryService = OpenTelemetryService.builder().clusterName(config.getClusterName()).build();
openTelemetryService = OpenTelemetryService.builder()
.clusterName(config.getClusterName())
.serviceName(SERVICE_NAME)
.serviceVersion(PulsarVersion.getVersion())
.build();
meter = openTelemetryService.getMeter(INSTRUMENTATION_SCOPE_NAME);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,25 @@
import io.opentelemetry.api.metrics.Meter;
import java.io.Closeable;
import lombok.Getter;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.common.stats.OpenTelemetryService;
import org.apache.pulsar.functions.worker.WorkerConfig;

public class PulsarWorkerOpenTelemetry implements Closeable {

public static final String SERVICE_NAME = "pulsar-function-worker";
private static final String INSTRUMENTATION_SCOPE_NAME = "org.apache.pulsar.function_worker";
private final OpenTelemetryService openTelemetryService;

@Getter
private final Meter meter;

public PulsarWorkerOpenTelemetry(WorkerConfig workerConfig) {
openTelemetryService =
OpenTelemetryService.builder().clusterName(workerConfig.getPulsarFunctionsCluster()).build();
openTelemetryService = OpenTelemetryService.builder()
.clusterName(workerConfig.getPulsarFunctionsCluster())
.serviceName(SERVICE_NAME)
.serviceVersion(PulsarVersion.getVersion())
.build();
meter = openTelemetryService.getMeter(INSTRUMENTATION_SCOPE_NAME);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.io.Closeable;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import lombok.Builder;
import lombok.Singular;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -40,6 +41,8 @@
public class OpenTelemetryService implements Closeable {

public static final AttributeKey<String> CLUSTER_ATTRIBUTE = AttributeKey.stringKey("pulsar.cluster");
public static final AttributeKey<String> SERVICE_NAME_ATTRIBUTE = AttributeKey.stringKey("service.name");
public static final AttributeKey<String> SERVICE_VERSION_ATTRIBUTE = AttributeKey.stringKey("service.version");
public static final AttributeKey<Long> CONSUMER_ID_ATTRIBUTE = AttributeKey.longKey("pulsar.consumer.id");
public static final AttributeKey<String> CONSUMER_NAME_ATTRIBUTE = AttributeKey.stringKey("pulsar.consumer");
public static final AttributeKey<String> NAMESPACE_ATTRIBUTE = AttributeKey.stringKey("pulsar.namespace");
Expand All @@ -60,6 +63,8 @@ public class OpenTelemetryService implements Closeable {

@Builder
public OpenTelemetryService(String clusterName,
String serviceName,
String serviceVersion,
@Singular Map<String, String> extraProperties,
// Allows customizing the SDK builder; for testing purposes only.
@VisibleForTesting AutoConfiguredOpenTelemetrySdkBuilder sdkBuilder) {
Expand All @@ -77,11 +82,21 @@ public OpenTelemetryService(String clusterName,

sdkBuilder.addResourceCustomizer(
(resource, __) -> {
if (resource.getAttribute(CLUSTER_ATTRIBUTE) != null) {
// Do not override if already set (via system properties or environment variables).
return resource;
var resourceBuilder = Resource.builder();
// Do not override attributes if already set (via system properties or environment variables).
if (resource.getAttribute(CLUSTER_ATTRIBUTE) == null) {
resourceBuilder.put(CLUSTER_ATTRIBUTE, clusterName);
}
return resource.merge(Resource.builder().put(CLUSTER_ATTRIBUTE, clusterName).build());
if (StringUtils.isNotEmpty(serviceName)
&& Objects.equals(Resource.getDefault().getAttribute(SERVICE_NAME_ATTRIBUTE),
resource.getAttribute(SERVICE_NAME_ATTRIBUTE))) {
resourceBuilder.put(SERVICE_NAME_ATTRIBUTE, serviceName);
}
if (StringUtils.isNotEmpty(serviceVersion)
&& resource.getAttribute(SERVICE_VERSION_ATTRIBUTE) == null) {
resourceBuilder.put(SERVICE_VERSION_ATTRIBUTE, serviceVersion);
}
return resource.merge(resourceBuilder.build());
});

openTelemetrySdk = sdkBuilder.build().getOpenTelemetrySdk();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,17 @@ private boolean matchesDataAttributes(MetricData md) {
/**
 * Checks whether every attribute in {@code expected} is present in {@code actual} with an equal value.
 *
 * <p>Only the expected entries are inspected, so {@code actual} may carry additional attributes. This allows
 * tests to specify just the attributes they care about, instead of exhaustively having to list all of them.
 * An empty {@code expected} therefore always matches.
 *
 * @param actual the attributes found on the metric data being examined
 * @param expected the attributes that must all be present in {@code actual}
 * @return {@code true} iff all attribute pairs in {@code expected} are a subset of those in {@code actual}
 */
private boolean matchesAttributes(Attributes actual, Attributes expected) {
    // A key missing from actual yields null from actual.get(...), which never equals the expected value,
    // so absent keys are reported as a mismatch.
    return expected.asMap().entrySet().stream().allMatch(e -> e.getValue().equals(actual.get(e.getKey())));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,31 @@ public void testIsClusterNameSet() throws Exception {
assertTrue(metricData.stream().anyMatch(predicate));
}

@Test
public void testIsServiceNameAndVersionSet() throws Exception {
    // Metrics are collected through an in-memory reader so the test can inspect them directly.
    @Cleanup
    var metricReader = InMemoryMetricReader.create();

    // Build a service with an explicit cluster name, service name and service version.
    @Cleanup
    var service = OpenTelemetryService.builder()
            .sdkBuilder(getSdkBuilder(metricReader))
            .clusterName("testServiceNameAndVersion")
            .serviceName("openTelemetryServiceTestService")
            .serviceVersion("1.0.0")
            .extraProperty(OpenTelemetryService.OTEL_SDK_DISABLED, "false")
            .build();

    // All three values are expected to show up as resource attributes on the collected metrics.
    var expectedResourceAttributes = Attributes.of(
            AttributeKey.stringKey("pulsar.cluster"), "testServiceNameAndVersion",
            AttributeKey.stringKey("service.name"), "openTelemetryServiceTestService",
            AttributeKey.stringKey("service.version"), "1.0.0");
    var resourceMatcher = MetricDataMatcher.builder()
            .resourceAttribute(expectedResourceAttributes)
            .build();

    var collectedMetrics = metricReader.collectAllMetrics();
    assertTrue(collectedMetrics.stream().anyMatch(resourceMatcher));
}

@Test
public void testIsInstrumentationNameSetOnMeter() throws Exception {
Meter meter = openTelemetryService.getMeter("testInstrumentationScope");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,25 @@
import io.opentelemetry.api.metrics.Meter;
import java.io.Closeable;
import lombok.Getter;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.common.stats.OpenTelemetryService;
import org.apache.pulsar.proxy.server.ProxyConfiguration;

public class PulsarProxyOpenTelemetry implements Closeable {

public static final String SERVICE_NAME = "pulsar-proxy";
private static final String INSTRUMENTATION_SCOPE_NAME = "org.apache.pulsar.proxy";
private final OpenTelemetryService openTelemetryService;

@Getter
private final Meter meter;

public PulsarProxyOpenTelemetry(ProxyConfiguration config) {
openTelemetryService = OpenTelemetryService.builder().clusterName(config.getClusterName()).build();
openTelemetryService = OpenTelemetryService.builder()
.clusterName(config.getClusterName())
.serviceName(SERVICE_NAME)
.serviceVersion(PulsarVersion.getVersion())
.build();
meter = openTelemetryService.getMeter(INSTRUMENTATION_SCOPE_NAME);
}

Expand Down
7 changes: 6 additions & 1 deletion tests/integration/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-proxy</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>managed-ledger</artifactId>
Expand Down Expand Up @@ -176,7 +182,6 @@
<scope>test</scope>
</dependency>


<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.broker.stats.PulsarBrokerOpenTelemetry;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient;
import org.apache.pulsar.functions.worker.stats.PulsarWorkerOpenTelemetry;
import org.apache.pulsar.proxy.stats.PulsarProxyOpenTelemetry;
import org.apache.pulsar.tests.integration.containers.ChaosContainer;
import org.apache.pulsar.tests.integration.containers.OpenTelemetryCollectorContainer;
import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
Expand All @@ -38,10 +42,7 @@

public class OpenTelemetrySanityTest {

/*
* Validate that the OpenTelemetry metrics can be exported to a remote OpenTelemetry collector.
* https://github.com/open-telemetry/opentelemetry-java/blob/main/sdk-extensions/autoconfigure/README.md#prometheus-exporter
*/
// Validate that the OpenTelemetry metrics can be exported to a remote OpenTelemetry collector.
@Test(timeOut = 360_000)
public void testOpenTelemetryMetricsOtlpExport() throws Exception {
var clusterName = "testOpenTelemetryMetrics-" + UUID.randomUUID();
Expand All @@ -51,15 +52,9 @@ public void testOpenTelemetryMetricsOtlpExport() throws Exception {
var otlpEndpointProp =
Pair.of("OTEL_EXPORTER_OTLP_ENDPOINT", openTelemetryCollectorContainer.getOtlpEndpoint());

var brokerOtelServiceName = clusterName + "-broker";
var brokerCollectorProps = getCollectorProps(brokerOtelServiceName, exporter, otlpEndpointProp);

var proxyOtelServiceName = clusterName + "-proxy";
var proxyCollectorProps = getCollectorProps(proxyOtelServiceName, exporter, otlpEndpointProp);

var functionWorkerServiceNameSuffix = PulsarTestBase.randomName();
var functionWorkerOtelServiceName = "function-worker-" + functionWorkerServiceNameSuffix;
var functionWorkerCollectorProps = getCollectorProps(functionWorkerOtelServiceName, exporter, otlpEndpointProp);
var brokerCollectorProps = getCollectorProps(exporter, otlpEndpointProp);
var proxyCollectorProps = getCollectorProps(exporter, otlpEndpointProp);
var functionWorkerCollectorProps = getCollectorProps(exporter, otlpEndpointProp);

var spec = PulsarClusterSpec.builder()
.clusterName(clusterName)
Expand All @@ -71,25 +66,25 @@ public void testOpenTelemetryMetricsOtlpExport() throws Exception {
@Cleanup("stop")
var pulsarCluster = PulsarCluster.forSpec(spec);
pulsarCluster.start();
pulsarCluster.setupFunctionWorkers(PulsarTestBase.randomName(), FunctionRuntimeType.PROCESS, 1);

pulsarCluster.setupFunctionWorkers(functionWorkerServiceNameSuffix, FunctionRuntimeType.PROCESS, 1);
// TODO: Validate cluster name is present once
// TODO: Validate cluster name and service version are present once
// https://github.com/open-telemetry/opentelemetry-java/issues/6108 is solved.
var metricName = "queueSize_ratio"; // Sent automatically by the OpenTelemetry SDK.
Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> {
var metrics = getMetricsFromPrometheus(
openTelemetryCollectorContainer, OpenTelemetryCollectorContainer.PROMETHEUS_EXPORTER_PORT);
return !metrics.findByNameAndLabels(metricName, "job", brokerOtelServiceName).isEmpty();
return !metrics.findByNameAndLabels(metricName, "job", PulsarBrokerOpenTelemetry.SERVICE_NAME).isEmpty();
});
Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> {
var metrics = getMetricsFromPrometheus(
openTelemetryCollectorContainer, OpenTelemetryCollectorContainer.PROMETHEUS_EXPORTER_PORT);
return !metrics.findByNameAndLabels(metricName, "job", proxyOtelServiceName).isEmpty();
return !metrics.findByNameAndLabels(metricName, "job", PulsarProxyOpenTelemetry.SERVICE_NAME).isEmpty();
});
Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> {
var metrics = getMetricsFromPrometheus(
openTelemetryCollectorContainer, OpenTelemetryCollectorContainer.PROMETHEUS_EXPORTER_PORT);
return !metrics.findByNameAndLabels(metricName, "job", functionWorkerOtelServiceName).isEmpty();
return !metrics.findByNameAndLabels(metricName, "job", PulsarWorkerOpenTelemetry.SERVICE_NAME).isEmpty();
});
}

Expand All @@ -107,16 +102,9 @@ public void testOpenTelemetryMetricsPrometheusExport() throws Exception {
var prometheusExporterPortProp =
Pair.of("OTEL_EXPORTER_PROMETHEUS_PORT", Integer.toString(prometheusExporterPort));

var brokerOtelServiceName = clusterName + "-broker";
var brokerCollectorProps = getCollectorProps(brokerOtelServiceName, exporter, prometheusExporterPortProp);

var proxyOtelServiceName = clusterName + "-proxy";
var proxyCollectorProps = getCollectorProps(proxyOtelServiceName, exporter, prometheusExporterPortProp);

var functionWorkerServiceNameSuffix = PulsarTestBase.randomName();
var functionWorkerOtelServiceName = "function-worker-" + functionWorkerServiceNameSuffix;
var functionWorkerCollectorProps =
getCollectorProps(functionWorkerOtelServiceName, exporter, prometheusExporterPortProp);
var brokerCollectorProps = getCollectorProps(exporter, prometheusExporterPortProp);
var proxyCollectorProps = getCollectorProps(exporter, prometheusExporterPortProp);
var functionWorkerCollectorProps = getCollectorProps(exporter, prometheusExporterPortProp);

var spec = PulsarClusterSpec.builder()
.clusterName(clusterName)
Expand All @@ -130,28 +118,29 @@ public void testOpenTelemetryMetricsPrometheusExport() throws Exception {
@Cleanup("stop")
var pulsarCluster = PulsarCluster.forSpec(spec);
pulsarCluster.start();

pulsarCluster.setupFunctionWorkers(functionWorkerServiceNameSuffix, FunctionRuntimeType.PROCESS, 1);
var workerContainer = pulsarCluster.getAnyWorker();
pulsarCluster.setupFunctionWorkers(PulsarTestBase.randomName(), FunctionRuntimeType.PROCESS, 1);

var metricName = "target_info"; // Sent automatically by the OpenTelemetry SDK.
Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> {
var metrics = getMetricsFromPrometheus(pulsarCluster.getAnyBroker(), prometheusExporterPort);
return !metrics.findByNameAndLabels(metricName,
Pair.of("pulsar_cluster", clusterName),
Pair.of("service_name", brokerOtelServiceName)).isEmpty();
Pair.of("service_name", PulsarBrokerOpenTelemetry.SERVICE_NAME),
Pair.of("service_version", PulsarVersion.getVersion())).isEmpty();
});
Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> {
var metrics = getMetricsFromPrometheus(pulsarCluster.getProxy(), prometheusExporterPort);
return !metrics.findByNameAndLabels(metricName,
Pair.of("pulsar_cluster", clusterName),
Pair.of("service_name", proxyOtelServiceName)).isEmpty();
Pair.of("service_name", PulsarProxyOpenTelemetry.SERVICE_NAME),
Pair.of("service_version", PulsarVersion.getVersion())).isEmpty();
});
Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> {
var metrics = getMetricsFromPrometheus(workerContainer, prometheusExporterPort);
var metrics = getMetricsFromPrometheus(pulsarCluster.getAnyWorker(), prometheusExporterPort);
return !metrics.findByNameAndLabels(metricName,
Pair.of("pulsar_cluster", clusterName),
Pair.of("service_name", functionWorkerOtelServiceName)).isEmpty();
Pair.of("service_name", PulsarWorkerOpenTelemetry.SERVICE_NAME),
Pair.of("service_version", PulsarVersion.getVersion())).isEmpty();
});
}

Expand All @@ -160,12 +149,10 @@ private static PrometheusMetricsClient.Metrics getMetricsFromPrometheus(ChaosCon
return client.getMetrics();
}

private static Map<String, String> getCollectorProps(String serviceName, String exporter,
Pair<String, String> ... extraProps) {
private static Map<String, String> getCollectorProps(String exporter, Pair<String, String> ... extraProps) {
var defaultProps = Map.of(
"OTEL_SDK_DISABLED", "false",
"OTEL_METRIC_EXPORT_INTERVAL", "1000",
"OTEL_SERVICE_NAME", serviceName,
"OTEL_METRICS_EXPORTER", exporter
);
var props = new HashMap<>(defaultProps);
Expand Down

0 comments on commit ef6f148

Please sign in to comment.