From 9e13e820580867e55492461e5e6dbb0e42fcb676 Mon Sep 17 00:00:00 2001
From: MASES Public Developers Team
<94312179+masesdevelopers@users.noreply.github.com>
Date: Mon, 16 Oct 2023 01:40:37 +0200
Subject: [PATCH] Added new UseDeletePolicyForTopic property and disclaimer
about stability (#104)
---
README.md | 2 +
src/documentation/articles/currentstate.md | 3 +-
src/documentation/articles/gettingstarted.md | 2 +
src/documentation/articles/intro.md | 2 +
src/documentation/articles/kafkadbcontext.md | 3 +-
src/documentation/articles/roadmap.md | 1 +
src/documentation/articles/serialization.md | 2 +
src/documentation/index.md | 14 ++++++-
.../Internal/IKafkaSingletonOptions.cs | 2 +-
.../Internal/KafkaOptionsExtension.cs | 18 ++++-----
.../Internal/KafkaSingletonOptions.cs | 4 +-
.../KEFCore/Infrastructure/KafkaDbContext.cs | 10 ++---
.../KafkaDbContextOptionsBuilder.cs | 40 +++++++++----------
.../KEFCore/Storage/Internal/KafkaCluster.cs | 4 +-
14 files changed, 66 insertions(+), 41 deletions(-)
diff --git a/README.md b/README.md
index 9524af5b..245d6555 100644
--- a/README.md
+++ b/README.md
@@ -12,6 +12,8 @@ Based on [KNet](https://github.com/masesgroup/KNet) it allows to use [Apache Kaf
[![CI_BUILD](https://github.com/masesgroup/KEFCore/actions/workflows/build.yaml/badge.svg)](https://github.com/masesgroup/KEFCore/actions/workflows/build.yaml)
[![CI_RELEASE](https://github.com/masesgroup/KEFCore/actions/workflows/release.yaml/badge.svg)](https://github.com/masesgroup/KEFCore/actions/workflows/release.yaml)
+> IMPORTANT NOTE: till the first major version, all releases shall be considered not stable: this means the API public, or internal, can be change without notice.
+
---
## Scope of the project
diff --git a/src/documentation/articles/currentstate.md b/src/documentation/articles/currentstate.md
index b1d07977..223eb7b9 100644
--- a/src/documentation/articles/currentstate.md
+++ b/src/documentation/articles/currentstate.md
@@ -4,4 +4,5 @@ The latest release implementes these features:
* [x] A working provider based on Apache Kafka Streams
* [x] The provider can use KNetCompactedReplicator
-* [x] A first external package for serialization based on previous available serializers
\ No newline at end of file
+* [x] An external package for serialization based on .NET 6 Json serializers
+* [x] An external package for serialization based on Apache Avro serializers
\ No newline at end of file
diff --git a/src/documentation/articles/gettingstarted.md b/src/documentation/articles/gettingstarted.md
index 2731f0e6..ceaf6d28 100644
--- a/src/documentation/articles/gettingstarted.md
+++ b/src/documentation/articles/gettingstarted.md
@@ -4,6 +4,8 @@ To use [Entity Framework Core](https://learn.microsoft.com/it-it/ef/core/) provi
- an installed JRE/JDK (11+)
- an accessible Apache Kafka broker (a full cluster or a local Dockerized version)
+> IMPORTANT NOTE: till the first major version, all releases shall be considered not stable: this means the API public, or internal, can be change without notice.
+
## First project setup
- Create a new simple empty project:
diff --git a/src/documentation/articles/intro.md b/src/documentation/articles/intro.md
index 51f2f6e5..f3db9007 100644
--- a/src/documentation/articles/intro.md
+++ b/src/documentation/articles/intro.md
@@ -4,3 +4,5 @@ KEFCore is the [Entity Framework Core](https://learn.microsoft.com/it-it/ef/core
Based on [KNet](https://github.com/masesgroup/KNet) it allows to use [Apache Kafka](https://kafka.apache.org/) as a distributed database and more.
This project adheres to the Contributor [Covenant code of conduct](CODE_OF_CONDUCT.md). By participating, you are expected to uphold this code. Please report unacceptable behavior to coc_reporting@masesgroup.com.
+
+> IMPORTANT NOTE: till the first major version, all releases shall be considered not stable: this means the API public, or internal, can be change without notice.
\ No newline at end of file
diff --git a/src/documentation/articles/kafkadbcontext.md b/src/documentation/articles/kafkadbcontext.md
index 85c54863..29e97390 100644
--- a/src/documentation/articles/kafkadbcontext.md
+++ b/src/documentation/articles/kafkadbcontext.md
@@ -12,7 +12,8 @@
- **DefaultNumPartitions**: the default number of partitions used when topics are created for each entity
- **DefaultReplicationFactor**: the replication factor to use when data are stored in Apache Kafka
- **DefaultConsumerInstances**: the consumer instances to be allocated when UseCompactedReplicator is **true**
- - **UsePersistentStorage**: set to **true** to use a persintent storage between multiple application startup
+ - **UsePersistentStorage**: set to **true** to use a persistent storage between multiple application startup
+ - **UseDeletePolicyForTopic**: set to **true** to enable [delete cleanup policy](https://kafka.apache.org/documentation/#topicconfigs_cleanup.policy)
- **UseCompactedReplicator**: Use `KNetCompactedReplicator` instead of Apache Kafka Streams to manage data to or from topics
- **ConsumerConfig**: parameters to use for Producer
- **ProducerConfig**: parameters to use for Producer
diff --git a/src/documentation/articles/roadmap.md b/src/documentation/articles/roadmap.md
index a86a86bd..c79a3072 100644
--- a/src/documentation/articles/roadmap.md
+++ b/src/documentation/articles/roadmap.md
@@ -6,3 +6,4 @@ The roadmap can be synthetized in the following points:
* [ ] Extends the first provider with new features able to create Apache Kafka Streams topology to retrieve information
* [x] Use KNetCompactedReplicator beside Apache Kafka Streams
* [x] Add external package to manage data serialization
+* [x] Add Avro external package to manage data serialization
\ No newline at end of file
diff --git a/src/documentation/articles/serialization.md b/src/documentation/articles/serialization.md
index 6dc2fab6..18f3eafc 100644
--- a/src/documentation/articles/serialization.md
+++ b/src/documentation/articles/serialization.md
@@ -3,6 +3,8 @@
[Entity Framework Core](https://learn.microsoft.com/it-it/ef/core/) provider for [Apache Kafka](https://kafka.apache.org/) shall convert the entities used within the model in something viable from the backend.
Each backend has its own schema to convert entities into something else; database providers converts entities into database schema or blob in Cosmos.
+> IMPORTANT NOTE: till the first major version, all releases shall be considered not stable: this means the API public, or internal, can be change without notice.
+
## Basic concepts
[Entity Framework Core](https://learn.microsoft.com/it-it/ef/core/) provider for [Apache Kafka](https://kafka.apache.org/) shall convert the entities into record will be stored in the topics of Apache Kafka cluster.
diff --git a/src/documentation/index.md b/src/documentation/index.md
index 10429c3a..7d4d783c 100644
--- a/src/documentation/index.md
+++ b/src/documentation/index.md
@@ -5,13 +5,25 @@
KEFCore is the [Entity Framework Core](https://learn.microsoft.com/it-it/ef/core/) provider for [Apache Kafka](https://kafka.apache.org/).
Based on [KNet](https://github.com/masesgroup/KNet) it allows to use [Apache Kafka](https://kafka.apache.org/) as a distributed database and more.
-This project adheres to the Contributor [Covenant code of conduct](CODE_OF_CONDUCT.md). By participating, you are expected to uphold this code. Please report unacceptable behavior to coc_reporting@masesgroup.com.
+> IMPORTANT NOTE: till the first major version, all releases shall be considered not stable: this means the API public, or internal, can be change without notice.
## Scope of the project
This project aims to create a provider to access the information stored within an Apache Kafka cluster using the paradigm behind Entity Framework.
The project is based on available information within the official [EntityFrameworkCore repository](https://github.com/dotnet/efcore), many classes was copied from there as reported in the official documentation within the Microsoft website at https://docs.microsoft.com/en-us/ef/core/providers/writing-a-provider.
+### Community and Contribution
+
+Do you like the project?
+- Request your free [community subscription](https://www.jcobridge.com/pricing-25/).
+
+Do you want to help us?
+- put a :star: on this project
+- open [issues](https://github.com/masesgroup/KEFCore/issues) to request features or report bugs :bug:
+- improves the project with Pull Requests
+
+This project adheres to the Contributor [Covenant code of conduct](CODE_OF_CONDUCT.md). By participating, you are expected to uphold this code. Please report unacceptable behavior to coc_reporting@masesgroup.com.
+
---
## Summary
diff --git a/src/net/KEFCore/Infrastructure/Internal/IKafkaSingletonOptions.cs b/src/net/KEFCore/Infrastructure/Internal/IKafkaSingletonOptions.cs
index 746e6b5d..20d62ca5 100644
--- a/src/net/KEFCore/Infrastructure/Internal/IKafkaSingletonOptions.cs
+++ b/src/net/KEFCore/Infrastructure/Internal/IKafkaSingletonOptions.cs
@@ -46,7 +46,7 @@ public interface IKafkaSingletonOptions : ISingletonOptions
string? BootstrapServers { get; }
- //bool ProducerByEntity { get; }
+ bool UseDeletePolicyForTopic { get; }
bool UseCompactedReplicator { get; }
diff --git a/src/net/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs b/src/net/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs
index 15a85650..87a591aa 100644
--- a/src/net/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs
+++ b/src/net/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs
@@ -45,7 +45,7 @@ public class KafkaOptionsExtension : IDbContextOptionsExtension
private string? _databaseName;
private string? _applicationId;
private string? _bootstrapServers;
- //private bool _producerByEntity = false;
+ private bool _useDeletePolicyForTopic = false;
private bool _useCompactedReplicator = true;
private bool _usePersistentStorage = false;
private int _defaultNumPartitions = 1;
@@ -73,7 +73,7 @@ protected KafkaOptionsExtension(KafkaOptionsExtension copyFrom)
_databaseName = copyFrom._databaseName;
_applicationId = copyFrom._applicationId;
_bootstrapServers = copyFrom._bootstrapServers;
- //_producerByEntity = copyFrom._producerByEntity;
+ _useDeletePolicyForTopic = copyFrom._useDeletePolicyForTopic;
_useCompactedReplicator = copyFrom._useCompactedReplicator;
_usePersistentStorage = copyFrom._usePersistentStorage;
_defaultNumPartitions = copyFrom._defaultNumPartitions;
@@ -105,7 +105,7 @@ protected KafkaOptionsExtension(KafkaOptionsExtension copyFrom)
public virtual string BootstrapServers => _bootstrapServers!;
- //public virtual bool ProducerByEntity => _producerByEntity;
+ public virtual bool UseDeletePolicyForTopic => _useDeletePolicyForTopic;
public virtual bool UseCompactedReplicator => _useCompactedReplicator;
@@ -194,14 +194,14 @@ public virtual KafkaOptionsExtension WithBootstrapServers(string bootstrapServer
return clone;
}
- //public virtual KafkaOptionsExtension WithProducerByEntity(bool producerByEntity = false)
- //{
- // var clone = Clone();
+ public virtual KafkaOptionsExtension WithUseDeletePolicyForTopic(bool useDeletePolicyForTopic = false)
+ {
+ var clone = Clone();
- // clone._producerByEntity = producerByEntity;
+ clone._useDeletePolicyForTopic = useDeletePolicyForTopic;
- // return clone;
- //}
+ return clone;
+ }
public virtual KafkaOptionsExtension WithCompactedReplicator(bool useCompactedReplicator = true)
{
diff --git a/src/net/KEFCore/Infrastructure/Internal/KafkaSingletonOptions.cs b/src/net/KEFCore/Infrastructure/Internal/KafkaSingletonOptions.cs
index e73f430c..4b25c1e9 100644
--- a/src/net/KEFCore/Infrastructure/Internal/KafkaSingletonOptions.cs
+++ b/src/net/KEFCore/Infrastructure/Internal/KafkaSingletonOptions.cs
@@ -43,7 +43,7 @@ public virtual void Initialize(IDbContextOptions options)
DatabaseName = kafkaOptions.DatabaseName;
ApplicationId = kafkaOptions.ApplicationId;
BootstrapServers = kafkaOptions.BootstrapServers;
- //ProducerByEntity = kafkaOptions.ProducerByEntity;
+ UseDeletePolicyForTopic = kafkaOptions.UseDeletePolicyForTopic;
UseCompactedReplicator = kafkaOptions.UseCompactedReplicator;
UsePersistentStorage = kafkaOptions.UsePersistentStorage;
DefaultNumPartitions = kafkaOptions.DefaultNumPartitions;
@@ -84,7 +84,7 @@ public virtual void Validate(IDbContextOptions options)
public virtual string? BootstrapServers { get; private set; }
- public virtual bool ProducerByEntity { get; private set; }
+ public virtual bool UseDeletePolicyForTopic { get; private set; }
public virtual bool UseCompactedReplicator { get; private set; }
diff --git a/src/net/KEFCore/Infrastructure/KafkaDbContext.cs b/src/net/KEFCore/Infrastructure/KafkaDbContext.cs
index 19515973..feb78a02 100644
--- a/src/net/KEFCore/Infrastructure/KafkaDbContext.cs
+++ b/src/net/KEFCore/Infrastructure/KafkaDbContext.cs
@@ -195,10 +195,10 @@ public KafkaDbContext(DbContextOptions options) : base(options)
/// Use persistent storage when Apache Kafka Streams is in use
///
public virtual bool UsePersistentStorage { get; set; } = false;
- /////
- ///// Use a producer for each Entity
- /////
- //public bool UseProducerByEntity { get; set; } = false;
+ ///
+ /// Use delete cleanup policy when a topic is created
+ ///
+ public bool UseDeletePolicyForTopic { get; set; } = false;
///
/// Use instead of Apache Kafka Streams
///
@@ -232,7 +232,7 @@ protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
o.StreamsConfig(StreamsConfig ?? DefaultStreamsConfig).WithDefaultNumPartitions(DefaultNumPartitions);
o.TopicConfig(TopicConfig ?? DefaultTopicConfig);
o.WithUsePersistentStorage(UsePersistentStorage);
- //o.WithProducerByEntity(UseProducerByEntity);
+ o.WithUseDeletePolicyForTopic(UseDeletePolicyForTopic);
o.WithCompactedReplicator(UseCompactedReplicator);
o.WithDefaultReplicationFactor(DefaultReplicationFactor);
if (KeySerializationType != null) o.WithKeySerializationType(KeySerializationType);
diff --git a/src/net/KEFCore/Infrastructure/KafkaDbContextOptionsBuilder.cs b/src/net/KEFCore/Infrastructure/KafkaDbContextOptionsBuilder.cs
index d12d0964..b7115bcd 100644
--- a/src/net/KEFCore/Infrastructure/KafkaDbContextOptionsBuilder.cs
+++ b/src/net/KEFCore/Infrastructure/KafkaDbContextOptionsBuilder.cs
@@ -157,26 +157,26 @@ public virtual KafkaDbContextOptionsBuilder WithUseNameMatching(bool useNameMatc
return this;
}
- /////
- ///// Enables creation of producer for each
- /////
- /////
- ///// See Using DbContextOptions, and
- ///// The EF Core Kafka database provider for more information and examples.
- /////
- ///// If , then each entity will have its own .
- ///// The same builder instance so that multiple calls can be chained.
- //public virtual KafkaDbContextOptionsBuilder WithProducerByEntity(bool producerByEntity = false)
- //{
- // var extension = OptionsBuilder.Options.FindExtension()
- // ?? new KafkaOptionsExtension();
-
- // extension = extension.WithProducerByEntity(producerByEntity);
-
- // ((IDbContextOptionsBuilderInfrastructure)OptionsBuilder).AddOrUpdateExtension(extension);
-
- // return this;
- //}
+ ///
+ /// Enables delete cleanup policy when the topic is created the first time
+ ///
+ ///
+ /// See Using DbContextOptions, and
+ /// The EF Core Kafka database provider for more information and examples.
+ ///
+ /// If , then will be used delete cleanup policy when the topic is created the first time.
+ /// The same builder instance so that multiple calls can be chained.
+ public virtual KafkaDbContextOptionsBuilder WithUseDeletePolicyForTopic(bool useDeletePolicyForTopic = false)
+ {
+ var extension = OptionsBuilder.Options.FindExtension()
+ ?? new KafkaOptionsExtension();
+
+ extension = extension.WithUseDeletePolicyForTopic(useDeletePolicyForTopic);
+
+ ((IDbContextOptionsBuilderInfrastructure)OptionsBuilder).AddOrUpdateExtension(extension);
+
+ return this;
+ }
///
/// Enables use of
diff --git a/src/net/KEFCore/Storage/Internal/KafkaCluster.cs b/src/net/KEFCore/Storage/Internal/KafkaCluster.cs
index b4402c13..ff2f9254 100644
--- a/src/net/KEFCore/Storage/Internal/KafkaCluster.cs
+++ b/src/net/KEFCore/Storage/Internal/KafkaCluster.cs
@@ -197,7 +197,9 @@ private string CreateTable(IEntityType entityType, int cycle)
try
{
using var topic = new NewTopic(topicName, entityType.NumPartitions(Options), entityType.ReplicationFactor(Options));
- Options.TopicConfig.CleanupPolicy = MASES.KNet.Common.TopicConfigBuilder.CleanupPolicyTypes.Compact | MASES.KNet.Common.TopicConfigBuilder.CleanupPolicyTypes.Delete;
+ Options.TopicConfig.CleanupPolicy = Options.UseDeletePolicyForTopic
+ ? MASES.KNet.Common.TopicConfigBuilder.CleanupPolicyTypes.Compact | MASES.KNet.Common.TopicConfigBuilder.CleanupPolicyTypes.Delete
+ : MASES.KNet.Common.TopicConfigBuilder.CleanupPolicyTypes.Compact;
Options.TopicConfig.RetentionBytes = 1024 * 1024 * 1024;
using var map = Options.TopicConfig.ToMap();
topic.Configs(map);