Skip to content

Commit

Permalink
[repo/Kafka] Prepare to .NET9 (#2275)
Browse files Browse the repository at this point in the history
  • Loading branch information
Kielek authored Oct 30, 2024
1 parent e61cea4 commit 503e0b1
Show file tree
Hide file tree
Showing 14 changed files with 129 additions and 123 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public void SetSaslCredentials(string username, string password)

public ConsumeResult<TKey, TValue>? Consume(int millisecondsTimeout)
{
DateTimeOffset start = DateTimeOffset.UtcNow;
var start = DateTimeOffset.UtcNow;
ConsumeResult<TKey, TValue>? result = null;
ConsumeResult consumeResult = default;
string? errorType = null;
Expand All @@ -67,7 +67,7 @@ public void SetSaslCredentials(string username, string password)
}
finally
{
DateTimeOffset end = DateTimeOffset.UtcNow;
var end = DateTimeOffset.UtcNow;
if (result is { IsPartitionEOF: false })
{
this.InstrumentConsumption(start, end, consumeResult, errorType);
Expand All @@ -77,7 +77,7 @@ public void SetSaslCredentials(string username, string password)

public ConsumeResult<TKey, TValue>? Consume(CancellationToken cancellationToken = default)
{
DateTimeOffset start = DateTimeOffset.UtcNow;
var start = DateTimeOffset.UtcNow;
ConsumeResult<TKey, TValue>? result = null;
ConsumeResult consumeResult = default;
string? errorType = null;
Expand All @@ -94,7 +94,7 @@ public void SetSaslCredentials(string username, string password)
}
finally
{
DateTimeOffset end = DateTimeOffset.UtcNow;
var end = DateTimeOffset.UtcNow;
if (result is { IsPartitionEOF: false })
{
this.InstrumentConsumption(start, end, consumeResult, errorType);
Expand All @@ -104,7 +104,7 @@ public void SetSaslCredentials(string username, string password)

public ConsumeResult<TKey, TValue>? Consume(TimeSpan timeout)
{
DateTimeOffset start = DateTimeOffset.UtcNow;
var start = DateTimeOffset.UtcNow;
ConsumeResult<TKey, TValue>? result = null;
ConsumeResult consumeResult = default;
string? errorType = null;
Expand All @@ -121,7 +121,7 @@ public void SetSaslCredentials(string username, string password)
}
finally
{
DateTimeOffset end = DateTimeOffset.UtcNow;
var end = DateTimeOffset.UtcNow;
if (result is { IsPartitionEOF: false })
{
this.InstrumentConsumption(start, end, consumeResult, errorType);
Expand Down Expand Up @@ -320,11 +320,11 @@ private void InstrumentConsumption(DateTimeOffset startTime, DateTimeOffset endT
{
if (this.options.Traces)
{
PropagationContext propagationContext = consumeResult.Headers != null
var propagationContext = consumeResult.Headers != null
? OpenTelemetryConsumeResultExtensions.ExtractPropagationContext(consumeResult.Headers)
: default;

using Activity? activity = this.StartReceiveActivity(propagationContext, startTime, consumeResult.TopicPartitionOffset, consumeResult.Key);
using var activity = this.StartReceiveActivity(propagationContext, startTime, consumeResult.TopicPartitionOffset, consumeResult.Key);
if (activity != null)
{
if (errorType != null)
Expand All @@ -342,7 +342,7 @@ private void InstrumentConsumption(DateTimeOffset startTime, DateTimeOffset endT

if (this.options.Metrics)
{
TimeSpan duration = endTime - startTime;
var duration = endTime - startTime;
RecordReceive(consumeResult.TopicPartitionOffset!.TopicPartition, duration, errorType);
}
}
Expand All @@ -354,10 +354,10 @@ private void InstrumentConsumption(DateTimeOffset startTime, DateTimeOffset endT
: string.Concat(topicPartitionOffset!.Topic, " ", ConfluentKafkaCommon.ReceiveOperationName);

ActivityLink[] activityLinks = propagationContext.ActivityContext.IsValid()
? new[] { new ActivityLink(propagationContext.ActivityContext) }
: Array.Empty<ActivityLink>();
? [new ActivityLink(propagationContext.ActivityContext)]
: [];

Activity? activity = ConfluentKafkaCommon.ActivitySource.StartActivity(spanName, kind: ActivityKind.Consumer, links: activityLinks, startTime: start, parentContext: default);
var activity = ConfluentKafkaCommon.ActivitySource.StartActivity(spanName, kind: ActivityKind.Consumer, links: activityLinks, startTime: start, parentContext: default);
if (activity?.IsAllDataRequested == true)
{
activity.SetTag(SemanticConventions.AttributeMessagingSystem, ConfluentKafkaCommon.KafkaMessagingSystem);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@ internal bool EnableTraces
/// <returns>an <see cref="IProducer{TKey,TValue}"/>.</returns>
public override IConsumer<TKey, TValue> Build()
{
ConsumerConfig config = (ConsumerConfig)this.Config;
var config = (ConsumerConfig)this.Config;

var consumer = new InstrumentedConsumer<TKey, TValue>(base.Build(), this.options);
consumer.GroupId = config.GroupId;
var consumer = new InstrumentedConsumer<TKey, TValue>(base.Build(), this.options)
{
GroupId = config.GroupId,
};

return consumer;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ public async Task<DeliveryResult<TKey, TValue>> ProduceAsync(
Message<TKey, TValue> message,
CancellationToken cancellationToken = default)
{
DateTimeOffset start = DateTimeOffset.UtcNow;
using Activity? activity = this.StartPublishActivity(start, topic, message);
var start = DateTimeOffset.UtcNow;
using var activity = this.StartPublishActivity(start, topic, message);
if (activity != null)
{
this.InjectActivity(activity, message);
Expand Down Expand Up @@ -71,9 +71,9 @@ public async Task<DeliveryResult<TKey, TValue>> ProduceAsync(
}
finally
{
DateTimeOffset end = DateTimeOffset.UtcNow;
var end = DateTimeOffset.UtcNow;
activity?.SetEndTime(end.UtcDateTime);
TimeSpan duration = end - start;
var duration = end - start;

if (this.options.Metrics)
{
Expand All @@ -89,8 +89,8 @@ public async Task<DeliveryResult<TKey, TValue>> ProduceAsync(
Message<TKey, TValue> message,
CancellationToken cancellationToken = default)
{
DateTimeOffset start = DateTimeOffset.UtcNow;
using Activity? activity = this.StartPublishActivity(start, topicPartition.Topic, message, topicPartition.Partition);
var start = DateTimeOffset.UtcNow;
using var activity = this.StartPublishActivity(start, topicPartition.Topic, message, topicPartition.Partition);
if (activity != null)
{
this.InjectActivity(activity, message);
Expand Down Expand Up @@ -118,9 +118,9 @@ public async Task<DeliveryResult<TKey, TValue>> ProduceAsync(
}
finally
{
DateTimeOffset end = DateTimeOffset.UtcNow;
var end = DateTimeOffset.UtcNow;
activity?.SetEndTime(end.UtcDateTime);
TimeSpan duration = end - start;
var duration = end - start;

if (this.options.Metrics)
{
Expand All @@ -133,8 +133,8 @@ public async Task<DeliveryResult<TKey, TValue>> ProduceAsync(

public void Produce(string topic, Message<TKey, TValue> message, Action<DeliveryReport<TKey, TValue>>? deliveryHandler = null)
{
DateTimeOffset start = DateTimeOffset.UtcNow;
using Activity? activity = this.StartPublishActivity(start, topic, message);
var start = DateTimeOffset.UtcNow;
using var activity = this.StartPublishActivity(start, topic, message);
if (activity != null)
{
this.InjectActivity(activity, message);
Expand All @@ -161,9 +161,9 @@ public void Produce(string topic, Message<TKey, TValue> message, Action<Delivery
}
finally
{
DateTimeOffset end = DateTimeOffset.UtcNow;
var end = DateTimeOffset.UtcNow;
activity?.SetEndTime(end.UtcDateTime);
TimeSpan duration = end - start;
var duration = end - start;

if (this.options.Metrics)
{
Expand All @@ -174,8 +174,8 @@ public void Produce(string topic, Message<TKey, TValue> message, Action<Delivery

public void Produce(TopicPartition topicPartition, Message<TKey, TValue> message, Action<DeliveryReport<TKey, TValue>>? deliveryHandler = null)
{
DateTimeOffset start = DateTimeOffset.UtcNow;
using Activity? activity = this.StartPublishActivity(start, topicPartition.Topic, message, topicPartition.Partition);
var start = DateTimeOffset.UtcNow;
using var activity = this.StartPublishActivity(start, topicPartition.Topic, message, topicPartition.Partition);
if (activity != null)
{
this.InjectActivity(activity, message);
Expand All @@ -202,9 +202,9 @@ public void Produce(TopicPartition topicPartition, Message<TKey, TValue> message
}
finally
{
DateTimeOffset end = DateTimeOffset.UtcNow;
var end = DateTimeOffset.UtcNow;
activity?.SetEndTime(end.UtcDateTime);
TimeSpan duration = end - start;
var duration = end - start;

if (this.options.Metrics)
{
Expand Down Expand Up @@ -364,7 +364,7 @@ private void InjectActivity(Activity? activity, Message<TKey, TValue> message)

private void InjectTraceContext(Message<TKey, TValue> message, string key, string value)
{
message.Headers ??= new Headers();
message.Headers ??= [];
message.Headers.Add(key, Encoding.UTF8.GetBytes(value));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,25 +105,7 @@ public static bool TryExtractPropagationContext<TKey, TValue>(
return consumeResult;
}

Activity? processActivity = null;
if (TryExtractPropagationContext(consumeResult, out var propagationContext))
{
processActivity = StartProcessActivity(
propagationContext,
consumeResult.TopicPartitionOffset,
consumeResult.Message.Key,
instrumentedConsumer.Name,
instrumentedConsumer.GroupId!);
}
else
{
processActivity = StartProcessActivity(
default,
consumeResult.TopicPartitionOffset,
consumeResult.Message.Key,
instrumentedConsumer.Name,
instrumentedConsumer.GroupId!);
}
var processActivity = StartProcessActivity(TryExtractPropagationContext(consumeResult, out var propagationContext) ? propagationContext : default, consumeResult.TopicPartitionOffset, consumeResult.Message.Key, instrumentedConsumer.Name, instrumentedConsumer.GroupId!);

try
{
Expand Down Expand Up @@ -152,10 +134,10 @@ internal static PropagationContext ExtractPropagationContext(Headers? headers)
: string.Concat(topicPartitionOffset!.Topic, " ", ConfluentKafkaCommon.ProcessOperationName);

ActivityLink[] activityLinks = propagationContext != default && propagationContext.ActivityContext.IsValid()
? new[] { new ActivityLink(propagationContext.ActivityContext) }
: Array.Empty<ActivityLink>();
? [new ActivityLink(propagationContext.ActivityContext)]
: [];

Activity? activity = ConfluentKafkaCommon.ActivitySource.StartActivity(spanName, kind: ActivityKind.Consumer, links: activityLinks, parentContext: default);
var activity = ConfluentKafkaCommon.ActivitySource.StartActivity(spanName, kind: ActivityKind.Consumer, links: activityLinks, parentContext: default);
if (activity?.IsAllDataRequested == true)
{
activity.SetTag(SemanticConventions.AttributeMessagingSystem, ConfluentKafkaCommon.KafkaMessagingSystem);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public static class OpenTelemetryConsumerBuilderExtensions
#endif
public static InstrumentedConsumerBuilder<TKey, TValue> AsInstrumentedConsumerBuilder<TKey, TValue>(this ConsumerBuilder<TKey, TValue> consumerBuilder)
{
InstrumentedConsumerBuilder<TKey, TValue> result = new InstrumentedConsumerBuilder<TKey, TValue>(consumerBuilder.GetInternalConfig() ?? Enumerable.Empty<KeyValuePair<string, string>>());
var result = new InstrumentedConsumerBuilder<TKey, TValue>(consumerBuilder.GetInternalConfig() ?? []);
result.SetInternalErrorHandler(consumerBuilder.GetInternalErrorHandler());
result.SetInternalLogHandler(consumerBuilder.GetInternalLogHandler());
result.SetInternalStatisticsHandler(consumerBuilder.GetInternalStatisticsHandler());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public static class OpenTelemetryProducerBuilderExtensions
#endif
public static InstrumentedProducerBuilder<TKey, TValue> AsInstrumentedProducerBuilder<TKey, TValue>(this ProducerBuilder<TKey, TValue> producerBuilder)
{
InstrumentedProducerBuilder<TKey, TValue> instrumentedProducerBuilder = new InstrumentedProducerBuilder<TKey, TValue>(producerBuilder.GetInternalConfig() ?? Enumerable.Empty<KeyValuePair<string, string>>());
var instrumentedProducerBuilder = new InstrumentedProducerBuilder<TKey, TValue>(producerBuilder.GetInternalConfig() ?? []);
instrumentedProducerBuilder.SetInternalLogHandler(producerBuilder.GetInternalLogHandler());
instrumentedProducerBuilder.SetInternalErrorHandler(producerBuilder.GetInternalErrorHandler());
instrumentedProducerBuilder.SetInternalStatisticsHandler(producerBuilder.GetInternalStatisticsHandler());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public class HostedMeteringTests(ITestOutputHelper outputHelper)
[SkipUnlessEnvVarFoundFact(KafkaHelpers.KafkaEndPointEnvVarName)]
public async Task ResolveInstrumentedBuildersFromHostServiceProviderTest()
{
List<Metric> metrics = new();
List<Metric> metrics = [];
var builder = Host.CreateDefaultBuilder();
builder.ConfigureServices(services =>
{
Expand Down Expand Up @@ -50,10 +50,10 @@ public async Task ResolveInstrumentedBuildersFromHostServiceProviderTest()
{
await host.StartAsync();

string topic = $"otel-topic-{Guid.NewGuid()}";
var topic = $"otel-topic-{Guid.NewGuid()}";
using (var producer = host.Services.GetRequiredService<InstrumentedProducerBuilder<string, string>>().Build())
{
for (int i = 0; i < 100; i++)
for (var i = 0; i < 100; i++)
{
producer.Produce(topic, new Message<string, string>()
{
Expand All @@ -70,7 +70,7 @@ public async Task ResolveInstrumentedBuildersFromHostServiceProviderTest()
{
consumer.Subscribe(topic);

int j = 0;
var j = 0;
while (true)
{
var consumerResult = consumer.Consume();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,10 @@ public class HostedTracingAndMeteringTests(ITestOutputHelper outputHelper)
[InlineData(false, false, false, false, false, false)]
public async Task ResolveInstrumentedBuildersFromHostServiceProviderTest(bool enableProducerMetrics, bool enableProducerTraces, bool useNamedProducerInstrumentation, bool enableConsumerMetrics, bool enableConsumerTraces, bool useNamedConsumerInstrumentation)
{
string? producerInstrumentationName = useNamedProducerInstrumentation ? "MyProducer" : null;
string? consumerInstrumentationName = useNamedConsumerInstrumentation ? "MyConsumer" : null;
List<Metric> metrics = new();
List<Activity> activities = new();
var producerInstrumentationName = useNamedProducerInstrumentation ? "MyProducer" : null;
var consumerInstrumentationName = useNamedConsumerInstrumentation ? "MyConsumer" : null;
List<Metric> metrics = [];
List<Activity> activities = [];
var builder = Host.CreateDefaultBuilder();
builder.ConfigureServices(services =>
{
Expand Down Expand Up @@ -176,12 +176,12 @@ public async Task ResolveInstrumentedBuildersFromHostServiceProviderTest(bool en
Assert.Equal(enableConsumerMetrics, consumerBuilder.EnableMetrics);
Assert.Equal(enableConsumerTraces, consumerBuilder.EnableTraces);

string topic = $"otel-topic-{Guid.NewGuid()}";
var topic = $"otel-topic-{Guid.NewGuid()}";
using (var producer = (useNamedProducerInstrumentation
? host.Services.GetRequiredKeyedService<InstrumentedProducerBuilder<string, string>>(producerInstrumentationName)
: host.Services.GetRequiredService<InstrumentedProducerBuilder<string, string>>()).Build())
{
for (int i = 0; i < 100; i++)
for (var i = 0; i < 100; i++)
{
producer.Produce(topic, new Message<string, string>()
{
Expand All @@ -207,7 +207,7 @@ public async Task ResolveInstrumentedBuildersFromHostServiceProviderTest(bool en
{
consumer.Subscribe(topic);

int j = 0;
var j = 0;
while (true)
{
var consumerResult = consumer.Consume();
Expand Down Expand Up @@ -238,7 +238,7 @@ public async Task ResolveInstrumentedBuildersFromHostServiceProviderTest(bool en
host.Services.GetRequiredService<MeterProvider>().EnsureMetricsAreFlushed();
}

IGrouping<string, Metric>[] groups = metrics.GroupBy(x => x.Name).ToArray();
var groups = metrics.GroupBy(x => x.Name).ToArray();

if (enableProducerMetrics)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public class HostedTracingTests(ITestOutputHelper outputHelper)
[SkipUnlessEnvVarFoundFact(KafkaHelpers.KafkaEndPointEnvVarName)]
public async Task ResolveInstrumentedBuildersFromHostServiceProviderTest()
{
List<Activity> activities = new();
List<Activity> activities = [];
var builder = Host.CreateDefaultBuilder();
builder.ConfigureServices(services =>
{
Expand Down Expand Up @@ -51,10 +51,10 @@ public async Task ResolveInstrumentedBuildersFromHostServiceProviderTest()
{
await host.StartAsync();

string topic = $"otel-topic-{Guid.NewGuid()}";
var topic = $"otel-topic-{Guid.NewGuid()}";
using (var producer = host.Services.GetRequiredService<InstrumentedProducerBuilder<string, string>>().Build())
{
for (int i = 0; i < 100; i++)
for (var i = 0; i < 100; i++)
{
producer.Produce(topic, new Message<string, string>() { Key = $"any_key_{i}", Value = $"any_value_{i}", });
outputHelper.WriteLine("produced message {0}", i);
Expand All @@ -67,7 +67,7 @@ public async Task ResolveInstrumentedBuildersFromHostServiceProviderTest()
{
consumer.Subscribe(topic);

int j = 0;
var j = 0;
while (true)
{
var consumerResult = consumer.Consume();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ internal static class KafkaHelpers

public static async Task<string> ProduceTestMessageAsync()
{
string topic = $"otel-topic-{Guid.NewGuid()}";
ProducerConfig producerConfig = new ProducerConfig
var topic = $"otel-topic-{Guid.NewGuid()}";
var producerConfig = new ProducerConfig
{
BootstrapServers = KafkaEndPoint,
};
ProducerBuilder<string, string> producerBuilder = new(producerConfig);
IProducer<string, string> producer = producerBuilder.Build();
var producer = producerBuilder.Build();
await producer.ProduceAsync(topic, new Message<string, string>
{
Value = "any_value",
Expand Down
Loading

0 comments on commit 503e0b1

Please sign in to comment.