Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add AppConfig and Event Hubs samples for using Monitor exporter #17565

Merged
merged 4 commits into from
Nov 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions sdk/monitor/microsoft-opentelemetry-exporter-azuremonitor/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,27 @@
<version>1.5.1</version> <!-- {x-version-update;com.azure:azure-core-test;dependency} -->
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-data-appconfiguration</artifactId>
<version>1.1.7</version> <!-- {x-version-update;com.azure:azure-data-appconfiguration;dependency} -->
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs</artifactId>
<version>5.3.1</version> <!-- {x-version-update;com.azure:azure-messaging-eventhubs;dependency} -->
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core-tracing-opentelemetry</artifactId>
<version>1.0.0-beta.6</version> <!-- {x-version-update;com.azure:azure-core-tracing-opentelemetry;dependency} -->
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.microsoft.opentelemetry.exporter.azuremonitor;

import com.azure.data.appconfiguration.ConfigurationClient;
import com.azure.data.appconfiguration.ConfigurationClientBuilder;
import io.opentelemetry.context.Scope;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.trace.TracerSdkProvider;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import io.opentelemetry.trace.Span;
import io.opentelemetry.trace.Tracer;

/**
* Sample to demonstrate using {@link AzureMonitorExporter} to export telemetry events when setting a configuration
* in App Configuration through the {@link ConfigurationClient}.
*/
public class AppConfigurationAzureMonitorExporterSample {

private static final Tracer TRACER = configureAzureMonitorExporter();
private static final String CONNECTION_STRING = "<YOUR_CONNECTION_STRING>";

/**
* The main method to run the application.
* @param args Ignored args.
*/
public static void main(String[] args) {
doClientWork();
}

/**
* Configure the OpenTelemetry {@link AzureMonitorExporter} to enable tracing.
* @return The OpenTelemetry {@link Tracer} instance.
*/
private static Tracer configureAzureMonitorExporter() {
AzureMonitorExporter exporter = new AzureMonitorExporterBuilder()
.connectionString("{connection-string}")
.buildExporter();

TracerSdkProvider tracerSdkProvider = OpenTelemetrySdk.getTracerProvider();
tracerSdkProvider.addSpanProcessor(SimpleSpanProcessor.newBuilder(exporter).build());
return tracerSdkProvider.get("Sample");
}

/**
* Creates the {@link ConfigurationClient} and sets a configuration in Azure App Configuration with distributed
* tracing enabled and using the Azure Monitor exporter to export telemetry events to Azure Monitor.
*/
private static void doClientWork() {
ConfigurationClient client = new ConfigurationClientBuilder()
.connectionString(CONNECTION_STRING)
.buildClient();

Span span = TRACER.spanBuilder("user-parent-span").startSpan();
final Scope scope = TRACER.withSpan(span);
try {
// Thread bound (sync) calls will automatically pick up the parent span and you don't need to pass it explicitly.
client.setConfigurationSetting("hello", "text", "World");
} finally {
span.end();
scope.close();
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.microsoft.opentelemetry.exporter.azuremonitor;

import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventDataBatch;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.EventHubProducerAsyncClient;
import com.azure.messaging.eventhubs.models.CreateBatchOptions;
import io.opentelemetry.context.Scope;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.trace.TracerSdkProvider;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import io.opentelemetry.trace.Span;
import io.opentelemetry.trace.Tracer;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import static com.azure.core.util.tracing.Tracer.PARENT_SPAN_KEY;
import static com.azure.messaging.eventhubs.implementation.ClientConstants.OPERATION_TIMEOUT;
import static java.nio.charset.StandardCharsets.UTF_8;

/**
* Sample to demontrate using {@link AzureMonitorExporter} to export telemetry events when sending events to Event Hubs
* using {@link EventHubProducerAsyncClient}.
*/
public class EventHubsAzureMonitorExporterSample {
private static final Tracer TRACER = configureAzureMonitorExporter();
private static final String CONNECTION_STRING = "<YOUR_CONNECTION_STRING>";

/**
* The main method to run the application.
* @param args Ignored args.
*/
public static void main(String[] args) {
doClientWork();
}

/**
* Configure the OpenTelemetry {@link AzureMonitorExporter} to enable tracing.
* @return The OpenTelemetry {@link Tracer} instance.
*/
private static Tracer configureAzureMonitorExporter() {
AzureMonitorExporter exporter = new AzureMonitorExporterBuilder()
.connectionString("{connection-string}")
.buildExporter();

TracerSdkProvider tracerSdkProvider = OpenTelemetrySdk.getTracerProvider();
tracerSdkProvider.addSpanProcessor(SimpleSpanProcessor.newBuilder(exporter).build());
return tracerSdkProvider.get("Sample");
}

/**
* Method that creates {@link EventHubProducerAsyncClient} to send events to Event Hubs with distributed
* telemetry enabled and using Azure Monitor exporter to export telemetry events.
*/
private static void doClientWork() {
EventHubProducerAsyncClient producer = new EventHubClientBuilder()
.connectionString(CONNECTION_STRING)
.buildAsyncProducerClient();

Span span = TRACER.spanBuilder("user-parent-span").startSpan();
final Scope scope = TRACER.withSpan(span);
try {
String firstPartition = producer.getPartitionIds().blockFirst(OPERATION_TIMEOUT);

final byte[] body = "EventData Sample 1".getBytes(UTF_8);
final byte[] body2 = "EventData Sample 2".getBytes(UTF_8);

// We will publish three events based on simple sentences.
Flux<EventData> data = Flux.just(
new EventData(body).addContext(PARENT_SPAN_KEY, TRACER.getCurrentSpan()),
new EventData(body2).addContext(PARENT_SPAN_KEY, TRACER.getCurrentSpan()));

// Create a batch to send the events.
final CreateBatchOptions options = new CreateBatchOptions()
.setPartitionId(firstPartition)
.setMaximumSizeInBytes(256);

final AtomicReference<EventDataBatch> currentBatch = new AtomicReference<>(
producer.createBatch(options).block(OPERATION_TIMEOUT));

data.flatMap(event -> {
final EventDataBatch batch = currentBatch.get();
if (batch.tryAdd(event)) {
return Mono.empty();
}

// The batch is full, so we create a new batch and send the batch. Mono.when completes when both
// operations
// have completed.
return Mono.when(
producer.send(batch),
producer.createBatch(options).map(newBatch -> {
currentBatch.set(newBatch);

// Add that event that we couldn't before.
if (!newBatch.tryAdd(event)) {
throw Exceptions.propagate(new IllegalArgumentException(String.format(
"Event is too large for an empty batch. Max size: %s. Event: %s",
newBatch.getMaxSizeInBytes(), event.getBodyAsString())));
}

return newBatch;
}));
}).then()
.doFinally(signal -> {
final EventDataBatch batch = currentBatch.getAndSet(null);
if (batch != null) {
producer.send(batch).block(OPERATION_TIMEOUT);
}
})
.subscribe(unused -> System.out.println("Complete"),
error -> System.out.println("Error sending events: " + error),
() -> {
System.out.println("Completed sending events.");
span.end();
});


// The .subscribe() creation and assignment is not a blocking call. For the purpose of this example, we sleep
// the thread so the program does not end before the send operation is complete. Using .block() instead of
// .subscribe() will turn this into a synchronous call.
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException ignored) {
} finally {
// Disposing of our producer.
producer.close();
}
} finally {
scope.close();
}
}
}