Skip to content

Commit

Permalink
fix(tasks): fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
pogromistik committed Oct 12, 2023
1 parent 270cbdf commit 1f1d8dc
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 14 deletions.
29 changes: 17 additions & 12 deletions src/Sitko.Core.Kafka/KafkaConsumerOffsetsEnsurer.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
using System.Collections.Concurrent;
using System.Reflection;
using Confluent.Kafka;
using KafkaFlow;
using KafkaFlow.Consumers;
using Microsoft.Extensions.Logging;

namespace Sitko.Core.Kafka;

Expand All @@ -18,14 +18,14 @@ private static readonly
)> Consumers = new();

private readonly IConsumerAccessor consumerAccessor;
private readonly ILogHandler logHandler;
private readonly ConcurrentDictionary<string, Task> tasks = new();
private IAdminClient? adminClient;
private ILogger<KafkaConsumerOffsetsEnsurer> logger;

public KafkaConsumerOffsetsEnsurer(IConsumerAccessor consumerAccessor, ILogHandler logHandler)
public KafkaConsumerOffsetsEnsurer(IConsumerAccessor consumerAccessor, ILogger<KafkaConsumerOffsetsEnsurer> logger)
{
this.consumerAccessor = consumerAccessor;
this.logHandler = logHandler;
this.logger = logger;
}

private IAdminClient GetAdminClient(string[] brokers)
Expand All @@ -37,8 +37,8 @@ private IAdminClient GetAdminClient(string[] brokers)
BootstrapServers = string.Join(",", brokers), ClientId = "AdminClient"
};
adminClient = new AdminClientBuilder(adminClientConfig)
.SetLogHandler((_, m) => logHandler.Info(m.Message, m))
.SetErrorHandler((_, error) => logHandler.Error("Kafka Consumer Error", null, new { Error = error }))
.SetLogHandler((_, m) => logger.LogInformation("{Message}", m.Message))
.SetErrorHandler((_, error) => logger.LogError("Kafka Consumer Error: {Error}", error))
.Build();
}

Expand Down Expand Up @@ -80,9 +80,9 @@ private async Task ProcessPartition(string[] brokers, string name, TopicPartitio
});
if (!commited.Any())
{
logHandler.Warning(
$"Не получилось найти оффсеты для назначенных партиций консьюмера {messageConsumer.ConsumerName}",
null);
logger.LogWarning(
"Не получилось найти оффсеты для назначенных партиций консьюмера {Consumer}",
messageConsumer.ConsumerName);
return;
}

Expand All @@ -95,12 +95,17 @@ private async Task ProcessPartition(string[] brokers, string name, TopicPartitio
{
var partitionOffset = confluentConsumer.QueryWatermarkOffsets(partition, TimeSpan.FromSeconds(30));
var newOffset = new TopicPartitionOffset(partition, partitionOffset.High);
logHandler.Warning(
$"Сохраняем отсутствующий оффсет для партиции {partition} консьюмера {name}: {newOffset.Offset}",
null);
logger.LogWarning(
"Сохраняем отсутствующий оффсет для партиции {Partition} консьюмера {Consumer}: {Offset}",
partition, name, newOffset.Offset);
kafkaFlowConsumer.Commit(new[] { newOffset });
}
}
catch (Exception ex)
{
logger.LogError(ex, "Error process partition {Partition}: {Error}", partition, ex);
throw;
}
finally
{
messageConsumer.Resume(new[] { partition });
Expand Down
4 changes: 2 additions & 2 deletions src/Sitko.Core.Tasks.Kafka/KafkaTasksModule.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,10 @@ protected override void ConfigureServicesInternal(IApplicationContext applicatio
var bufferSize = groupConsumers.Max(r => r.BufferSize);
kafkaConfigurator.AddConsumer(consumerBuilder =>
{
var groupName = $"{kafkaGroupPrefix}_{commonRegistration.GroupId}".Replace(".", "_");
var groupId = $"{kafkaGroupPrefix}_{commonRegistration.GroupId}".Replace(".", "_");
consumerBuilder.Topic(kafkaTopic);
consumerBuilder.WithName(name);
consumerBuilder.WithGroupId(commonRegistration.GroupId);
consumerBuilder.WithGroupId(groupId);
consumerBuilder.WithWorkersCount(parallelThreadCount);
consumerBuilder.WithBufferSize(bufferSize);
// для гарантии порядка событий
Expand Down

0 comments on commit 1f1d8dc

Please sign in to comment.