Skip to content

Commit

Permalink
feat(kafka): extend and unify options
Browse files Browse the repository at this point in the history
  • Loading branch information
SonicGD committed Oct 18, 2023
1 parent 782c2cd commit eacff8b
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 71 deletions.
73 changes: 50 additions & 23 deletions src/Sitko.Core.Kafka/KafkaConfigurator.cs
Original file line number Diff line number Diff line change
@@ -1,40 +1,39 @@
using KafkaFlow;
using Confluent.Kafka;
using KafkaFlow;
using KafkaFlow.Configuration;
using KafkaFlow.Consumers.DistributionStrategies;

namespace Sitko.Core.Kafka;

public class KafkaConfigurator
{
private readonly string[] brokers;
private readonly Dictionary<ConsumerRegistration, Action<IConsumerConfigurationBuilder, ConsumerConfig>>
consumerActions = new();

private readonly List<Action<IConsumerConfigurationBuilder>> consumerActions = new();
private readonly HashSet<ConsumerRegistration> consumers = new();
private readonly string name;
private readonly Dictionary<string, Action<IProducerConfigurationBuilder>> producerActions = new();
private readonly string clusterName;
private readonly Dictionary<string, Action<IProducerConfigurationBuilder, ProducerConfig>> producerActions = new();
private readonly Dictionary<string, (int Partitions, short ReplicationFactor)> topics = new();
private bool ensureOffsets;

internal KafkaConfigurator(string name, string[] brokers)
{
this.name = name;
this.brokers = brokers;
}
internal KafkaConfigurator(string clusterName) => this.clusterName = clusterName;

internal string[] Brokers => brokers;
internal HashSet<ConsumerRegistration> Consumers => consumers;
internal bool NeedToEnsureOffsets => ensureOffsets;

public KafkaConfigurator AddProducer(string producerName, Action<IProducerConfigurationBuilder> configure)
public KafkaConfigurator AddProducer(string producerName,
Action<IProducerConfigurationBuilder, ProducerConfig> configure)
{
producerActions[producerName] = configure;
return this;
}

public KafkaConfigurator AddConsumer(string consumerName, string groupId, TopicInfo[] topics,
Action<IConsumerConfigurationBuilder> configure)
public KafkaConfigurator AddConsumer(string consumerName, string groupId, TopicInfo[] consumerTopics,
Action<IConsumerConfigurationBuilder, ConsumerConfig> configure)
{
consumers.Add(new ConsumerRegistration(consumerName, groupId, topics));
consumerActions.Add(configure);
var registration = new ConsumerRegistration(consumerName, groupId, consumerTopics);
consumers.Add(registration);
consumerActions[registration] = configure;
return this;
}

Expand All @@ -50,14 +49,14 @@ public KafkaConfigurator EnsureOffsets(bool enable = true)
return this;
}

