From 503e0b165422d0af31ca3d65a9582443cf4187c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Kie=C5=82kowicz?= Date: Wed, 30 Oct 2024 17:21:19 +0100 Subject: [PATCH] [repo/Kafka] Prepare to .NET9 (#2275) --- .../InstrumentedConsumer.cs | 24 ++++----- .../InstrumentedConsumerBuilder.cs | 8 +-- .../InstrumentedProducer.cs | 34 ++++++------- .../OpenTelemetryConsumeResultExtensions.cs | 26 ++-------- .../OpenTelemetryConsumerBuilderExtensions.cs | 2 +- .../OpenTelemetryProducerBuilderExtensions.cs | 2 +- .../HostedMeteringTests.cs | 8 +-- .../HostedTracingAndMeteringTests.cs | 16 +++--- .../HostedTracingTests.cs | 8 +-- .../KafkaHelpers.cs | 6 +-- .../MeteringTests.cs | 18 +++---- ...TelemetryConsumerBuilderExtensionsTests.cs | 42 ++++++++++++---- ...TelemetryProducerBuilderExtensionsTests.cs | 8 +-- .../TracingTests.cs | 50 +++++++++---------- 14 files changed, 129 insertions(+), 123 deletions(-) diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedConsumer.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedConsumer.cs index 561f3a1014..6fe22b0f19 100644 --- a/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedConsumer.cs +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedConsumer.cs @@ -50,7 +50,7 @@ public void SetSaslCredentials(string username, string password) public ConsumeResult? Consume(int millisecondsTimeout) { - DateTimeOffset start = DateTimeOffset.UtcNow; + var start = DateTimeOffset.UtcNow; ConsumeResult? result = null; ConsumeResult consumeResult = default; string? errorType = null; @@ -67,7 +67,7 @@ public void SetSaslCredentials(string username, string password) } finally { - DateTimeOffset end = DateTimeOffset.UtcNow; + var end = DateTimeOffset.UtcNow; if (result is { IsPartitionEOF: false }) { this.InstrumentConsumption(start, end, consumeResult, errorType); @@ -77,7 +77,7 @@ public void SetSaslCredentials(string username, string password) public ConsumeResult? Consume(CancellationToken cancellationToken = default) { - DateTimeOffset start = DateTimeOffset.UtcNow; + var start = DateTimeOffset.UtcNow; ConsumeResult? result = null; ConsumeResult consumeResult = default; string? errorType = null; @@ -94,7 +94,7 @@ public void SetSaslCredentials(string username, string password) } finally { - DateTimeOffset end = DateTimeOffset.UtcNow; + var end = DateTimeOffset.UtcNow; if (result is { IsPartitionEOF: false }) { this.InstrumentConsumption(start, end, consumeResult, errorType); @@ -104,7 +104,7 @@ public void SetSaslCredentials(string username, string password) public ConsumeResult? Consume(TimeSpan timeout) { - DateTimeOffset start = DateTimeOffset.UtcNow; + var start = DateTimeOffset.UtcNow; ConsumeResult? result = null; ConsumeResult consumeResult = default; string? errorType = null; @@ -121,7 +121,7 @@ public void SetSaslCredentials(string username, string password) } finally { - DateTimeOffset end = DateTimeOffset.UtcNow; + var end = DateTimeOffset.UtcNow; if (result is { IsPartitionEOF: false }) { this.InstrumentConsumption(start, end, consumeResult, errorType); @@ -320,11 +320,11 @@ private void InstrumentConsumption(DateTimeOffset startTime, DateTimeOffset endT { if (this.options.Traces) { - PropagationContext propagationContext = consumeResult.Headers != null + var propagationContext = consumeResult.Headers != null ? OpenTelemetryConsumeResultExtensions.ExtractPropagationContext(consumeResult.Headers) : default; - using Activity? activity = this.StartReceiveActivity(propagationContext, startTime, consumeResult.TopicPartitionOffset, consumeResult.Key); + using var activity = this.StartReceiveActivity(propagationContext, startTime, consumeResult.TopicPartitionOffset, consumeResult.Key); if (activity != null) { if (errorType != null) @@ -342,7 +342,7 @@ private void InstrumentConsumption(DateTimeOffset startTime, DateTimeOffset endT if (this.options.Metrics) { - TimeSpan duration = endTime - startTime; + var duration = endTime - startTime; RecordReceive(consumeResult.TopicPartitionOffset!.TopicPartition, duration, errorType); } } @@ -354,10 +354,10 @@ private void InstrumentConsumption(DateTimeOffset startTime, DateTimeOffset endT : string.Concat(topicPartitionOffset!.Topic, " ", ConfluentKafkaCommon.ReceiveOperationName); ActivityLink[] activityLinks = propagationContext.ActivityContext.IsValid() - ? new[] { new ActivityLink(propagationContext.ActivityContext) } - : Array.Empty(); + ? [new ActivityLink(propagationContext.ActivityContext)] + : []; - Activity? activity = ConfluentKafkaCommon.ActivitySource.StartActivity(spanName, kind: ActivityKind.Consumer, links: activityLinks, startTime: start, parentContext: default); + var activity = ConfluentKafkaCommon.ActivitySource.StartActivity(spanName, kind: ActivityKind.Consumer, links: activityLinks, startTime: start, parentContext: default); if (activity?.IsAllDataRequested == true) { activity.SetTag(SemanticConventions.AttributeMessagingSystem, ConfluentKafkaCommon.KafkaMessagingSystem); diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedConsumerBuilder.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedConsumerBuilder.cs index 8affdfa5fd..82f29e3391 100644 --- a/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedConsumerBuilder.cs +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedConsumerBuilder.cs @@ -41,10 +41,12 @@ internal bool EnableTraces /// an . public override IConsumer Build() { - ConsumerConfig config = (ConsumerConfig)this.Config; + var config = (ConsumerConfig)this.Config; - var consumer = new InstrumentedConsumer(base.Build(), this.options); - consumer.GroupId = config.GroupId; + var consumer = new InstrumentedConsumer(base.Build(), this.options) + { + GroupId = config.GroupId, + }; return consumer; } diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedProducer.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedProducer.cs index b47caa3591..657fbfd629 100644 --- a/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedProducer.cs +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedProducer.cs @@ -42,8 +42,8 @@ public async Task> ProduceAsync( Message message, CancellationToken cancellationToken = default) { - DateTimeOffset start = DateTimeOffset.UtcNow; - using Activity? activity = this.StartPublishActivity(start, topic, message); + var start = DateTimeOffset.UtcNow; + using var activity = this.StartPublishActivity(start, topic, message); if (activity != null) { this.InjectActivity(activity, message); @@ -71,9 +71,9 @@ public async Task> ProduceAsync( } finally { - DateTimeOffset end = DateTimeOffset.UtcNow; + var end = DateTimeOffset.UtcNow; activity?.SetEndTime(end.UtcDateTime); - TimeSpan duration = end - start; + var duration = end - start; if (this.options.Metrics) { @@ -89,8 +89,8 @@ public async Task> ProduceAsync( Message message, CancellationToken cancellationToken = default) { - DateTimeOffset start = DateTimeOffset.UtcNow; - using Activity? activity = this.StartPublishActivity(start, topicPartition.Topic, message, topicPartition.Partition); + var start = DateTimeOffset.UtcNow; + using var activity = this.StartPublishActivity(start, topicPartition.Topic, message, topicPartition.Partition); if (activity != null) { this.InjectActivity(activity, message); @@ -118,9 +118,9 @@ public async Task> ProduceAsync( } finally { - DateTimeOffset end = DateTimeOffset.UtcNow; + var end = DateTimeOffset.UtcNow; activity?.SetEndTime(end.UtcDateTime); - TimeSpan duration = end - start; + var duration = end - start; if (this.options.Metrics) { @@ -133,8 +133,8 @@ public async Task> ProduceAsync( public void Produce(string topic, Message message, Action>? deliveryHandler = null) { - DateTimeOffset start = DateTimeOffset.UtcNow; - using Activity? activity = this.StartPublishActivity(start, topic, message); + var start = DateTimeOffset.UtcNow; + using var activity = this.StartPublishActivity(start, topic, message); if (activity != null) { this.InjectActivity(activity, message); @@ -161,9 +161,9 @@ public void Produce(string topic, Message message, Action message, Action message, Action>? deliveryHandler = null) { - DateTimeOffset start = DateTimeOffset.UtcNow; - using Activity? activity = this.StartPublishActivity(start, topicPartition.Topic, message, topicPartition.Partition); + var start = DateTimeOffset.UtcNow; + using var activity = this.StartPublishActivity(start, topicPartition.Topic, message, topicPartition.Partition); if (activity != null) { this.InjectActivity(activity, message); @@ -202,9 +202,9 @@ public void Produce(TopicPartition topicPartition, Message message } finally { - DateTimeOffset end = DateTimeOffset.UtcNow; + var end = DateTimeOffset.UtcNow; activity?.SetEndTime(end.UtcDateTime); - TimeSpan duration = end - start; + var duration = end - start; if (this.options.Metrics) { @@ -364,7 +364,7 @@ private void InjectActivity(Activity? activity, Message message) private void InjectTraceContext(Message message, string key, string value) { - message.Headers ??= new Headers(); + message.Headers ??= []; message.Headers.Add(key, Encoding.UTF8.GetBytes(value)); } } diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/OpenTelemetryConsumeResultExtensions.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/OpenTelemetryConsumeResultExtensions.cs index 5ef64a43ac..5e302b50c6 100644 --- a/src/OpenTelemetry.Instrumentation.ConfluentKafka/OpenTelemetryConsumeResultExtensions.cs +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/OpenTelemetryConsumeResultExtensions.cs @@ -105,25 +105,7 @@ public static bool TryExtractPropagationContext( return consumeResult; } - Activity? processActivity = null; - if (TryExtractPropagationContext(consumeResult, out var propagationContext)) - { - processActivity = StartProcessActivity( - propagationContext, - consumeResult.TopicPartitionOffset, - consumeResult.Message.Key, - instrumentedConsumer.Name, - instrumentedConsumer.GroupId!); - } - else - { - processActivity = StartProcessActivity( - default, - consumeResult.TopicPartitionOffset, - consumeResult.Message.Key, - instrumentedConsumer.Name, - instrumentedConsumer.GroupId!); - } + var processActivity = StartProcessActivity(TryExtractPropagationContext(consumeResult, out var propagationContext) ? propagationContext : default, consumeResult.TopicPartitionOffset, consumeResult.Message.Key, instrumentedConsumer.Name, instrumentedConsumer.GroupId!); try { @@ -152,10 +134,10 @@ internal static PropagationContext ExtractPropagationContext(Headers? headers) : string.Concat(topicPartitionOffset!.Topic, " ", ConfluentKafkaCommon.ProcessOperationName); ActivityLink[] activityLinks = propagationContext != default && propagationContext.ActivityContext.IsValid() - ? new[] { new ActivityLink(propagationContext.ActivityContext) } - : Array.Empty(); + ? [new ActivityLink(propagationContext.ActivityContext)] + : []; - Activity? activity = ConfluentKafkaCommon.ActivitySource.StartActivity(spanName, kind: ActivityKind.Consumer, links: activityLinks, parentContext: default); + var activity = ConfluentKafkaCommon.ActivitySource.StartActivity(spanName, kind: ActivityKind.Consumer, links: activityLinks, parentContext: default); if (activity?.IsAllDataRequested == true) { activity.SetTag(SemanticConventions.AttributeMessagingSystem, ConfluentKafkaCommon.KafkaMessagingSystem); diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/OpenTelemetryConsumerBuilderExtensions.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/OpenTelemetryConsumerBuilderExtensions.cs index 0e2762a779..e26de22b62 100644 --- a/src/OpenTelemetry.Instrumentation.ConfluentKafka/OpenTelemetryConsumerBuilderExtensions.cs +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/OpenTelemetryConsumerBuilderExtensions.cs @@ -22,7 +22,7 @@ public static class OpenTelemetryConsumerBuilderExtensions #endif public static InstrumentedConsumerBuilder AsInstrumentedConsumerBuilder(this ConsumerBuilder consumerBuilder) { - InstrumentedConsumerBuilder result = new InstrumentedConsumerBuilder(consumerBuilder.GetInternalConfig() ?? Enumerable.Empty>()); + var result = new InstrumentedConsumerBuilder(consumerBuilder.GetInternalConfig() ?? []); result.SetInternalErrorHandler(consumerBuilder.GetInternalErrorHandler()); result.SetInternalLogHandler(consumerBuilder.GetInternalLogHandler()); result.SetInternalStatisticsHandler(consumerBuilder.GetInternalStatisticsHandler()); diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/OpenTelemetryProducerBuilderExtensions.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/OpenTelemetryProducerBuilderExtensions.cs index 071736f509..f2610c63ed 100644 --- a/src/OpenTelemetry.Instrumentation.ConfluentKafka/OpenTelemetryProducerBuilderExtensions.cs +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/OpenTelemetryProducerBuilderExtensions.cs @@ -22,7 +22,7 @@ public static class OpenTelemetryProducerBuilderExtensions #endif public static InstrumentedProducerBuilder AsInstrumentedProducerBuilder(this ProducerBuilder producerBuilder) { - InstrumentedProducerBuilder instrumentedProducerBuilder = new InstrumentedProducerBuilder(producerBuilder.GetInternalConfig() ?? Enumerable.Empty>()); + var instrumentedProducerBuilder = new InstrumentedProducerBuilder(producerBuilder.GetInternalConfig() ?? []); instrumentedProducerBuilder.SetInternalLogHandler(producerBuilder.GetInternalLogHandler()); instrumentedProducerBuilder.SetInternalErrorHandler(producerBuilder.GetInternalErrorHandler()); instrumentedProducerBuilder.SetInternalStatisticsHandler(producerBuilder.GetInternalStatisticsHandler()); diff --git a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/HostedMeteringTests.cs b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/HostedMeteringTests.cs index f9f188f343..228ef108eb 100644 --- a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/HostedMeteringTests.cs +++ b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/HostedMeteringTests.cs @@ -18,7 +18,7 @@ public class HostedMeteringTests(ITestOutputHelper outputHelper) [SkipUnlessEnvVarFoundFact(KafkaHelpers.KafkaEndPointEnvVarName)] public async Task ResolveInstrumentedBuildersFromHostServiceProviderTest() { - List metrics = new(); + List metrics = []; var builder = Host.CreateDefaultBuilder(); builder.ConfigureServices(services => { @@ -50,10 +50,10 @@ public async Task ResolveInstrumentedBuildersFromHostServiceProviderTest() { await host.StartAsync(); - string topic = $"otel-topic-{Guid.NewGuid()}"; + var topic = $"otel-topic-{Guid.NewGuid()}"; using (var producer = host.Services.GetRequiredService>().Build()) { - for (int i = 0; i < 100; i++) + for (var i = 0; i < 100; i++) { producer.Produce(topic, new Message() { @@ -70,7 +70,7 @@ public async Task ResolveInstrumentedBuildersFromHostServiceProviderTest() { consumer.Subscribe(topic); - int j = 0; + var j = 0; while (true) { var consumerResult = consumer.Consume(); diff --git a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/HostedTracingAndMeteringTests.cs b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/HostedTracingAndMeteringTests.cs index c49f03deb1..8d4685c707 100644 --- a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/HostedTracingAndMeteringTests.cs +++ b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/HostedTracingAndMeteringTests.cs @@ -84,10 +84,10 @@ public class HostedTracingAndMeteringTests(ITestOutputHelper outputHelper) [InlineData(false, false, false, false, false, false)] public async Task ResolveInstrumentedBuildersFromHostServiceProviderTest(bool enableProducerMetrics, bool enableProducerTraces, bool useNamedProducerInstrumentation, bool enableConsumerMetrics, bool enableConsumerTraces, bool useNamedConsumerInstrumentation) { - string? producerInstrumentationName = useNamedProducerInstrumentation ? "MyProducer" : null; - string? consumerInstrumentationName = useNamedConsumerInstrumentation ? "MyConsumer" : null; - List metrics = new(); - List activities = new(); + var producerInstrumentationName = useNamedProducerInstrumentation ? "MyProducer" : null; + var consumerInstrumentationName = useNamedConsumerInstrumentation ? "MyConsumer" : null; + List metrics = []; + List activities = []; var builder = Host.CreateDefaultBuilder(); builder.ConfigureServices(services => { @@ -176,12 +176,12 @@ public async Task ResolveInstrumentedBuildersFromHostServiceProviderTest(bool en Assert.Equal(enableConsumerMetrics, consumerBuilder.EnableMetrics); Assert.Equal(enableConsumerTraces, consumerBuilder.EnableTraces); - string topic = $"otel-topic-{Guid.NewGuid()}"; + var topic = $"otel-topic-{Guid.NewGuid()}"; using (var producer = (useNamedProducerInstrumentation ? host.Services.GetRequiredKeyedService>(producerInstrumentationName) : host.Services.GetRequiredService>()).Build()) { - for (int i = 0; i < 100; i++) + for (var i = 0; i < 100; i++) { producer.Produce(topic, new Message() { @@ -207,7 +207,7 @@ public async Task ResolveInstrumentedBuildersFromHostServiceProviderTest(bool en { consumer.Subscribe(topic); - int j = 0; + var j = 0; while (true) { var consumerResult = consumer.Consume(); @@ -238,7 +238,7 @@ public async Task ResolveInstrumentedBuildersFromHostServiceProviderTest(bool en host.Services.GetRequiredService().EnsureMetricsAreFlushed(); } - IGrouping[] groups = metrics.GroupBy(x => x.Name).ToArray(); + var groups = metrics.GroupBy(x => x.Name).ToArray(); if (enableProducerMetrics) { diff --git a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/HostedTracingTests.cs b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/HostedTracingTests.cs index d47d93d14f..07bae00bf3 100644 --- a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/HostedTracingTests.cs +++ b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/HostedTracingTests.cs @@ -19,7 +19,7 @@ public class HostedTracingTests(ITestOutputHelper outputHelper) [SkipUnlessEnvVarFoundFact(KafkaHelpers.KafkaEndPointEnvVarName)] public async Task ResolveInstrumentedBuildersFromHostServiceProviderTest() { - List activities = new(); + List activities = []; var builder = Host.CreateDefaultBuilder(); builder.ConfigureServices(services => { @@ -51,10 +51,10 @@ public async Task ResolveInstrumentedBuildersFromHostServiceProviderTest() { await host.StartAsync(); - string topic = $"otel-topic-{Guid.NewGuid()}"; + var topic = $"otel-topic-{Guid.NewGuid()}"; using (var producer = host.Services.GetRequiredService>().Build()) { - for (int i = 0; i < 100; i++) + for (var i = 0; i < 100; i++) { producer.Produce(topic, new Message() { Key = $"any_key_{i}", Value = $"any_value_{i}", }); outputHelper.WriteLine("produced message {0}", i); @@ -67,7 +67,7 @@ public async Task ResolveInstrumentedBuildersFromHostServiceProviderTest() { consumer.Subscribe(topic); - int j = 0; + var j = 0; while (true) { var consumerResult = consumer.Consume(); diff --git a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/KafkaHelpers.cs b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/KafkaHelpers.cs index 43929b6a31..df8c3d1c38 100644 --- a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/KafkaHelpers.cs +++ b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/KafkaHelpers.cs @@ -14,13 +14,13 @@ internal static class KafkaHelpers public static async Task ProduceTestMessageAsync() { - string topic = $"otel-topic-{Guid.NewGuid()}"; - ProducerConfig producerConfig = new ProducerConfig + var topic = $"otel-topic-{Guid.NewGuid()}"; + var producerConfig = new ProducerConfig { BootstrapServers = KafkaEndPoint, }; ProducerBuilder producerBuilder = new(producerConfig); - IProducer producer = producerBuilder.Build(); + var producer = producerBuilder.Build(); await producer.ProduceAsync(topic, new Message { Value = "any_value", diff --git a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/MeteringTests.cs b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/MeteringTests.cs index eff25507ff..eee6235952 100644 --- a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/MeteringTests.cs +++ b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/MeteringTests.cs @@ -23,19 +23,19 @@ To use Docker... [SkipUnlessEnvVarFoundFact(KafkaHelpers.KafkaEndPointEnvVarName)] public async Task BasicProduceToTopicTest() { - ProducerConfig producerConfig = new ProducerConfig + var producerConfig = new ProducerConfig { BootstrapServers = KafkaHelpers.KafkaEndPoint, }; InstrumentedProducerBuilder producerBuilder = new(producerConfig); var metrics = new List(); - string topic = $"otel-topic-{Guid.NewGuid()}"; + var topic = $"otel-topic-{Guid.NewGuid()}"; using (var meterProvider = Sdk.CreateMeterProviderBuilder() .AddInMemoryExporter(metrics) .AddKafkaProducerInstrumentation(producerBuilder) .Build()) { - IProducer producer = producerBuilder.Build(); + var producer = producerBuilder.Build(); producer.Produce(topic, new Message { Value = "any_value", @@ -55,19 +55,19 @@ public async Task BasicProduceToTopicTest() [SkipUnlessEnvVarFoundFact(KafkaHelpers.KafkaEndPointEnvVarName)] public async Task BasicProduceAsyncToTopicTest() { - ProducerConfig producerConfig = new ProducerConfig + var producerConfig = new ProducerConfig { BootstrapServers = KafkaHelpers.KafkaEndPoint, }; InstrumentedProducerBuilder producerBuilder = new(producerConfig); var metrics = new List(); - string topic = $"otel-topic-{Guid.NewGuid()}"; + var topic = $"otel-topic-{Guid.NewGuid()}"; using (var meterProvider = Sdk.CreateMeterProviderBuilder() .AddInMemoryExporter(metrics) .AddKafkaProducerInstrumentation(producerBuilder) .Build()) { - IProducer producer = producerBuilder.Build(); + var producer = producerBuilder.Build(); await producer.ProduceAsync(topic, new Message { Value = "any_value", @@ -87,9 +87,9 @@ public async Task BasicProduceAsyncToTopicTest() [SkipUnlessEnvVarFoundFact(KafkaHelpers.KafkaEndPointEnvVarName)] public async Task BasicConsumeWithTimeoutTimespanTest() { - string topic = await KafkaHelpers.ProduceTestMessageAsync(); + var topic = await KafkaHelpers.ProduceTestMessageAsync(); - ConsumerConfig consumerConfig = new ConsumerConfig + var consumerConfig = new ConsumerConfig { BootstrapServers = KafkaHelpers.KafkaEndPoint, GroupId = "test-consumer-group", @@ -104,7 +104,7 @@ public async Task BasicConsumeWithTimeoutTimespanTest() .AddKafkaConsumerInstrumentation(consumerBuilder) .Build()) { - using (IConsumer consumer = consumerBuilder.Build()) + using (var consumer = consumerBuilder.Build()) { consumer.Subscribe(topic); while (true) diff --git a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/OpenTelemetryConsumerBuilderExtensionsTests.cs b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/OpenTelemetryConsumerBuilderExtensionsTests.cs index 1af7aefd27..7ed0bb9731 100644 --- a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/OpenTelemetryConsumerBuilderExtensionsTests.cs +++ b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/OpenTelemetryConsumerBuilderExtensionsTests.cs @@ -19,8 +19,8 @@ public void ShouldConvertConsumerBuilderToInstrumentedConsumerBuilder() var consumerBuilder = new ConsumerBuilder(config); - IDeserializer keyDeserializer = Deserializers.Utf8; - IDeserializer valueDeserializer = Deserializers.Utf8; + var keyDeserializer = Deserializers.Utf8; + var valueDeserializer = Deserializers.Utf8; consumerBuilder.SetErrorHandler(ErrorHandler); consumerBuilder.SetLogHandler(LogHandler); @@ -69,9 +69,20 @@ void OffsetsCommittedHandler(IConsumer consumer, CommittedOffset { } - IEnumerable PartitionsAssignedHandler(IConsumer consumer, List partitions) => new List(); - IEnumerable PartitionsRevokedHandler(IConsumer consumer, List partitions) => new List(); - IEnumerable PartitionsLostHandler(IConsumer consumer, List partitions) => new List(); + IEnumerable PartitionsAssignedHandler(IConsumer consumer, List partitions) + { + return []; + } + + IEnumerable PartitionsRevokedHandler(IConsumer consumer, List partitions) + { + return []; + } + + IEnumerable PartitionsLostHandler(IConsumer consumer, List partitions) + { + return []; + } } [Fact] @@ -85,8 +96,8 @@ public void ShouldConvertUserDefinedConsumerBuilderToInstrumentedConsumerBuilder var consumerBuilder = new CustomConsumerBuilder(config); - IDeserializer keyDeserializer = Deserializers.Utf8; - IDeserializer valueDeserializer = Deserializers.Utf8; + var keyDeserializer = Deserializers.Utf8; + var valueDeserializer = Deserializers.Utf8; consumerBuilder.SetErrorHandler(ErrorHandler); consumerBuilder.SetLogHandler(LogHandler); @@ -135,9 +146,20 @@ void OffsetsCommittedHandler(IConsumer consumer, CommittedOffset { } - IEnumerable PartitionsAssignedHandler(IConsumer consumer, List partitions) => new List(); - IEnumerable PartitionsRevokedHandler(IConsumer consumer, List partitions) => new List(); - IEnumerable PartitionsLostHandler(IConsumer consumer, List partitions) => new List(); + IEnumerable PartitionsAssignedHandler(IConsumer consumer, List partitions) + { + return []; + } + + IEnumerable PartitionsRevokedHandler(IConsumer consumer, List partitions) + { + return []; + } + + IEnumerable PartitionsLostHandler(IConsumer consumer, List partitions) + { + return []; + } } private class CustomConsumerBuilder(IEnumerable> config) diff --git a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/OpenTelemetryProducerBuilderExtensionsTests.cs b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/OpenTelemetryProducerBuilderExtensionsTests.cs index 83025fe323..c82a5ca12b 100644 --- a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/OpenTelemetryProducerBuilderExtensionsTests.cs +++ b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/OpenTelemetryProducerBuilderExtensionsTests.cs @@ -19,8 +19,8 @@ public void ShouldConvertToInstrumentedProducerBuilder() var producerBuilder = new ProducerBuilder(config); - ISerializer keySerializer = Serializers.Utf8; - ISerializer valueSerializer = Serializers.Utf8; + var keySerializer = Serializers.Utf8; + var valueSerializer = Serializers.Utf8; producerBuilder.SetErrorHandler(ErrorHandler); producerBuilder.SetLogHandler(LogHandler); @@ -69,8 +69,8 @@ public void ShouldConvertUserDefinedProducerBuilderToInstrumentedProducerBuilder var producerBuilder = new CustomProducerBuilder(config); - ISerializer keySerializer = Serializers.Utf8; - ISerializer valueSerializer = Serializers.Utf8; + var keySerializer = Serializers.Utf8; + var valueSerializer = Serializers.Utf8; producerBuilder.SetErrorHandler(ErrorHandler); producerBuilder.SetLogHandler(LogHandler); diff --git a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/TracingTests.cs b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/TracingTests.cs index b9de9a616b..cd60bd9a2c 100644 --- a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/TracingTests.cs +++ b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/TracingTests.cs @@ -24,21 +24,21 @@ To use Docker... [SkipUnlessEnvVarFoundFact(KafkaHelpers.KafkaEndPointEnvVarName)] public async Task BasicProduceAsyncToTopicTest() { - ProducerConfig producerConfig = new ProducerConfig + var producerConfig = new ProducerConfig { BootstrapServers = KafkaHelpers.KafkaEndPoint, }; InstrumentedProducerBuilder producerBuilder = new(producerConfig); var sampler = new TestSampler(); var activities = new List(); - string topic = $"otel-topic-{Guid.NewGuid()}"; + var topic = $"otel-topic-{Guid.NewGuid()}"; using (Sdk.CreateTracerProviderBuilder() .AddInMemoryExporter(activities) .SetSampler(sampler) .AddKafkaProducerInstrumentation(producerBuilder) .Build()) { - using IProducer producer = producerBuilder.Build(); + using var producer = producerBuilder.Build(); await producer.ProduceAsync(topic, new Message { Value = "any_value", @@ -56,21 +56,21 @@ public async Task BasicProduceAsyncToTopicTest() [SkipUnlessEnvVarFoundFact(KafkaHelpers.KafkaEndPointEnvVarName)] public async Task BasicProduceAsyncToTopicPartitionTest() { - ProducerConfig producerConfig = new ProducerConfig + var producerConfig = new ProducerConfig { BootstrapServers = KafkaHelpers.KafkaEndPoint, }; InstrumentedProducerBuilder producerBuilder = new(producerConfig); var sampler = new TestSampler(); var activities = new List(); - string topic = $"otel-topic-{Guid.NewGuid()}"; + var topic = $"otel-topic-{Guid.NewGuid()}"; using (Sdk.CreateTracerProviderBuilder() .AddInMemoryExporter(activities) .SetSampler(sampler) .AddKafkaProducerInstrumentation(producerBuilder) .Build()) { - using IProducer producer = producerBuilder.Build(); + using var producer = producerBuilder.Build(); await producer.ProduceAsync(new TopicPartition(topic, new Partition(0)), new Message { Value = "any_value", @@ -89,21 +89,21 @@ public async Task BasicProduceAsyncToTopicPartitionTest() [SkipUnlessEnvVarFoundFact(KafkaHelpers.KafkaEndPointEnvVarName)] public void BasicProduceSyncToTopicTest() { - ProducerConfig producerConfig = new ProducerConfig + var producerConfig = new ProducerConfig { BootstrapServers = KafkaHelpers.KafkaEndPoint, }; InstrumentedProducerBuilder producerBuilder = new(producerConfig); var sampler = new TestSampler(); var activities = new List(); - string topic = $"otel-topic-{Guid.NewGuid()}"; + var topic = $"otel-topic-{Guid.NewGuid()}"; using (Sdk.CreateTracerProviderBuilder() .AddInMemoryExporter(activities) .SetSampler(sampler) .AddKafkaProducerInstrumentation(producerBuilder) .Build()) { - using IProducer producer = producerBuilder.Build(); + using var producer = producerBuilder.Build(); producer.Produce(topic, new Message { Value = "any_value", @@ -121,21 +121,21 @@ public void BasicProduceSyncToTopicTest() [SkipUnlessEnvVarFoundFact(KafkaHelpers.KafkaEndPointEnvVarName)] public void BasicProduceSyncToTopicPartitionTest() { - ProducerConfig producerConfig = new ProducerConfig + var producerConfig = new ProducerConfig { BootstrapServers = KafkaHelpers.KafkaEndPoint, }; InstrumentedProducerBuilder producerBuilder = new(producerConfig); var sampler = new TestSampler(); var activities = new List(); - string topic = $"otel-topic-{Guid.NewGuid()}"; + var topic = $"otel-topic-{Guid.NewGuid()}"; using (Sdk.CreateTracerProviderBuilder() .AddInMemoryExporter(activities) .SetSampler(sampler) .AddKafkaProducerInstrumentation(producerBuilder) .Build()) { - using IProducer producer = producerBuilder.Build(); + using var producer = producerBuilder.Build(); producer.Produce(new TopicPartition(topic, new Partition(0)), new Message { Value = "any_value", @@ -154,9 +154,9 @@ public void BasicProduceSyncToTopicPartitionTest() [SkipUnlessEnvVarFoundFact(KafkaHelpers.KafkaEndPointEnvVarName)] public async Task BasicConsumeWithCancellationTokenTest() { - string topic = await KafkaHelpers.ProduceTestMessageAsync(); + var topic = await KafkaHelpers.ProduceTestMessageAsync(); - ConsumerConfig consumerConfig = new ConsumerConfig + var consumerConfig = new ConsumerConfig { BootstrapServers = KafkaHelpers.KafkaEndPoint, GroupId = "test-consumer-group", @@ -172,7 +172,7 @@ public async Task BasicConsumeWithCancellationTokenTest() .AddKafkaConsumerInstrumentation(consumerBuilder) .Build()) { - using IConsumer consumer = consumerBuilder.Build(); + using var consumer = consumerBuilder.Build(); consumer.Subscribe(topic); while (true) { @@ -205,9 +205,9 @@ public async Task BasicConsumeWithCancellationTokenTest() [SkipUnlessEnvVarFoundFact(KafkaHelpers.KafkaEndPointEnvVarName)] public async Task BasicConsumeWithTimeoutMsTest() { - string topic = await KafkaHelpers.ProduceTestMessageAsync(); + var topic = await KafkaHelpers.ProduceTestMessageAsync(); - ConsumerConfig consumerConfig = new ConsumerConfig + var consumerConfig = new ConsumerConfig { BootstrapServers = KafkaHelpers.KafkaEndPoint, GroupId = "test-consumer-group", @@ -223,7 +223,7 @@ public async Task BasicConsumeWithTimeoutMsTest() .AddKafkaConsumerInstrumentation(consumerBuilder) .Build()) { - using IConsumer consumer = consumerBuilder.Build(); + using var consumer = consumerBuilder.Build(); consumer.Subscribe(topic); while (true) { @@ -256,9 +256,9 @@ public async Task BasicConsumeWithTimeoutMsTest() [SkipUnlessEnvVarFoundFact(KafkaHelpers.KafkaEndPointEnvVarName)] public async Task BasicConsumeWithTimeoutTimespanTest() { - string topic = await KafkaHelpers.ProduceTestMessageAsync(); + var topic = await KafkaHelpers.ProduceTestMessageAsync(); - ConsumerConfig consumerConfig = new ConsumerConfig + var consumerConfig = new ConsumerConfig { BootstrapServers = KafkaHelpers.KafkaEndPoint, GroupId = "test-consumer-group", @@ -274,7 +274,7 @@ public async Task BasicConsumeWithTimeoutTimespanTest() .AddKafkaConsumerInstrumentation(consumerBuilder) .Build()) { - using IConsumer consumer = consumerBuilder.Build(); + using var consumer = consumerBuilder.Build(); consumer.Subscribe(topic); while (true) { @@ -307,9 +307,9 @@ public async Task BasicConsumeWithTimeoutTimespanTest() [SkipUnlessEnvVarFoundFact(KafkaHelpers.KafkaEndPointEnvVarName)] public async Task ConsumeAndProcessMessageTest() { - string topic = await KafkaHelpers.ProduceTestMessageAsync(); + var topic = await KafkaHelpers.ProduceTestMessageAsync(); - ConsumerConfig consumerConfig = new ConsumerConfig + var consumerConfig = new ConsumerConfig { BootstrapServers = KafkaHelpers.KafkaEndPoint, GroupId = "test-consumer-group", @@ -325,7 +325,7 @@ public async Task ConsumeAndProcessMessageTest() .AddKafkaConsumerInstrumentation(consumerBuilder) .Build()) { - using IConsumer consumer = consumerBuilder.Build(); + using var consumer = consumerBuilder.Build(); consumer.Subscribe(topic); while (true) { @@ -353,7 +353,7 @@ public async Task ConsumeAndProcessMessageTest() Assert.Equal(0L, processActivity.GetTagValue("messaging.kafka.message.offset")); Assert.Equal("test-consumer-group", processActivity.GetTagValue("messaging.kafka.consumer.group")); - ValueTask NoOpAsync( + static ValueTask NoOpAsync( ConsumeResult consumeResult, Activity? activity, CancellationToken cancellationToken = default)