Skip to content

Commit

Permalink
feat: ensure consumer offsets
Browse files Browse the repository at this point in the history
  • Loading branch information
SonicGD committed Oct 12, 2023
1 parent e88e8a6 commit cb351ae
Show file tree
Hide file tree
Showing 4 changed files with 182 additions and 1 deletion.
18 changes: 17 additions & 1 deletion src/Sitko.Core.Kafka/KafkaConfigurator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ internal KafkaConfigurator(string name, string[] brokers)
private readonly List<Action<IConsumerConfigurationBuilder>> consumerActions = new();
private readonly Dictionary<string, Action<IProducerConfigurationBuilder>> producerActions = new();
private readonly Dictionary<string, (int Partitions, short ReplicationFactor)> topics = new();
private bool ensureOffsets;

public KafkaConfigurator AddProducer(string producerName, Action<IProducerConfigurationBuilder> configure)
{
Expand All @@ -36,6 +37,12 @@ public KafkaConfigurator AutoCreateTopic(string topic, int partitions, short rep
return this;
}

public KafkaConfigurator EnsureOffsets(bool enable = true)
{
ensureOffsets = enable;
return this;
}

public void Build(IKafkaConfigurationBuilder builder) =>
builder
.UseMicrosoftLog()
Expand All @@ -58,7 +65,16 @@ public void Build(IKafkaConfigurationBuilder builder) =>

foreach (var consumerAction in consumerActions)
{
clusterBuilder.AddConsumer(consumerAction);
clusterBuilder.AddConsumer(consumerBuilder =>
{
consumerAction(consumerBuilder);
if (ensureOffsets)
{
consumerBuilder.WithPartitionsAssignedHandler((resolver, list) =>
resolver.Resolve<KafkaConsumerOffsetsEnsurer>()
.EnsureOffsets(brokers, name, list));
}
});
}
});
}
162 changes: 162 additions & 0 deletions src/Sitko.Core.Kafka/KafkaConsumerOffsetsEnsurer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
using System.Collections.Concurrent;
using System.Reflection;
using Confluent.Kafka;
using KafkaFlow;
using KafkaFlow.Consumers;

namespace Sitko.Core.Kafka;

