diff --git a/README.md b/README.md index 0df9b79..e1bbe29 100644 --- a/README.md +++ b/README.md @@ -3,6 +3,9 @@ ![GitHub Actions Badge](https://github.com/vhatsura/confluent-kafka-extensions-diagnostics/actions/workflows/continuous.integration.yml/badge.svg) [![NuGet Badge](https://buildstats.info/nuget/Confluent.Kafka.Extensions.Diagnostics)](https://www.nuget.org/packages/Confluent.Kafka.Extensions.Diagnostics/) +The `Confluent.Kafka.Extensions.Diagnostics` package enables instrumentation of the `Confluent.Kafka` library +via [Activity API](https://docs.microsoft.com/en-us/dotnet/core/diagnostics/distributed-tracing-instrumentation-walkthroughs). + ## Installation ```powershell Install-Package Confluent.Kafka.Extensions.Diagnostics ``` ## Usage +### Producer + +Producer instrumentation is done via a wrapper class and, for this reason, the producer usage does not need to be rewritten. However, +to enable producer instrumentation, the `BuildWithInstrumentation` method should be called on the producer builder instead of `Build`. +After that, all produce calls (sync and async) will be instrumented. + ```csharp +using Confluent.Kafka; +using Confluent.Kafka.Extensions.Diagnostics; + + +using var producer = + new ProducerBuilder(new ProducerConfig(new ClientConfig { BootstrapServers = "localhost:9092" })) + .SetKeySerializer(Serializers.Null) + .SetValueSerializer(Serializers.Utf8) + .BuildWithInstrumentation(); + +await producer.ProduceAsync("topic", new Message { Value = "Hello World!" }); + ``` -## Roadmap +### Consumer + +Unfortunately, the consumer interface of the `Confluent.Kafka` library is not very flexible. Therefore, the instrumentation is implemented +via an extension method on the consumer itself.
For this reason, the consumer usage should be rewritten as follows: + +```csharp +using Confluent.Kafka; +using Confluent.Kafka.Extensions.Diagnostics; + +using var consumer = new ConsumerBuilder( + new ConsumerConfig(new ClientConfig { BootstrapServers = "localhost:9092" }) + { + GroupId = "group", AutoOffsetReset = AutoOffsetReset.Earliest + }) + .SetValueDeserializer(Deserializers.Utf8) + .Build(); + +consumer.Subscribe("topic"); + +try +{ + while (true) + { + try + { + consumer.ConsumeWithInstrumentation((result) => + { + Console.WriteLine(result.Message.Value); + }); + } + catch (ConsumeException e) + { + Console.WriteLine($"Error occurred: {e.Error.Reason}"); + } + } +} +catch (OperationCanceledException) +{ + consumer.Close(); +} +``` diff --git a/src/Confluent.Kafka.Extensions.Diagnostics/ActivityDiagnosticsHelper.cs b/src/Confluent.Kafka.Extensions.Diagnostics/ActivityDiagnosticsHelper.cs index 1ceac3b..dbff57b 100644 --- a/src/Confluent.Kafka.Extensions.Diagnostics/ActivityDiagnosticsHelper.cs +++ b/src/Confluent.Kafka.Extensions.Diagnostics/ActivityDiagnosticsHelper.cs @@ -1,4 +1,4 @@ -using System.Diagnostics; +using System.Diagnostics; using System.Text; namespace Confluent.Kafka.Extensions.Diagnostics; @@ -6,37 +6,25 @@ namespace Confluent.Kafka.Extensions.Diagnostics; internal static class ActivityDiagnosticsHelper { private const string ActivitySourceName = "Confluent.Kafka.Extensions.Diagnostics"; + private const string TraceParentHeaderName = "traceparent"; + private const string TraceStateHeaderName = "tracestate"; private static ActivitySource ActivitySource { get; } = new(ActivitySourceName); - internal static Activity? Start(TopicPartition partition, Message message) + internal static Activity? StartProduceActivity(TopicPartition partition, + Message message) { try { - Activity?
activity = ActivitySource.StartActivity("Confluent.Kafka.Produce", ActivityKind.Client, - default(ActivityContext), - new[] - { - new KeyValuePair("messaging.system", "kafka"), - new KeyValuePair("messaging.destination", partition.Topic), - new KeyValuePair("messaging.destination_kind", "topic"), - new KeyValuePair("messaging.kafka.partition", partition.Partition.ToString()) - }!); - - if (activity == null) return null; + Activity? activity = ActivitySource.StartActivity("Confluent.Kafka.Produce", ActivityKind.Producer, + default(ActivityContext), ActivityTags(partition)!); + + if (activity == null) + return null; if (activity.IsAllDataRequested) { - if (message.Key != null) - { - activity.SetTag("messaging.kafka.message_key", message.Key.ToString()); - } - - if (message.Value != null) - { - int messagePayloadBytes = Encoding.UTF8.GetByteCount(message.Value.ToString()!); - activity.AddTag("messaging.message_payload_size_bytes", messagePayloadBytes.ToString()); - } + SetActivityTags(activity, message); } if (message.Headers == null) @@ -45,12 +33,12 @@ internal static class ActivityDiagnosticsHelper } if (activity.Id != null) - message.Headers.Add("traceparent", Encoding.UTF8.GetBytes(activity.Id)); + message.Headers.Add(TraceParentHeaderName, Encoding.UTF8.GetBytes(activity.Id)); var tracestateStr = activity.Context.TraceState; if (tracestateStr?.Length > 0) { - message.Headers.Add("tracestate", Encoding.UTF8.GetBytes(tracestateStr)); + message.Headers.Add(TraceStateHeaderName, Encoding.UTF8.GetBytes(tracestateStr)); } return activity; @@ -61,4 +49,60 @@ internal static class ActivityDiagnosticsHelper return null; } } + + internal static Activity? 
StartConsumeActivity(TopicPartition partition, + Message message) + { + var activity = ActivitySource.CreateActivity("Confluent.Kafka.Consume", ActivityKind.Consumer, + default(ActivityContext), ActivityTags(partition)!); + + if (activity != null) + { + var traceParentHeader = message.Headers?.FirstOrDefault(x => x.Key == TraceParentHeaderName); + var traceStateHeader = message.Headers?.FirstOrDefault(x => x.Key == TraceStateHeaderName); + + var traceParent = traceParentHeader != null + ? Encoding.UTF8.GetString(traceParentHeader.GetValueBytes()) + : null; + var traceState = traceStateHeader != null + ? Encoding.UTF8.GetString(traceStateHeader.GetValueBytes()) + : null; + + if (ActivityContext.TryParse(traceParent, traceState, out var activityContext)) + { + activity.SetParentId(activityContext.TraceId, activityContext.SpanId, activityContext.TraceFlags); + activity.TraceStateString = activityContext.TraceState; + } + + if (activity.IsAllDataRequested) + { + SetActivityTags(activity, message); + } + + activity.Start(); + } + + + return activity; + } + + private static void SetActivityTags(Activity activity, Message message) + { + if (message.Key != null) + { + activity.SetTag("messaging.kafka.message_key", message.Key.ToString()); + } + } + + private static IEnumerable> ActivityTags(TopicPartition partition) + { + return new[] + { + new KeyValuePair("messaging.system", "kafka"), + new KeyValuePair("messaging.destination", partition.Topic), + new KeyValuePair("messaging.destination_kind", "topic"), new KeyValuePair( + "messaging.kafka.partition", + partition.Partition.ToString()) + }; + } } diff --git a/src/Confluent.Kafka.Extensions.Diagnostics/Confluent.Kafka.Extensions.Diagnostics.csproj b/src/Confluent.Kafka.Extensions.Diagnostics/Confluent.Kafka.Extensions.Diagnostics.csproj index dab1065..0e8f697 100644 --- a/src/Confluent.Kafka.Extensions.Diagnostics/Confluent.Kafka.Extensions.Diagnostics.csproj +++ 
b/src/Confluent.Kafka.Extensions.Diagnostics/Confluent.Kafka.Extensions.Diagnostics.csproj @@ -25,7 +25,7 @@ - + all runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/src/Confluent.Kafka.Extensions.Diagnostics/ConsumerExtensions.cs b/src/Confluent.Kafka.Extensions.Diagnostics/ConsumerExtensions.cs new file mode 100644 index 0000000..52fa10c --- /dev/null +++ b/src/Confluent.Kafka.Extensions.Diagnostics/ConsumerExtensions.cs @@ -0,0 +1,47 @@ +namespace Confluent.Kafka.Extensions.Diagnostics; + +/// +/// Extension methods for . +/// +public static class ConsumerExtensions +{ + /// + /// Consumes a message from the topic with instrumentation. + /// + public static async Task ConsumeWithInstrumentation(this IConsumer consumer, + Func, CancellationToken, Task> action, CancellationToken cancellationToken) + { + var result = consumer.Consume(cancellationToken); + + var activity = ActivityDiagnosticsHelper.StartConsumeActivity(result.TopicPartition, result.Message); + + try + { + await action(result, cancellationToken); + } + finally + { + activity?.Stop(); + } + } + + /// + /// Consumes a message from the topic with instrumentation. 
+ /// + public static void ConsumeWithInstrumentation(this IConsumer consumer, + Action> action, int millisecondsTimeout) + { + var result = consumer.Consume(millisecondsTimeout); + + var activity = ActivityDiagnosticsHelper.StartConsumeActivity(result.TopicPartition, result.Message); + + try + { + action(result); + } + finally + { + activity?.Stop(); + } + } +} diff --git a/src/Confluent.Kafka.Extensions.Diagnostics/InstrumentedProducer.cs b/src/Confluent.Kafka.Extensions.Diagnostics/InstrumentedProducer.cs index f05e8aa..32a94e6 100644 --- a/src/Confluent.Kafka.Extensions.Diagnostics/InstrumentedProducer.cs +++ b/src/Confluent.Kafka.Extensions.Diagnostics/InstrumentedProducer.cs @@ -25,10 +25,11 @@ public async Task> ProduceAsync( TopicPartition topicPartition, Message message, CancellationToken cancellationToken = new CancellationToken()) { - var activity = ActivityDiagnosticsHelper.Start(topicPartition, message); + var activity = ActivityDiagnosticsHelper.StartProduceActivity(topicPartition, message); try { + // todo: get delivery result and put it into the activity return await _producerImplementation.ProduceAsync(topicPartition, message, cancellationToken); } finally @@ -38,14 +39,14 @@ public async Task> ProduceAsync( } public void Produce( - string topic, Message message, Action> deliveryHandler = null) => + string topic, Message message, Action>? deliveryHandler = null) => Produce(new TopicPartition(topic, Partition.Any), message, deliveryHandler); public void Produce( TopicPartition topicPartition, Message message, - Action> deliveryHandler = null) + Action>? 
deliveryHandler = null) { - var activity = ActivityDiagnosticsHelper.Start(topicPartition, message); + var activity = ActivityDiagnosticsHelper.StartProduceActivity(topicPartition, message); try { diff --git a/src/Confluent.Kafka.Extensions.Diagnostics/ProducerBuilderExtensions.cs b/src/Confluent.Kafka.Extensions.Diagnostics/ProducerBuilderExtensions.cs index 144b6cf..b8fd0f3 100644 --- a/src/Confluent.Kafka.Extensions.Diagnostics/ProducerBuilderExtensions.cs +++ b/src/Confluent.Kafka.Extensions.Diagnostics/ProducerBuilderExtensions.cs @@ -1,7 +1,13 @@ namespace Confluent.Kafka.Extensions.Diagnostics; +/// +/// Extension methods for . +/// public static class ProducerBuilderExtensions { + /// + /// Builds a new instrumented instance of producer. + /// public static IProducer BuildWithInstrumentation( this ProducerBuilder producerBuilder) {