diff --git a/src/Sitko.Core.Kafka/KafkaConfigurator.cs b/src/Sitko.Core.Kafka/KafkaConfigurator.cs
index e44460600..e6c0465c3 100644
--- a/src/Sitko.Core.Kafka/KafkaConfigurator.cs
+++ b/src/Sitko.Core.Kafka/KafkaConfigurator.cs
@@ -17,6 +17,7 @@ internal KafkaConfigurator(string name, string[] brokers)
     private readonly List<Action<IConsumerConfigurationBuilder>> consumerActions = new();
     private readonly Dictionary<string, Action<IProducerConfigurationBuilder>> producerActions = new();
     private readonly Dictionary<string, (int Partitions, short ReplicationFactor)> topics = new();
+    private bool ensureOffsets;
 
     public KafkaConfigurator AddProducer(string producerName, Action<IProducerConfigurationBuilder> configure)
     {
@@ -36,6 +37,12 @@ public KafkaConfigurator AutoCreateTopic(string topic, int partitions, short rep
         return this;
     }
 
+    public KafkaConfigurator EnsureOffsets(bool enable = true)
+    {
+        ensureOffsets = enable;
+        return this;
+    }
+
     public void Build(IKafkaConfigurationBuilder builder) =>
         builder
             .UseMicrosoftLog()
@@ -58,7 +65,16 @@ public void Build(IKafkaConfigurationBuilder builder) =>
 
                 foreach (var consumerAction in consumerActions)
                 {
-                    clusterBuilder.AddConsumer(consumerAction);
+                    clusterBuilder.AddConsumer(consumerBuilder =>
+                    {
+                        consumerAction(consumerBuilder);
+                        if (ensureOffsets)
+                        {
+                            consumerBuilder.WithPartitionsAssignedHandler((resolver, list) =>
+                                resolver.Resolve<KafkaConsumerOffsetsEnsurer>()
+                                    .EnsureOffsets(brokers, name, list));
+                        }
+                    });
                 }
             });
 }
diff --git a/src/Sitko.Core.Kafka/KafkaConsumerOffsetsEnsurer.cs b/src/Sitko.Core.Kafka/KafkaConsumerOffsetsEnsurer.cs
new file mode 100644
index 000000000..6976f6f41
--- /dev/null
+++ b/src/Sitko.Core.Kafka/KafkaConsumerOffsetsEnsurer.cs
@@ -0,0 +1,162 @@
+using System.Collections.Concurrent;
+using System.Reflection;
+using Confluent.Kafka;
+using KafkaFlow;
+using KafkaFlow.Consumers;
+
+namespace Sitko.Core.Kafka;
+
+internal class KafkaConsumerOffsetsEnsurer
+{
+    private static FieldInfo? consumerManagerField;
+    private static FieldInfo? directConsumerField;
+    private static PropertyInfo? consumerProperty;
+    private static readonly HashSet<string> ProcessedPartitions = new();
+
+    private static readonly
+        ConcurrentDictionary<IMessageConsumer, (IConsumer kafkaFlowConsumer, IConsumer<byte[], byte[]>
+            confluentConsumer)> Consumers = new();
+
+    private readonly IConsumerAccessor consumerAccessor;
+    private readonly ILogHandler logHandler;
+    private readonly ConcurrentDictionary<string, Task> tasks = new();
+    private IAdminClient? adminClient;
+
+    public KafkaConsumerOffsetsEnsurer(IConsumerAccessor consumerAccessor, ILogHandler logHandler)
+    {
+        this.consumerAccessor = consumerAccessor;
+        this.logHandler = logHandler;
+    }
+
+    private IAdminClient GetAdminClient(string[] brokers)
+    {
+        if (adminClient is null)
+        {
+            var adminClientConfig = new AdminClientConfig
+            {
+                BootstrapServers = string.Join(",", brokers), ClientId = "AdminClient"
+            };
+            adminClient = new AdminClientBuilder(adminClientConfig)
+                .SetLogHandler((_, m) => logHandler.Info(m.Message, m))
+                .SetErrorHandler((_, error) => logHandler.Error("Kafka Consumer Error", null, new { Error = error }))
+                .Build();
+        }
+
+        return adminClient;
+    }
+
+    public void EnsureOffsets(
+        string[] brokers,
+        string name,
+        List<TopicPartition> list
+    )
+    {
+        foreach (var partition in list)
+        {
+            var key = $"{name}/{partition.Partition.Value}";
+            if (ProcessedPartitions.Contains(key))
+            {
+                continue;
+            }
+
+            tasks.GetOrAdd(
+                key, _ => { return Task.Run(async () => await ProcessPartition(brokers, name, partition)); }
+            );
+            ProcessedPartitions.Add(key);
+        }
+    }
+
+    private async Task ProcessPartition(string[] brokers, string name, TopicPartition partition)
+    {
+        var messageConsumer = consumerAccessor.GetConsumer(name);
+        // Pause the partition while its committed offset is inspected and, if needed, backfilled
+        messageConsumer.Pause(new[] { partition });
+        try
+        {
+            var (kafkaFlowConsumer, confluentConsumer) = GetConsumers(messageConsumer);
+
+            var committed = await GetAdminClient(brokers).ListConsumerGroupOffsetsAsync(new[]
+            {
+                new ConsumerGroupTopicPartitions(messageConsumer.GroupId, new List<TopicPartition> { partition })
+            });
+            if (!committed.Any())
+            {
+                logHandler.Warning(
+                    $"Could not find offsets for the assigned partitions of consumer {messageConsumer.ConsumerName}",
+                    null);
+                return;
+            }
+
+            var currentOffset = committed.First().Partitions.FirstOrDefault(
+                partitionOffset =>
+                    partitionOffset.TopicPartition == partition
+            );
+
+            if (currentOffset is null || currentOffset.Offset == Offset.Unset)
+            {
+                // No committed offset yet: commit the high watermark so the consumer starts from the end
+                var partitionOffset = confluentConsumer.QueryWatermarkOffsets(partition, TimeSpan.FromSeconds(30));
+                var newOffset = new TopicPartitionOffset(partition, partitionOffset.High);
+                logHandler.Warning(
+                    $"Committing missing offset for partition {partition} of consumer {name}: {newOffset.Offset}",
+                    null);
+                kafkaFlowConsumer.Commit(new[] { newOffset });
+            }
+        }
+        finally
+        {
+            messageConsumer.Resume(new[] { partition });
+        }
+    }
+
+    // KafkaFlow does not expose the underlying consumers, so they are extracted
+    // from private members via reflection and cached per message consumer.
+    private static (IConsumer kafkaFlowConsumer, IConsumer<byte[], byte[]> confluentConsumer) GetConsumers(
+        IMessageConsumer consumer) =>
+        Consumers.GetOrAdd(
+            consumer, messageConsumer =>
+            {
+                consumerManagerField ??= messageConsumer.GetType().GetField(
+                        "consumerManager",
+                        BindingFlags.Instance |
+                        BindingFlags.NonPublic
+                    ) ??
+                    throw new InvalidOperationException(
+                        "Can't find field consumerManager"
+                    );
+                var consumerManager =
+                    consumerManagerField.GetValue(messageConsumer) ??
+                    throw new InvalidOperationException(
+                        "Can't get consumerManager"
+                    );
+                consumerProperty ??= consumerManager.GetType()
+                        .GetProperty(
+                            "Consumer",
+                            BindingFlags.Instance |
+                            BindingFlags.Public
+                        ) ??
+                    throw new InvalidOperationException(
+                        "Can't find property Consumer"
+                    );
+                var flowConsumer =
+                    consumerProperty.GetValue(consumerManager) as IConsumer ??
+                    throw new InvalidOperationException(
+                        "Can't get flowConsumer"
+                    );
+
+                directConsumerField ??= flowConsumer.GetType()
+                        .GetField(
+                            "consumer",
+                            BindingFlags.Instance |
+                            BindingFlags.NonPublic
+                        ) ??
+                    throw new InvalidOperationException(
+                        "Can't find field directConsumer"
+                    );
+                var confluentConsumer =
+                    directConsumerField.GetValue(flowConsumer) as IConsumer<byte[], byte[]> ??
+                    throw new InvalidOperationException(
+                        "Can't get directConsumer"
+                    );
+
+                return (flowConsumer, confluentConsumer);
+            }
+        );
+}
diff --git a/src/Sitko.Core.Kafka/KafkaModule.cs b/src/Sitko.Core.Kafka/KafkaModule.cs
index ca75dbcfe..bf1b01f48 100644
--- a/src/Sitko.Core.Kafka/KafkaModule.cs
+++ b/src/Sitko.Core.Kafka/KafkaModule.cs
@@ -1,5 +1,6 @@
 using KafkaFlow;
 using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.DependencyInjection.Extensions;
 using Sitko.Core.App;
 
 namespace Sitko.Core.Kafka;
@@ -18,6 +19,7 @@ public override void PostConfigureServices(IApplicationContext applicationContex
         BaseApplicationModuleOptions startupOptions)
     {
         base.ConfigureServices(applicationContext, services, startupOptions);
+        services.TryAddSingleton<KafkaConsumerOffsetsEnsurer>();
         services.AddKafkaFlowHostedService(builder =>
         {
             foreach (var (_, configurator) in Configurators)
diff --git a/src/Sitko.Core.Tasks.Kafka/KafkaTasksModule.cs b/src/Sitko.Core.Tasks.Kafka/KafkaTasksModule.cs
index ea171283d..b808b9a83 100644
--- a/src/Sitko.Core.Tasks.Kafka/KafkaTasksModule.cs
+++ b/src/Sitko.Core.Tasks.Kafka/KafkaTasksModule.cs
@@ -42,6 +42,7 @@ protected override void ConfigureServicesInternal(IApplicationContext applicatio
         var kafkaConfigurator = KafkaModule.CreateConfigurator($"Kafka_Tasks_Cluster", startupOptions.Brokers);
         kafkaConfigurator
            .AutoCreateTopic(kafkaTopic, startupOptions.TopicPartitions, startupOptions.TopicReplicationFactor)
+           .EnsureOffsets()
            .AddProducer(producerName, builder =>
            {
                builder.DefaultTopic(kafkaTopic);
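
Usage sketch (illustrative, not part of the patch): enabling the new EnsureOffsets() option on a manually created configurator. The cluster name, broker address, topic, and producer name below are assumptions for the example; the calls mirror the API this diff touches.

    var configurator = KafkaModule.CreateConfigurator("Example_Cluster", new[] { "localhost:9092" });
    configurator
        .AutoCreateTopic("example-topic", partitions: 3, replicationFactor: 1)
        .EnsureOffsets() // registers the partitions-assigned handler added by this patch
        .AddProducer("example-producer", builder => builder.DefaultTopic("example-topic"));

With EnsureOffsets() enabled, every consumer built by the configurator resolves KafkaConsumerOffsetsEnsurer when partitions are assigned; for any assigned partition without a committed offset it commits the high watermark, so the consumer group starts from the end of the topic instead of falling back to auto.offset.reset.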