From e0b408e3cb142c042c14ae31eab4a83785d717ba Mon Sep 17 00:00:00 2001
From: MASES Public Developers Team
<94312179+masesdevelopers@users.noreply.github.com>
Date: Wed, 11 Oct 2023 18:09:20 +0200
Subject: [PATCH] Update documentation and usage of `ConsumerConfigBuilder`
(#84)
* Hyperlink fix
* Missing file update
* Describe and use ConsumerConfigBuilder
* Name simplification
---
README.md | 2 +-
src/documentation/articles/actualstate.md | 6 --
src/documentation/articles/currentstate.md | 6 ++
src/documentation/articles/kafkadbcontext.md | 37 ++++++++--
src/documentation/articles/roadmap.md | 6 +-
src/documentation/index.md | 4 +-
.../Internal/IKafkaSingletonOptions.cs | 9 ++-
.../Internal/KafkaOptionsExtension.cs | 68 +++++++++++--------
.../Internal/KafkaSingletonOptions.cs | 16 +++--
.../KEFCore/Infrastructure/KafkaDbContext.cs | 46 +++++++++++--
.../KafkaDbContextOptionsBuilder.cs | 38 +++++++----
.../Storage/Internal/EntityTypeProducer.cs | 5 +-
.../KEFCore/Storage/Internal/KafkaCluster.cs | 6 +-
test/KEFCore.Benchmark.Test/Program.cs | 4 +-
test/KEFCore.Complex.Test/Program.cs | 2 +-
test/KEFCore.Test/Program.cs | 2 +-
16 files changed, 174 insertions(+), 83 deletions(-)
delete mode 100644 src/documentation/articles/actualstate.md
create mode 100644 src/documentation/articles/currentstate.md
diff --git a/README.md b/README.md
index bbe0ed0d..08dd2228 100644
--- a/README.md
+++ b/README.md
@@ -34,7 +34,7 @@ This project adheres to the Contributor [Covenant code of conduct](CODE_OF_CONDU
## Summary
* [Roadmap](src/documentation/articles/roadmap.md)
-* [Actual state](src/documentation/articles/actualstate.md)
+* [Current state](src/documentation/articles/currentstate.md)
* [KEFCore usage](src/documentation/articles/usage.md)
## Runtime engine
diff --git a/src/documentation/articles/actualstate.md b/src/documentation/articles/actualstate.md
deleted file mode 100644
index fea14c5a..00000000
--- a/src/documentation/articles/actualstate.md
+++ /dev/null
@@ -1,6 +0,0 @@
-# KEFCore: development state
-
-This release had implemented this features:
-
-* Created a first working provider
-* Initial development of Kafka Connect (under development)
diff --git a/src/documentation/articles/currentstate.md b/src/documentation/articles/currentstate.md
new file mode 100644
index 00000000..b794d2bc
--- /dev/null
+++ b/src/documentation/articles/currentstate.md
@@ -0,0 +1,6 @@
+# KEFCore: development state
+
+The latest release implements these features:
+
+* [x] A working provider based on Apache Kafka Streams
+* [x] The provider can use KNetCompactedReplicator
\ No newline at end of file
diff --git a/src/documentation/articles/kafkadbcontext.md b/src/documentation/articles/kafkadbcontext.md
index d785d8e6..e2c03103 100644
--- a/src/documentation/articles/kafkadbcontext.md
+++ b/src/documentation/articles/kafkadbcontext.md
@@ -11,16 +11,43 @@
- **DefaultConsumerInstances**: the consumer instances to be allocated when UseCompactedReplicator is **true**
- **UsePersistentStorage**: set to **true** to use a persintent storage between multiple application startup
- **UseCompactedReplicator**: Use `KNetCompactedReplicator` instead of Apache Kafka Streams to manage data to or from topics
- - **ProducerConfigBuilder**: parameters to use for Producer
- - **StreamsConfigBuilder**: parameters to use for Apche Kafka Streams application
- - **TopicConfigBuilder**: parameters to use on topic creation for each entity
+ - **ConsumerConfig**: parameters to use for Consumer
+ - **ProducerConfig**: parameters to use for Producer
+ - **StreamsConfig**: parameters to use for Apache Kafka Streams application
+ - **TopicConfig**: parameters to use on topic creation for each entity
## How to use `KafkaDbContext` class
The most simple example of usage can be found in [KEFCore usage](usage.md). By default, `KafkaDbContext` automatically manages `OnConfiguring` method of `DbContext`:
-- checks for mandatory opions like **BootstrapServers** and **DbName**
-- setup the options to use an Apache Kafka cluster
+- `KafkaDbContext` checks the mandatory options like **BootstrapServers** and **DbName**
+- `KafkaDbContext` sets up the options needed to use an Apache Kafka cluster:
+ - default `ConsumerConfig` can be overridden using **ConsumerConfig** property of `KafkaDbContext`
+ - default `ProducerConfig` can be overridden using **ProducerConfig** property of `KafkaDbContext`
+ - default `StreamsConfig` can be overridden using **StreamsConfig** property of `KafkaDbContext`
+ - default `TopicConfig` can be overridden using **TopicConfig** property of `KafkaDbContext`
+### Default **ConsumerConfig**
+On top of the Apache Kafka defaults it applies:
+- EnableAutoCommit is **true**
+- AutoOffsetReset set to **EARLIEST**
+- AllowAutoCreateTopics set to **false**
+
+### Default **ProducerConfig**
+
+Does not change anything from the Apache Kafka defaults
+
+### Default **StreamsConfig**
+
+Does not change anything from the Apache Kafka defaults
+
+### Default **TopicConfig**
+
+On top of the Apache Kafka defaults it applies:
+
+- DeleteRetentionMs set to 100 ms
+- MinCleanableDirtyRatio set to 0.01
+- SegmentMs set to 100 ms
+- RetentionBytes set to 1073741824 bytes (1 GB)
diff --git a/src/documentation/articles/roadmap.md b/src/documentation/articles/roadmap.md
index c67c227e..0d8c93d0 100644
--- a/src/documentation/articles/roadmap.md
+++ b/src/documentation/articles/roadmap.md
@@ -2,6 +2,6 @@
The roadmap can be synthetized in the following points:
-* Create a first working provider based on InMemory provider
-* Extends the first provider with new features able to create Apache Kafka Streams topology to retrieve information
-* Use KNetCompactedReplicator beside Apache Kafka Streams
+* [x] Create a first working provider starting from the code of InMemory provider
+* [ ] Extend the first provider with new features able to create an Apache Kafka Streams topology to retrieve information
+* [x] Use KNetCompactedReplicator beside Apache Kafka Streams
diff --git a/src/documentation/index.md b/src/documentation/index.md
index 08394643..9727e377 100644
--- a/src/documentation/index.md
+++ b/src/documentation/index.md
@@ -34,8 +34,8 @@ Have a look at the following resources:
---
## Summary
-* [Roadmap](src/net/Documentation/articles/roadmap.md)
-* [Actual state](src/net/Documentation/articles/actualstate.md)
+* [Roadmap](articles/roadmap.md)
+* [Current state](articles/currentstate.md)
* [KEFCore usage](articles/usage.md)
---
diff --git a/src/net/KEFCore/Infrastructure/Internal/IKafkaSingletonOptions.cs b/src/net/KEFCore/Infrastructure/Internal/IKafkaSingletonOptions.cs
index 19de56c9..047f4172 100644
--- a/src/net/KEFCore/Infrastructure/Internal/IKafkaSingletonOptions.cs
+++ b/src/net/KEFCore/Infrastructure/Internal/IKafkaSingletonOptions.cs
@@ -19,6 +19,7 @@
#nullable enable
using MASES.KNet.Common;
+using MASES.KNet.Consumer;
using MASES.KNet.Producer;
using MASES.KNet.Streams;
@@ -51,9 +52,11 @@ public interface IKafkaSingletonOptions : ISingletonOptions
int DefaultReplicationFactor { get; }
- ProducerConfigBuilder? ProducerConfigBuilder { get; }
+ ConsumerConfigBuilder? ConsumerConfig { get; }
- StreamsConfigBuilder? StreamsConfigBuilder { get; }
+ ProducerConfigBuilder? ProducerConfig { get; }
- TopicConfigBuilder? TopicConfigBuilder { get; }
+ StreamsConfigBuilder? StreamsConfig { get; }
+
+ TopicConfigBuilder? TopicConfig { get; }
}
diff --git a/src/net/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs b/src/net/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs
index 5fdbb9b2..c4da46a3 100644
--- a/src/net/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs
+++ b/src/net/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs
@@ -21,6 +21,7 @@
using Java.Lang;
using Java.Util;
using MASES.KNet.Common;
+using MASES.KNet.Consumer;
using MASES.KNet.Producer;
using MASES.KNet.Streams;
using Org.Apache.Kafka.Clients.Consumer;
@@ -47,6 +48,7 @@ public class KafkaOptionsExtension : IDbContextOptionsExtension
private int _defaultNumPartitions = 1;
private int? _defaultConsumerInstances = null;
private short _defaultReplicationFactor = 1;
+ private ConsumerConfigBuilder? _consumerConfigBuilder;
private ProducerConfigBuilder? _producerConfigBuilder;
private StreamsConfigBuilder? _streamsConfigBuilder;
private TopicConfigBuilder? _topicConfigBuilder;
@@ -71,6 +73,7 @@ protected KafkaOptionsExtension(KafkaOptionsExtension copyFrom)
_defaultNumPartitions = copyFrom._defaultNumPartitions;
_defaultConsumerInstances = copyFrom._defaultConsumerInstances;
_defaultReplicationFactor = copyFrom._defaultReplicationFactor;
+ _consumerConfigBuilder = ConsumerConfigBuilder.CreateFrom(copyFrom._consumerConfigBuilder);
_producerConfigBuilder = ProducerConfigBuilder.CreateFrom(copyFrom._producerConfigBuilder);
_streamsConfigBuilder = StreamsConfigBuilder.CreateFrom(copyFrom._streamsConfigBuilder);
_topicConfigBuilder = TopicConfigBuilder.CreateFrom(copyFrom._topicConfigBuilder);
@@ -102,11 +105,13 @@ protected KafkaOptionsExtension(KafkaOptionsExtension copyFrom)
public virtual short DefaultReplicationFactor => _defaultReplicationFactor;
- public virtual ProducerConfigBuilder ProducerConfigBuilder => _producerConfigBuilder!;
+ public virtual ConsumerConfigBuilder ConsumerConfig => _consumerConfigBuilder!;
- public virtual StreamsConfigBuilder StreamsConfigBuilder => _streamsConfigBuilder!;
+ public virtual ProducerConfigBuilder ProducerConfig => _producerConfigBuilder!;
- public virtual TopicConfigBuilder TopicConfigBuilder => _topicConfigBuilder!;
+ public virtual StreamsConfigBuilder StreamsConfig => _streamsConfigBuilder!;
+
+ public virtual TopicConfigBuilder TopicConfig => _topicConfigBuilder!;
public virtual KafkaOptionsExtension WithUseNameMatching(bool useNameMatching = true)
{
@@ -198,6 +203,15 @@ public virtual KafkaOptionsExtension WithDefaultReplicationFactor(short defaultR
return clone;
}
+ public virtual KafkaOptionsExtension WithConsumerConfig(ConsumerConfigBuilder consumerConfigBuilder)
+ {
+ var clone = Clone();
+
+ clone._consumerConfigBuilder = ConsumerConfigBuilder.CreateFrom(consumerConfigBuilder);
+
+ return clone;
+ }
+
public virtual KafkaOptionsExtension WithProducerConfig(ProducerConfigBuilder producerConfigBuilder)
{
var clone = Clone();
@@ -233,31 +247,31 @@ public virtual Properties StreamsOptions(IEntityType entityType)
public virtual Properties StreamsOptions(string applicationId)
{
Properties props = _streamsConfigBuilder ?? new();
- if (props.ContainsKey(StreamsConfig.APPLICATION_ID_CONFIG))
+ if (props.ContainsKey(Org.Apache.Kafka.Streams.StreamsConfig.APPLICATION_ID_CONFIG))
{
- props.Remove(StreamsConfig.APPLICATION_ID_CONFIG);
+ props.Remove(Org.Apache.Kafka.Streams.StreamsConfig.APPLICATION_ID_CONFIG);
}
- props.Put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
- if (props.ContainsKey(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG))
+ props.Put(Org.Apache.Kafka.Streams.StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+ if (props.ContainsKey(Org.Apache.Kafka.Streams.StreamsConfig.BOOTSTRAP_SERVERS_CONFIG))
{
- props.Remove(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
+ props.Remove(Org.Apache.Kafka.Streams.StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
}
- props.Put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BootstrapServers);
- if (props.ContainsKey(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG))
+ props.Put(Org.Apache.Kafka.Streams.StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BootstrapServers);
+ if (props.ContainsKey(Org.Apache.Kafka.Streams.StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG))
{
- props.Remove(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG);
+ props.Remove(Org.Apache.Kafka.Streams.StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG);
}
- props.Put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Class.ForName("org.apache.kafka.common.serialization.Serdes$ByteArraySerde", true, SystemClassLoader));
- if (props.ContainsKey(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG))
+ props.Put(Org.Apache.Kafka.Streams.StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Class.ForName("org.apache.kafka.common.serialization.Serdes$ByteArraySerde", true, SystemClassLoader));
+ if (props.ContainsKey(Org.Apache.Kafka.Streams.StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG))
{
- props.Remove(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG);
+ props.Remove(Org.Apache.Kafka.Streams.StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG);
}
- props.Put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Class.ForName("org.apache.kafka.common.serialization.Serdes$ByteArraySerde", true, SystemClassLoader));
- if (props.ContainsKey(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG))
+ props.Put(Org.Apache.Kafka.Streams.StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Class.ForName("org.apache.kafka.common.serialization.Serdes$ByteArraySerde", true, SystemClassLoader));
+ if (props.ContainsKey(Org.Apache.Kafka.Clients.Consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG))
{
- props.Remove(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
+ props.Remove(Org.Apache.Kafka.Clients.Consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
}
- props.Put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ props.Put(Org.Apache.Kafka.Clients.Consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return props;
}
@@ -265,22 +279,22 @@ public virtual Properties StreamsOptions(string applicationId)
public virtual Properties ProducerOptions()
{
Properties props = _producerConfigBuilder ?? new();
- if (props.ContainsKey(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
+ if (props.ContainsKey(Org.Apache.Kafka.Clients.Producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
{
- props.Remove(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
+ props.Remove(Org.Apache.Kafka.Clients.Producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
}
- props.Put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BootstrapServers);
- if (!props.ContainsKey(ProducerConfig.ACKS_CONFIG))
+ props.Put(Org.Apache.Kafka.Clients.Producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BootstrapServers);
+ if (!props.ContainsKey(Org.Apache.Kafka.Clients.Producer.ProducerConfig.ACKS_CONFIG))
{
- props.Put(ProducerConfig.ACKS_CONFIG, "all");
+ props.Put(Org.Apache.Kafka.Clients.Producer.ProducerConfig.ACKS_CONFIG, "all");
}
- if (!props.ContainsKey(ProducerConfig.RETRIES_CONFIG))
+ if (!props.ContainsKey(Org.Apache.Kafka.Clients.Producer.ProducerConfig.RETRIES_CONFIG))
{
- props.Put(ProducerConfig.RETRIES_CONFIG, 0);
+ props.Put(Org.Apache.Kafka.Clients.Producer.ProducerConfig.RETRIES_CONFIG, 0);
}
- if (!props.ContainsKey(ProducerConfig.LINGER_MS_CONFIG))
+ if (!props.ContainsKey(Org.Apache.Kafka.Clients.Producer.ProducerConfig.LINGER_MS_CONFIG))
{
- props.Put(ProducerConfig.LINGER_MS_CONFIG, 1);
+ props.Put(Org.Apache.Kafka.Clients.Producer.ProducerConfig.LINGER_MS_CONFIG, 1);
}
//if (props.ContainsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG))
//{
diff --git a/src/net/KEFCore/Infrastructure/Internal/KafkaSingletonOptions.cs b/src/net/KEFCore/Infrastructure/Internal/KafkaSingletonOptions.cs
index ec2ea120..94e555bd 100644
--- a/src/net/KEFCore/Infrastructure/Internal/KafkaSingletonOptions.cs
+++ b/src/net/KEFCore/Infrastructure/Internal/KafkaSingletonOptions.cs
@@ -17,6 +17,7 @@
*/
using MASES.KNet.Common;
+using MASES.KNet.Consumer;
using MASES.KNet.Producer;
using MASES.KNet.Streams;
@@ -45,9 +46,10 @@ public virtual void Initialize(IDbContextOptions options)
DefaultNumPartitions = kafkaOptions.DefaultNumPartitions;
DefaultConsumerInstances = kafkaOptions.DefaultConsumerInstances;
DefaultReplicationFactor = kafkaOptions.DefaultReplicationFactor;
- ProducerConfigBuilder = ProducerConfigBuilder.CreateFrom(kafkaOptions.ProducerConfigBuilder);
- StreamsConfigBuilder = StreamsConfigBuilder.CreateFrom(kafkaOptions.StreamsConfigBuilder);
- TopicConfigBuilder = TopicConfigBuilder.CreateFrom(kafkaOptions.TopicConfigBuilder);
+ ConsumerConfig = ConsumerConfigBuilder.CreateFrom(kafkaOptions.ConsumerConfig);
+ ProducerConfig = ProducerConfigBuilder.CreateFrom(kafkaOptions.ProducerConfig);
+ StreamsConfig = StreamsConfigBuilder.CreateFrom(kafkaOptions.StreamsConfig);
+ TopicConfig = TopicConfigBuilder.CreateFrom(kafkaOptions.TopicConfig);
}
}
@@ -85,9 +87,11 @@ public virtual void Validate(IDbContextOptions options)
public virtual int DefaultReplicationFactor { get; private set; }
- public virtual ProducerConfigBuilder? ProducerConfigBuilder { get; private set; }
+ public virtual ConsumerConfigBuilder? ConsumerConfig { get; private set; }
- public virtual StreamsConfigBuilder? StreamsConfigBuilder { get; private set; }
+ public virtual ProducerConfigBuilder? ProducerConfig { get; private set; }
- public virtual TopicConfigBuilder? TopicConfigBuilder { get; private set; }
+ public virtual StreamsConfigBuilder? StreamsConfig { get; private set; }
+
+ public virtual TopicConfigBuilder? TopicConfig { get; private set; }
}
diff --git a/src/net/KEFCore/Infrastructure/KafkaDbContext.cs b/src/net/KEFCore/Infrastructure/KafkaDbContext.cs
index ef429045..f04136bc 100644
--- a/src/net/KEFCore/Infrastructure/KafkaDbContext.cs
+++ b/src/net/KEFCore/Infrastructure/KafkaDbContext.cs
@@ -17,6 +17,7 @@
*/
using MASES.KNet.Common;
+using MASES.KNet.Consumer;
using MASES.KNet.Producer;
using MASES.KNet.Streams;
@@ -105,6 +106,32 @@ public static bool EnableKEFCoreTracing
}
}
+ ///
+ /// The default configuration
+ ///
+ /// The default configuration.
+ public static ConsumerConfigBuilder DefaultConsumerConfig => ConsumerConfigBuilder.Create().WithEnableAutoCommit(true)
+ .WithAutoOffsetReset(ConsumerConfigBuilder.AutoOffsetResetTypes.EARLIEST)
+ .WithAllowAutoCreateTopics(false);
+ ///
+ /// The default configuration
+ ///
+ /// The default configuration.
+ public static ProducerConfigBuilder DefaultProducerConfig => ProducerConfigBuilder.Create();
+ ///
+ /// The default configuration
+ ///
+ /// The default configuration.
+ public static StreamsConfigBuilder DefaultStreamsConfig => StreamsConfigBuilder.Create();
+ ///
+ /// The default configuration
+ ///
+ /// The default configuration.
+ public static TopicConfigBuilder DefaultTopicConfig => TopicConfigBuilder.Create().WithDeleteRetentionMs(100)
+ .WithMinCleanableDirtyRatio(0.01)
+ .WithSegmentMs(100)
+ .WithRetentionBytes(1073741824);
+
///
public KafkaDbContext()
{
@@ -153,17 +180,21 @@ public KafkaDbContext(DbContextOptions options) : base(options)
///
public virtual bool UseCompactedReplicator { get; set; } = false;
///
+ /// The optional used when is
+ ///
+ public virtual ConsumerConfigBuilder? ConsumerConfig { get; set; }
+ ///
/// The optional
///
- public virtual ProducerConfigBuilder? ProducerConfigBuilder { get; set; }
+ public virtual ProducerConfigBuilder? ProducerConfig { get; set; }
///
- /// The optional
+ /// The optional used when is
///
- public virtual StreamsConfigBuilder? StreamsConfigBuilder { get; set; }
+ public virtual StreamsConfigBuilder? StreamsConfig { get; set; }
///
- /// The optional
+ /// The optional used when topics shall be created
///
- public virtual TopicConfigBuilder? TopicConfigBuilder { get; set; }
+ public virtual TopicConfigBuilder? TopicConfig { get; set; }
///
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
@@ -172,7 +203,10 @@ protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
optionsBuilder.UseKafkaCluster(ApplicationId, DbName, BootstrapServers, (o) =>
{
- o.StreamsConfig(StreamsConfigBuilder ?? o.EmptyStreamsConfigBuilder).WithDefaultNumPartitions(DefaultNumPartitions);
+ o.ConsumerConfig(ConsumerConfig ?? DefaultConsumerConfig);
+ o.ProducerConfig(ProducerConfig ?? DefaultProducerConfig);
+ o.StreamsConfig(StreamsConfig ?? DefaultStreamsConfig).WithDefaultNumPartitions(DefaultNumPartitions);
+ o.TopicConfig(TopicConfig ?? DefaultTopicConfig);
o.WithUsePersistentStorage(UsePersistentStorage);
//o.WithProducerByEntity(UseProducerByEntity);
o.WithCompactedReplicator(UseCompactedReplicator);
diff --git a/src/net/KEFCore/Infrastructure/KafkaDbContextOptionsBuilder.cs b/src/net/KEFCore/Infrastructure/KafkaDbContextOptionsBuilder.cs
index 78328061..d02ab32f 100644
--- a/src/net/KEFCore/Infrastructure/KafkaDbContextOptionsBuilder.cs
+++ b/src/net/KEFCore/Infrastructure/KafkaDbContextOptionsBuilder.cs
@@ -21,7 +21,9 @@
using MASES.EntityFrameworkCore.KNet.Infrastructure.Internal;
using MASES.KNet.Common;
+using MASES.KNet.Consumer;
using MASES.KNet.Producer;
+using MASES.KNet.Replicator;
using MASES.KNet.Streams;
using Org.Apache.Kafka.Clients.Producer;
using Org.Apache.Kafka.Streams;
@@ -64,21 +66,6 @@ public KafkaDbContextOptionsBuilder(DbContextOptionsBuilder optionsBuilder)
///
DbContextOptionsBuilder IKafkaDbContextOptionsBuilderInfrastructure.OptionsBuilder => OptionsBuilder;
- ///
- /// The default configuration
- ///
- /// The default configuration.
- public ProducerConfigBuilder EmptyProducerConfigBuilder => ProducerConfigBuilder.Create();
- ///
- /// The default configuration
- ///
- /// The default configuration.
- public StreamsConfigBuilder EmptyStreamsConfigBuilder => StreamsConfigBuilder.Create();
- ///
- /// The default configuration
- ///
- /// The default configuration.
- public TopicConfigBuilder EmptyTopicConfigBuilder => TopicConfigBuilder.Create();
///
/// Enables name matching on instead of matching
@@ -227,6 +214,27 @@ public virtual KafkaDbContextOptionsBuilder WithDefaultReplicationFactor(short d
return this;
}
+ ///
+ /// Set properties of .
+ ///
+ ///
+ /// See Using DbContextOptions, and
+ /// The EF Core Kafka database provider for more information and examples.
+ ///
+ /// The where options are stored.
+ /// The same builder instance so that multiple calls can be chained.
+ public virtual KafkaDbContextOptionsBuilder ConsumerConfig(ConsumerConfigBuilder consumerConfigBuilder)
+ {
+ var extension = OptionsBuilder.Options.FindExtension()
+ ?? new KafkaOptionsExtension();
+
+ extension = extension.WithConsumerConfig(consumerConfigBuilder);
+
+ ((IDbContextOptionsBuilderInfrastructure)OptionsBuilder).AddOrUpdateExtension(extension);
+
+ return this;
+ }
+
///
/// Set properties of .
///
diff --git a/src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs b/src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs
index 4dd7e6a6..e3c4b727 100644
--- a/src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs
+++ b/src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs
@@ -202,8 +202,9 @@ public EntityTypeProducer(IEntityType entityType, IKafkaCluster cluster)
Partitions = _entityType.NumPartitions(_cluster.Options),
ConsumerInstances = _entityType.ConsumerInstances(_cluster.Options),
ReplicationFactor = _entityType.ReplicationFactor(_cluster.Options),
- TopicConfig = _cluster.Options.TopicConfigBuilder,
- ProducerConfig = _cluster.Options.ProducerConfigBuilder,
+ ConsumerConfig = _cluster.Options.ConsumerConfig,
+ TopicConfig = _cluster.Options.TopicConfig,
+ ProducerConfig = _cluster.Options.ProducerConfig,
KeySerDes = _keySerdes,
ValueSerDes = _valueSerdes,
};
diff --git a/src/net/KEFCore/Storage/Internal/KafkaCluster.cs b/src/net/KEFCore/Storage/Internal/KafkaCluster.cs
index 8164d8f1..b4402c13 100644
--- a/src/net/KEFCore/Storage/Internal/KafkaCluster.cs
+++ b/src/net/KEFCore/Storage/Internal/KafkaCluster.cs
@@ -197,9 +197,9 @@ private string CreateTable(IEntityType entityType, int cycle)
try
{
using var topic = new NewTopic(topicName, entityType.NumPartitions(Options), entityType.ReplicationFactor(Options));
- Options.TopicConfigBuilder.CleanupPolicy = MASES.KNet.Common.TopicConfigBuilder.CleanupPolicyTypes.Compact | MASES.KNet.Common.TopicConfigBuilder.CleanupPolicyTypes.Delete;
- Options.TopicConfigBuilder.RetentionBytes = 1024 * 1024 * 1024;
- using var map = Options.TopicConfigBuilder.ToMap();
+ Options.TopicConfig.CleanupPolicy = MASES.KNet.Common.TopicConfigBuilder.CleanupPolicyTypes.Compact | MASES.KNet.Common.TopicConfigBuilder.CleanupPolicyTypes.Delete;
+ Options.TopicConfig.RetentionBytes = 1024 * 1024 * 1024;
+ using var map = Options.TopicConfig.ToMap();
topic.Configs(map);
using var coll = Collections.Singleton(topic);
using Properties props = new();
diff --git a/test/KEFCore.Benchmark.Test/Program.cs b/test/KEFCore.Benchmark.Test/Program.cs
index f086afb4..094251be 100644
--- a/test/KEFCore.Benchmark.Test/Program.cs
+++ b/test/KEFCore.Benchmark.Test/Program.cs
@@ -86,7 +86,7 @@ static void Main(string[] args)
BootstrapServers = config.BootstrapServers,
ApplicationId = config.ApplicationId,
DbName = databaseName,
- StreamsConfigBuilder = streamConfig,
+ StreamsConfig = streamConfig,
})
{
@@ -141,7 +141,7 @@ static void Main(string[] args)
BootstrapServers = config.BootstrapServers,
ApplicationId = config.ApplicationId,
DbName = databaseName,
- StreamsConfigBuilder = streamConfig,
+ StreamsConfig = streamConfig,
})
{
Stopwatch watch = new Stopwatch();
diff --git a/test/KEFCore.Complex.Test/Program.cs b/test/KEFCore.Complex.Test/Program.cs
index b1ad1fca..46348827 100644
--- a/test/KEFCore.Complex.Test/Program.cs
+++ b/test/KEFCore.Complex.Test/Program.cs
@@ -83,7 +83,7 @@ static void Main(string[] args)
BootstrapServers = config.BootstrapServers,
ApplicationId = config.ApplicationId,
DbName = databaseName,
- StreamsConfigBuilder = streamConfig,
+ StreamsConfig = streamConfig,
};
if (config.DeleteApplicationData)
diff --git a/test/KEFCore.Test/Program.cs b/test/KEFCore.Test/Program.cs
index f56ec8d3..f428f66f 100644
--- a/test/KEFCore.Test/Program.cs
+++ b/test/KEFCore.Test/Program.cs
@@ -83,7 +83,7 @@ static void Main(string[] args)
BootstrapServers = config.BootstrapServers,
ApplicationId = config.ApplicationId,
DbName = databaseName,
- StreamsConfigBuilder = streamConfig,
+ StreamsConfig = streamConfig,
};
if (config.DeleteApplicationData)