internal class KafkaConsumerOffsetsEnsurer
{
private static FieldInfo? consumerManagerField;
private static FieldInfo? directConsumerField;
private static PropertyInfo? consumerProperty;
private static readonly HashSet<string> ProcessedPartitions = new();

private static readonly
ConcurrentDictionary<IMessageConsumer, (IConsumer kafkaFlowConsumer, IConsumer<byte[], byte[]> confluentConsumer
)> Consumers = new();

private readonly IConsumerAccessor consumerAccessor;
private readonly ILogHandler logHandler;
private readonly ConcurrentDictionary<string, Task> tasks = new();
private IAdminClient? adminClient;

public KafkaConsumerOffsetsEnsurer(IConsumerAccessor consumerAccessor, ILogHandler logHandler)
{
this.consumerAccessor = consumerAccessor;
this.logHandler = logHandler;
}

private IAdminClient GetAdminClient(string[] brokers)
{
if (adminClient is null)
{
var adminClientConfig = new AdminClientConfig
{
BootstrapServers = string.Join(",", brokers), ClientId = "AdminClient"
};
adminClient = new AdminClientBuilder(adminClientConfig)
.SetLogHandler((_, m) => logHandler.Info(m.Message, m))
.SetErrorHandler((_, error) => logHandler.Error("Kafka Consumer Error", null, new { Error = error }))
.Build();
}

return adminClient;
}

public void EnsureOffsets(
string[] brokers,
string name,
List<TopicPartition> list
)
{
foreach (var partition in list)
{
var key = $"{name}/{partition.Partition.Value}";
if (ProcessedPartitions.Contains(key))
{
continue;
}

tasks.GetOrAdd(
key, _ => { return Task.Run(async () => await ProcessPartition(brokers, name, partition)); }
);
ProcessedPartitions.Add(key);
}
}

private async Task ProcessPartition(string[] brokers, string name, TopicPartition partition)
{
var messageConsumer = consumerAccessor.GetConsumer(name);
messageConsumer.Pause(new[] { partition });
try
{
var (kafkaFlowConsumer, confluentConsumer) = GetConsumers(messageConsumer);

var commited = await GetAdminClient(brokers).ListConsumerGroupOffsetsAsync(new[]
{
new ConsumerGroupTopicPartitions(messageConsumer.GroupId, new List<TopicPartition> { partition })
});
if (!commited.Any())
{
logHandler.Warning(
$"Не получилось найти оффсеты для назначенных партиций консьюмера {messageConsumer.ConsumerName}",
null);
return;
}

var currentOffset = commited.First().Partitions.FirstOrDefault(
partitionOffset =>
partitionOffset.TopicPartition == partition
);

if (currentOffset is null || currentOffset.Offset == Offset.Unset)
{
var partitionOffset = confluentConsumer.QueryWatermarkOffsets(partition, TimeSpan.FromSeconds(30));
var newOffset = new TopicPartitionOffset(partition, partitionOffset.High);
logHandler.Warning(
$"Сохраняем отсутствующий оффсет для партиции {partition} консьюмера {name}: {newOffset.Offset}",
null);
kafkaFlowConsumer.Commit(new[] { newOffset });
}
}
finally
{
messageConsumer.Resume(new[] { partition });
}
}

private static (IConsumer kafkaFlowConsumer, IConsumer<byte[], byte[]> confluentConsumer) GetConsumers(
IMessageConsumer consumer) =>
Consumers.GetOrAdd(
consumer, messageConsumer =>
{
consumerManagerField ??= messageConsumer.GetType().GetField(
"consumerManager",
BindingFlags.Instance |
BindingFlags.NonPublic
) ??
throw new InvalidOperationException(
"Can't find field consumerManager"
);
var consumerManager =
consumerManagerField.GetValue(messageConsumer) ??
throw new InvalidOperationException(
"Can't get consumerManager"
);
consumerProperty ??= consumerManager.GetType()
.GetProperty(
"Consumer",
BindingFlags.Instance |
BindingFlags.Public
) ??
throw new InvalidOperationException(
"Can't find field consumer"
);
var flowConsumer =
consumerProperty.GetValue(consumerManager) as IConsumer ??
throw new InvalidOperationException(
"Can't get flowConsumer"
);

directConsumerField ??= flowConsumer.GetType()
.GetField(
"consumer",
BindingFlags.Instance |
BindingFlags.NonPublic
) ??
throw new InvalidOperationException(
"Can't find field directConsumer"
);
var confluentConsumer =
directConsumerField.GetValue(flowConsumer) as
IConsumer<byte[], byte[]> ??
throw new InvalidOperationException(
"Can't getdirectConsumer"
);

return (flowConsumer, confluentConsumer);
}
);
}
2 changes: 2 additions & 0 deletions src/Sitko.Core.Kafka/KafkaModule.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using KafkaFlow;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Sitko.Core.App;

namespace Sitko.Core.Kafka;
Expand All @@ -18,6 +19,7 @@ public override void PostConfigureServices(IApplicationContext applicationContex
BaseApplicationModuleOptions startupOptions)
{
base.ConfigureServices(applicationContext, services, startupOptions);
services.TryAddSingleton<KafkaConsumerOffsetsEnsurer>();
services.AddKafkaFlowHostedService(builder =>
{
foreach (var (_, configurator) in Configurators)
Expand Down
1 change: 1 addition & 0 deletions src/Sitko.Core.Tasks.Kafka/KafkaTasksModule.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ protected override void ConfigureServicesInternal(IApplicationContext applicatio
var kafkaConfigurator = KafkaModule.CreateConfigurator($"Kafka_Tasks_Cluster", startupOptions.Brokers);
kafkaConfigurator
.AutoCreateTopic(kafkaTopic, startupOptions.TopicPartitions, startupOptions.TopicReplicationFactor)
.EnsureOffsets()
.AddProducer(producerName, builder =>
{
builder.DefaultTopic(kafkaTopic);
Expand Down

0 comments on commit cb351ae

Please sign in to comment.