public void Build(IKafkaConfigurationBuilder builder) =>
public void Build(IKafkaConfigurationBuilder builder, KafkaModuleOptions options) =>
builder
.UseMicrosoftLog()
.AddCluster(clusterBuilder =>
{
clusterBuilder
.WithName(name)
.WithBrokers(brokers);
.WithName(clusterName)
.WithBrokers(options.Brokers);
if (!ensureOffsets)
{
foreach (var (topic, config) in topics)
Expand All @@ -68,17 +67,45 @@ public void Build(IKafkaConfigurationBuilder builder) =>

foreach (var (producerName, configure) in producerActions)
{
clusterBuilder.AddProducer(producerName, configurationBuilder =>
clusterBuilder.AddProducer(producerName, producerBuilder =>
{
configure(configurationBuilder);
var producerConfig = new ProducerConfig
{
ClientId = producerName,
MessageTimeoutMs = (int)options.MessageTimeout.TotalMilliseconds,
MessageMaxBytes = options.MessageMaxBytes,
EnableIdempotence = options.EnableIdempotence,
SocketNagleDisable = options.SocketNagleDisable,
Acks = options.Acks
};
producerBuilder.WithProducerConfig(producerConfig);
producerBuilder.WithLingerMs(options.MaxProducingTimeout.TotalMilliseconds);
configure(producerBuilder, producerConfig);
});
}

foreach (var consumerAction in consumerActions)
foreach (var (registration, configureAction) in consumerActions)
{
clusterBuilder.AddConsumer(consumerBuilder =>
{
consumerAction(consumerBuilder);
consumerBuilder.WithName(registration.Name);
consumerBuilder.Topics(registration.Topics.Select(info => info.Name));
consumerBuilder.WithGroupId(registration.GroupId);
consumerBuilder
.WithWorkDistributionStrategy<BytesSumDistributionStrategy>(); // guarantee events order
consumerBuilder.WithMaxPollIntervalMs((int)options.MaxPollInterval.TotalMilliseconds);
var consumerConfig = new ConsumerConfig
{
MaxPartitionFetchBytes = options.MaxPartitionFetchBytes,
AutoOffsetReset = options.AutoOffsetReset,
ClientId = registration.Name,
GroupInstanceId = registration.Name,
BootstrapServers = string.Join(",", options.Brokers),
SessionTimeoutMs = (int)options.SessionTimeout.TotalMilliseconds,
PartitionAssignmentStrategy = options.PartitionAssignmentStrategy
};
consumerBuilder.WithConsumerConfig(consumerConfig);
configureAction(consumerBuilder, consumerConfig);
});
}
});
Expand Down
6 changes: 3 additions & 3 deletions src/Sitko.Core.Kafka/KafkaConsumerOffsetsEnsurer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ internal class KafkaConsumerOffsetsEnsurer

public KafkaConsumerOffsetsEnsurer(ILogger<KafkaConsumerOffsetsEnsurer> logger) => this.logger = logger;

public async Task EnsureOffsetsAsync(KafkaConfigurator configurator)
public async Task EnsureOffsetsAsync(KafkaConfigurator configurator, KafkaModuleOptions options)
{
var adminClient = GetAdminClient(configurator.Brokers);
var adminClient = GetAdminClient(options.Brokers);
foreach (var consumer in configurator.Consumers)
{
foreach (var topic in consumer.Topics)
{
await EnsureTopicOffsetsAsync(consumer, adminClient, topic, configurator.Brokers);
await EnsureTopicOffsetsAsync(consumer, adminClient, topic, options.Brokers);
}
}
}
Expand Down
43 changes: 36 additions & 7 deletions src/Sitko.Core.Kafka/KafkaModule.cs
Original file line number Diff line number Diff line change
@@ -1,29 +1,32 @@
using KafkaFlow;
using Confluent.Kafka;
using FluentValidation;
using KafkaFlow;
using Microsoft.Extensions.DependencyInjection;
using Sitko.Core.App;
using Acks = Confluent.Kafka.Acks;

namespace Sitko.Core.Kafka;

public class KafkaModule : BaseApplicationModule
public class KafkaModule : BaseApplicationModule<KafkaModuleOptions>
{
private static readonly Dictionary<string, KafkaConfigurator> Configurators = new();
public override bool AllowMultiple => false;

public override string OptionsKey => "Kafka";

public static KafkaConfigurator CreateConfigurator(string name, string[] brokers) =>
Configurators.SafeGetOrAdd(name, _ => new KafkaConfigurator(name, brokers));
public static KafkaConfigurator CreateConfigurator(string name) =>
Configurators.SafeGetOrAdd(name, _ => new KafkaConfigurator(name));

public override void PostConfigureServices(IApplicationContext applicationContext, IServiceCollection services,
BaseApplicationModuleOptions startupOptions)
KafkaModuleOptions startupOptions)
{
base.ConfigureServices(applicationContext, services, startupOptions);
services.AddSingleton<KafkaConsumerOffsetsEnsurer>();
services.AddKafkaFlowHostedService(builder =>
{
foreach (var (_, configurator) in Configurators)
{
configurator.Build(builder);
configurator.Build(builder, startupOptions);
}
});
}
Expand All @@ -32,12 +35,38 @@ public override async Task InitAsync(IApplicationContext applicationContext, ISe
{
await base.InitAsync(applicationContext, serviceProvider);
var offsetsEnsurer = serviceProvider.GetRequiredService<KafkaConsumerOffsetsEnsurer>();
var options = GetOptions(serviceProvider);
foreach (var (_, configurator) in Configurators)
{
if (configurator.NeedToEnsureOffsets)
{
await offsetsEnsurer.EnsureOffsetsAsync(configurator);
await offsetsEnsurer.EnsureOffsetsAsync(configurator, options);
}
}
}
}

/// <summary>
/// Cluster-wide Kafka settings shared by every configurator built by <c>KafkaModule</c>.
/// Consumer-related values are applied in <c>KafkaConfigurator.Build</c> when constructing each
/// <c>ConsumerConfig</c>; producer-related values when constructing each <c>ProducerConfig</c>.
/// </summary>
public class KafkaModuleOptions : BaseModuleOptions
{
    // Bootstrap broker addresses; joined with "," for ConsumerConfig.BootstrapServers and
    // passed to clusterBuilder.WithBrokers / the offsets-ensurer admin client.
    // Required: KafkaModuleOptionsValidator rejects an empty list.
    public string[] Brokers { get; set; } = Array.Empty<string>();

    // --- Consumer settings ---

    // Mapped to ConsumerConfig.SessionTimeoutMs (milliseconds).
    public TimeSpan SessionTimeout { get; set; } = TimeSpan.FromSeconds(15);

    // Mapped to consumerBuilder.WithMaxPollIntervalMs (milliseconds).
    public TimeSpan MaxPollInterval { get; set; } = TimeSpan.FromMinutes(5);

    // Mapped to ConsumerConfig.MaxPartitionFetchBytes; default 5 MiB.
    public int MaxPartitionFetchBytes { get; set; } = 5 * 1024 * 1024;

    // Fully qualified on purpose: both `KafkaFlow` and `Confluent.Kafka` are imported in this
    // file and each declares an AutoOffsetReset type, so the short name would be ambiguous.
    public Confluent.Kafka.AutoOffsetReset AutoOffsetReset { get; set; } = Confluent.Kafka.AutoOffsetReset.Latest;

    // Mapped to ConsumerConfig.PartitionAssignmentStrategy.
    public PartitionAssignmentStrategy PartitionAssignmentStrategy { get; set; } =
        PartitionAssignmentStrategy.CooperativeSticky;

    // --- Producer settings ---

    // Mapped to ProducerConfig.MessageTimeoutMs (milliseconds).
    public TimeSpan MessageTimeout { get; set; } = TimeSpan.FromSeconds(12);

    // Mapped to ProducerConfig.MessageMaxBytes; default 5 MiB.
    public int MessageMaxBytes { get; set; } = 5 * 1024 * 1024;

    // Applied via producerBuilder.WithLingerMs, i.e. how long the producer may batch
    // before sending; despite the name it is a linger interval, not a hard timeout.
    public TimeSpan MaxProducingTimeout { get; set; } = TimeSpan.FromMilliseconds(100);

    // Mapped to ProducerConfig.EnableIdempotence.
    public bool EnableIdempotence { get; set; } = true;

    // Mapped to ProducerConfig.SocketNagleDisable (disables Nagle's algorithm on the socket).
    public bool SocketNagleDisable { get; set; } = true;

    // Mapped to ProducerConfig.Acks (`Acks` is aliased to Confluent.Kafka.Acks in this file
    // because KafkaFlow declares an Acks type as well).
    public Acks Acks { get; set; } = Acks.All;
}

/// <summary>
/// FluentValidation rules for <see cref="KafkaModuleOptions"/>:
/// at least one broker address must be configured.
/// </summary>
public class KafkaModuleOptionsValidator : AbstractValidator<KafkaModuleOptions>
{
    public KafkaModuleOptionsValidator()
    {
        RuleFor(options => options.Brokers)
            .NotEmpty()
            .WithMessage("Specify Kafka brokers");
    }
}
8 changes: 5 additions & 3 deletions src/Sitko.Core.Tasks.Kafka/ApplicationExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,17 @@ public static class ApplicationExtensions
{
public static Application AddKafkaTasks<TBaseTask, TDbContext>(this Application application,
Action<KafkaTasksModuleOptions<TBaseTask, TDbContext>> configure, bool configurePostgres = false,
Action<PostgresDatabaseModuleOptions<TDbContext>>? configurePostgresAction = null) where TBaseTask : BaseTask
Action<PostgresDatabaseModuleOptions<TDbContext>>? configurePostgresAction = null,
bool configureKafka = true,
Action<KafkaModuleOptions>? configureKafkaAction = null) where TBaseTask : BaseTask
where TDbContext : TasksDbContext<TBaseTask>
{
application.AddTasks<TBaseTask, TDbContext>(configurePostgres, configurePostgresAction);
application.AddModule<KafkaTasksModule<TBaseTask, TDbContext>, KafkaTasksModuleOptions<TBaseTask, TDbContext>>(
configure);
if (!application.HasModule<KafkaModule>())
if (configureKafka && !application.HasModule<KafkaModule>())
{
application.AddModule<KafkaModule>();
application.AddModule<KafkaModule, KafkaModuleOptions>(configureKafkaAction);
}

return application;
Expand Down
34 changes: 4 additions & 30 deletions src/Sitko.Core.Tasks.Kafka/KafkaTasksModule.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ protected override void ConfigureServicesInternal(IApplicationContext applicatio
EventsRegistry.Register(executor.EventType, kafkaTopic, producerName);
}

var kafkaConfigurator = KafkaModule.CreateConfigurator("Kafka_Tasks_Cluster", startupOptions.Brokers);
var kafkaConfigurator = KafkaModule.CreateConfigurator("Kafka_Tasks_Cluster");
kafkaConfigurator
.AutoCreateTopic(kafkaTopic, startupOptions.TopicPartitions, startupOptions.TopicReplicationFactor)
.EnsureOffsets()
.AddProducer(producerName, builder =>
.AddProducer(producerName, (builder, _) =>
{
builder.DefaultTopic(kafkaTopic);
builder.AddMiddlewares(middlewareBuilder =>
Expand All @@ -68,33 +68,10 @@ protected override void ConfigureServicesInternal(IApplicationContext applicatio
new[]
{
new TopicInfo(kafkaTopic, startupOptions.TopicPartitions, startupOptions.TopicReplicationFactor)
}, consumerBuilder =>
}, (consumerBuilder, _) =>
{
if (startupOptions.MaxPollIntervalMs > 0)
{
consumerBuilder.WithMaxPollIntervalMs(startupOptions.MaxPollIntervalMs);
}

consumerBuilder.Topic(kafkaTopic);
consumerBuilder.WithName(name);
consumerBuilder.WithGroupId(groupId);
consumerBuilder.WithWorkersCount(parallelThreadCount);
consumerBuilder.WithBufferSize(bufferSize);
// для гарантии порядка событий
consumerBuilder
.WithWorkDistributionStrategy<BytesSumDistributionStrategy>();
var consumerConfig = new ConsumerConfig
{
AutoOffsetReset = AutoOffsetReset.Latest,
ClientId = name,
GroupInstanceId = name,
PartitionAssignmentStrategy = PartitionAssignmentStrategy.CooperativeSticky,
};
if (startupOptions.SessionTimeoutMs > 0)
{
consumerConfig.SessionTimeoutMs = startupOptions.SessionTimeoutMs;
}
consumerBuilder.WithConsumerConfig(consumerConfig);
consumerBuilder.AddMiddlewares(
middlewares =>
{
Expand All @@ -116,9 +93,6 @@ public class
KafkaTasksModuleOptions<TBaseTask, TDbContext>> where TBaseTask : BaseTask
where TDbContext : TasksDbContext<TBaseTask>
{
public KafkaModuleOptionsValidator()
{
RuleFor(options => options.Brokers).NotEmpty().WithMessage("Specify Kafka brokers");
public KafkaModuleOptionsValidator() =>
RuleFor(options => options.TasksTopic).NotEmpty().WithMessage("Specify Kafka topic");
}
}
5 changes: 0 additions & 5 deletions src/Sitko.Core.Tasks.Kafka/KafkaTasksModuleOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,11 @@ public class KafkaTasksModuleOptions<TBaseTask, TDbContext> : TasksModuleOptions
where TDbContext : TasksDbContext<TBaseTask>
{
public override Type GetValidatorType() => typeof(KafkaModuleOptionsValidator<TBaseTask, TDbContext>);
public string[] Brokers { get; set; } = Array.Empty<string>();
public string TasksTopic { get; set; } = "";
public bool AddTopicPrefix { get; set; } = true;
public string TopicPrefix { get; set; } = "";
public int TopicPartitions { get; set; } = 24;
public short TopicReplicationFactor { get; set; } = 1;
public bool AddConsumerGroupPrefix { get; set; } = true;
public string ConsumerGroupPrefix { get; set; } = "";

public int SessionTimeoutMs { get; set; }

public int MaxPollIntervalMs { get; set; }
}

0 comments on commit eacff8b

Please sign in to comment.