From ed51e31060df82c38f8eb6ea044f4eb43291c0b3 Mon Sep 17 00:00:00 2001 From: masesdevelopers <94312179+masesdevelopers@users.noreply.github.com> Date: Mon, 16 Oct 2023 01:33:12 +0200 Subject: [PATCH] Added new UseDeletePolicyForTopic property and disclaimer about stability --- README.md | 2 + src/documentation/articles/currentstate.md | 3 +- src/documentation/articles/gettingstarted.md | 2 + src/documentation/articles/intro.md | 2 + src/documentation/articles/kafkadbcontext.md | 3 +- src/documentation/articles/roadmap.md | 1 + src/documentation/articles/serialization.md | 2 + src/documentation/index.md | 14 ++++++- .../Internal/IKafkaSingletonOptions.cs | 2 +- .../Internal/KafkaOptionsExtension.cs | 18 ++++----- .../Internal/KafkaSingletonOptions.cs | 4 +- .../KEFCore/Infrastructure/KafkaDbContext.cs | 10 ++--- .../KafkaDbContextOptionsBuilder.cs | 40 +++++++++---------- .../KEFCore/Storage/Internal/KafkaCluster.cs | 4 +- 14 files changed, 66 insertions(+), 41 deletions(-) diff --git a/README.md b/README.md index 9524af5b..245d6555 100644 --- a/README.md +++ b/README.md @@ -12,6 +12,8 @@ Based on [KNet](https://github.com/masesgroup/KNet) it allows to use [Apache Kaf [![CI_BUILD](https://github.com/masesgroup/KEFCore/actions/workflows/build.yaml/badge.svg)](https://github.com/masesgroup/KEFCore/actions/workflows/build.yaml) [![CI_RELEASE](https://github.com/masesgroup/KEFCore/actions/workflows/release.yaml/badge.svg)](https://github.com/masesgroup/KEFCore/actions/workflows/release.yaml) +> IMPORTANT NOTE: till the first major version, all releases shall be considered not stable: this means the API public, or internal, can be change without notice. + --- ## Scope of the project diff --git a/src/documentation/articles/currentstate.md b/src/documentation/articles/currentstate.md index b1d07977..223eb7b9 100644 --- a/src/documentation/articles/currentstate.md +++ b/src/documentation/articles/currentstate.md @@ -4,4 +4,5 @@ The latest release implementes these features: * [x] A working provider based on Apache Kafka Streams * [x] The provider can use KNetCompactedReplicator -* [x] A first external package for serialization based on previous available serializers \ No newline at end of file +* [x] An external package for serialization based on .NET 6 Json serializers +* [x] An external package for serialization based on Apache Avro serializers \ No newline at end of file diff --git a/src/documentation/articles/gettingstarted.md b/src/documentation/articles/gettingstarted.md index 2731f0e6..ceaf6d28 100644 --- a/src/documentation/articles/gettingstarted.md +++ b/src/documentation/articles/gettingstarted.md @@ -4,6 +4,8 @@ To use [Entity Framework Core](https://learn.microsoft.com/it-it/ef/core/) provi - an installed JRE/JDK (11+) - an accessible Apache Kafka broker (a full cluster or a local Dockerized version) +> IMPORTANT NOTE: till the first major version, all releases shall be considered not stable: this means the API public, or internal, can be change without notice. + ## First project setup - Create a new simple empty project: diff --git a/src/documentation/articles/intro.md b/src/documentation/articles/intro.md index 51f2f6e5..f3db9007 100644 --- a/src/documentation/articles/intro.md +++ b/src/documentation/articles/intro.md @@ -4,3 +4,5 @@ KEFCore is the [Entity Framework Core](https://learn.microsoft.com/it-it/ef/core Based on [KNet](https://github.com/masesgroup/KNet) it allows to use [Apache Kafka](https://kafka.apache.org/) as a distributed database and more. This project adheres to the Contributor [Covenant code of conduct](CODE_OF_CONDUCT.md). By participating, you are expected to uphold this code. Please report unacceptable behavior to coc_reporting@masesgroup.com. + +> IMPORTANT NOTE: till the first major version, all releases shall be considered not stable: this means the API public, or internal, can be change without notice. \ No newline at end of file diff --git a/src/documentation/articles/kafkadbcontext.md b/src/documentation/articles/kafkadbcontext.md index 85c54863..29e97390 100644 --- a/src/documentation/articles/kafkadbcontext.md +++ b/src/documentation/articles/kafkadbcontext.md @@ -12,7 +12,8 @@ - **DefaultNumPartitions**: the default number of partitions used when topics are created for each entity - **DefaultReplicationFactor**: the replication factor to use when data are stored in Apache Kafka - **DefaultConsumerInstances**: the consumer instances to be allocated when UseCompactedReplicator is **true** - - **UsePersistentStorage**: set to **true** to use a persintent storage between multiple application startup + - **UsePersistentStorage**: set to **true** to use a persistent storage between multiple application startup + - **UseDeletePolicyForTopic**: set to **true** to enable [delete cleanup policy](https://kafka.apache.org/documentation/#topicconfigs_cleanup.policy) - **UseCompactedReplicator**: Use `KNetCompactedReplicator` instead of Apache Kafka Streams to manage data to or from topics - **ConsumerConfig**: parameters to use for Producer - **ProducerConfig**: parameters to use for Producer diff --git a/src/documentation/articles/roadmap.md b/src/documentation/articles/roadmap.md index a86a86bd..c79a3072 100644 --- a/src/documentation/articles/roadmap.md +++ b/src/documentation/articles/roadmap.md @@ -6,3 +6,4 @@ The roadmap can be synthetized in the following points: * [ ] Extends the first provider with new features able to create Apache Kafka Streams topology to retrieve information * [x] Use KNetCompactedReplicator beside Apache Kafka Streams * [x] Add external package to manage data serialization +* [x] Add Avro external package to manage data serialization \ No newline at end of file diff --git a/src/documentation/articles/serialization.md b/src/documentation/articles/serialization.md index 6dc2fab6..18f3eafc 100644 --- a/src/documentation/articles/serialization.md +++ b/src/documentation/articles/serialization.md @@ -3,6 +3,8 @@ [Entity Framework Core](https://learn.microsoft.com/it-it/ef/core/) provider for [Apache Kafka](https://kafka.apache.org/) shall convert the entities used within the model in something viable from the backend. Each backend has its own schema to convert entities into something else; database providers converts entities into database schema or blob in Cosmos. +> IMPORTANT NOTE: till the first major version, all releases shall be considered not stable: this means the API public, or internal, can be change without notice. + ## Basic concepts [Entity Framework Core](https://learn.microsoft.com/it-it/ef/core/) provider for [Apache Kafka](https://kafka.apache.org/) shall convert the entities into record will be stored in the topics of Apache Kafka cluster. diff --git a/src/documentation/index.md b/src/documentation/index.md index 10429c3a..7d4d783c 100644 --- a/src/documentation/index.md +++ b/src/documentation/index.md @@ -5,13 +5,25 @@ KEFCore is the [Entity Framework Core](https://learn.microsoft.com/it-it/ef/core/) provider for [Apache Kafka](https://kafka.apache.org/). Based on [KNet](https://github.com/masesgroup/KNet) it allows to use [Apache Kafka](https://kafka.apache.org/) as a distributed database and more. -This project adheres to the Contributor [Covenant code of conduct](CODE_OF_CONDUCT.md). By participating, you are expected to uphold this code. Please report unacceptable behavior to coc_reporting@masesgroup.com. +> IMPORTANT NOTE: till the first major version, all releases shall be considered not stable: this means the API public, or internal, can be change without notice. ## Scope of the project This project aims to create a provider to access the information stored within an Apache Kafka cluster using the paradigm behind Entity Framework. The project is based on available information within the official [EntityFrameworkCore repository](https://github.com/dotnet/efcore), many classes was copied from there as reported in the official documentation within the Microsoft website at https://docs.microsoft.com/en-us/ef/core/providers/writing-a-provider. +### Community and Contribution + +Do you like the project? +- Request your free [community subscription](https://www.jcobridge.com/pricing-25/). + +Do you want to help us? +- put a :star: on this project +- open [issues](https://github.com/masesgroup/KEFCore/issues) to request features or report bugs :bug: +- improves the project with Pull Requests + +This project adheres to the Contributor [Covenant code of conduct](CODE_OF_CONDUCT.md). By participating, you are expected to uphold this code. Please report unacceptable behavior to coc_reporting@masesgroup.com. + --- ## Summary diff --git a/src/net/KEFCore/Infrastructure/Internal/IKafkaSingletonOptions.cs b/src/net/KEFCore/Infrastructure/Internal/IKafkaSingletonOptions.cs index 746e6b5d..20d62ca5 100644 --- a/src/net/KEFCore/Infrastructure/Internal/IKafkaSingletonOptions.cs +++ b/src/net/KEFCore/Infrastructure/Internal/IKafkaSingletonOptions.cs @@ -46,7 +46,7 @@ public interface IKafkaSingletonOptions : ISingletonOptions string? BootstrapServers { get; } - //bool ProducerByEntity { get; } + bool UseDeletePolicyForTopic { get; } bool UseCompactedReplicator { get; } diff --git a/src/net/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs b/src/net/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs index 15a85650..87a591aa 100644 --- a/src/net/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs +++ b/src/net/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs @@ -45,7 +45,7 @@ public class KafkaOptionsExtension : IDbContextOptionsExtension private string? _databaseName; private string? _applicationId; private string? _bootstrapServers; - //private bool _producerByEntity = false; + private bool _useDeletePolicyForTopic = false; private bool _useCompactedReplicator = true; private bool _usePersistentStorage = false; private int _defaultNumPartitions = 1; @@ -73,7 +73,7 @@ protected KafkaOptionsExtension(KafkaOptionsExtension copyFrom) _databaseName = copyFrom._databaseName; _applicationId = copyFrom._applicationId; _bootstrapServers = copyFrom._bootstrapServers; - //_producerByEntity = copyFrom._producerByEntity; + _useDeletePolicyForTopic = copyFrom._useDeletePolicyForTopic; _useCompactedReplicator = copyFrom._useCompactedReplicator; _usePersistentStorage = copyFrom._usePersistentStorage; _defaultNumPartitions = copyFrom._defaultNumPartitions; @@ -105,7 +105,7 @@ protected KafkaOptionsExtension(KafkaOptionsExtension copyFrom) public virtual string BootstrapServers => _bootstrapServers!; - //public virtual bool ProducerByEntity => _producerByEntity; + public virtual bool UseDeletePolicyForTopic => _useDeletePolicyForTopic; public virtual bool UseCompactedReplicator => _useCompactedReplicator; @@ -194,14 +194,14 @@ public virtual KafkaOptionsExtension WithBootstrapServers(string bootstrapServer return clone; } - //public virtual KafkaOptionsExtension WithProducerByEntity(bool producerByEntity = false) - //{ - // var clone = Clone(); + public virtual KafkaOptionsExtension WithUseDeletePolicyForTopic(bool useDeletePolicyForTopic = false) + { + var clone = Clone(); - // clone._producerByEntity = producerByEntity; + clone._useDeletePolicyForTopic = useDeletePolicyForTopic; - // return clone; - //} + return clone; + } public virtual KafkaOptionsExtension WithCompactedReplicator(bool useCompactedReplicator = true) { diff --git a/src/net/KEFCore/Infrastructure/Internal/KafkaSingletonOptions.cs b/src/net/KEFCore/Infrastructure/Internal/KafkaSingletonOptions.cs index e73f430c..4b25c1e9 100644 --- a/src/net/KEFCore/Infrastructure/Internal/KafkaSingletonOptions.cs +++ b/src/net/KEFCore/Infrastructure/Internal/KafkaSingletonOptions.cs @@ -43,7 +43,7 @@ public virtual void Initialize(IDbContextOptions options) DatabaseName = kafkaOptions.DatabaseName; ApplicationId = kafkaOptions.ApplicationId; BootstrapServers = kafkaOptions.BootstrapServers; - //ProducerByEntity = kafkaOptions.ProducerByEntity; + UseDeletePolicyForTopic = kafkaOptions.UseDeletePolicyForTopic; UseCompactedReplicator = kafkaOptions.UseCompactedReplicator; UsePersistentStorage = kafkaOptions.UsePersistentStorage; DefaultNumPartitions = kafkaOptions.DefaultNumPartitions; @@ -84,7 +84,7 @@ public virtual void Validate(IDbContextOptions options) public virtual string? BootstrapServers { get; private set; } - public virtual bool ProducerByEntity { get; private set; } + public virtual bool UseDeletePolicyForTopic { get; private set; } public virtual bool UseCompactedReplicator { get; private set; } diff --git a/src/net/KEFCore/Infrastructure/KafkaDbContext.cs b/src/net/KEFCore/Infrastructure/KafkaDbContext.cs index 19515973..feb78a02 100644 --- a/src/net/KEFCore/Infrastructure/KafkaDbContext.cs +++ b/src/net/KEFCore/Infrastructure/KafkaDbContext.cs @@ -195,10 +195,10 @@ public KafkaDbContext(DbContextOptions options) : base(options) /// Use persistent storage when Apache Kafka Streams is in use /// public virtual bool UsePersistentStorage { get; set; } = false; - ///// - ///// Use a producer for each Entity - ///// - //public bool UseProducerByEntity { get; set; } = false; + /// + /// Use delete cleanup policy when a topic is created + /// + public bool UseDeletePolicyForTopic { get; set; } = false; /// /// Use instead of Apache Kafka Streams /// @@ -232,7 +232,7 @@ protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder) o.StreamsConfig(StreamsConfig ?? DefaultStreamsConfig).WithDefaultNumPartitions(DefaultNumPartitions); o.TopicConfig(TopicConfig ?? DefaultTopicConfig); o.WithUsePersistentStorage(UsePersistentStorage); - //o.WithProducerByEntity(UseProducerByEntity); + o.WithUseDeletePolicyForTopic(UseDeletePolicyForTopic); o.WithCompactedReplicator(UseCompactedReplicator); o.WithDefaultReplicationFactor(DefaultReplicationFactor); if (KeySerializationType != null) o.WithKeySerializationType(KeySerializationType); diff --git a/src/net/KEFCore/Infrastructure/KafkaDbContextOptionsBuilder.cs b/src/net/KEFCore/Infrastructure/KafkaDbContextOptionsBuilder.cs index d12d0964..b7115bcd 100644 --- a/src/net/KEFCore/Infrastructure/KafkaDbContextOptionsBuilder.cs +++ b/src/net/KEFCore/Infrastructure/KafkaDbContextOptionsBuilder.cs @@ -157,26 +157,26 @@ public virtual KafkaDbContextOptionsBuilder WithUseNameMatching(bool useNameMatc return this; } - ///// - ///// Enables creation of producer for each - ///// - ///// - ///// See Using DbContextOptions, and - ///// The EF Core Kafka database provider for more information and examples. - ///// - ///// If , then each entity will have its own . - ///// The same builder instance so that multiple calls can be chained. - //public virtual KafkaDbContextOptionsBuilder WithProducerByEntity(bool producerByEntity = false) - //{ - // var extension = OptionsBuilder.Options.FindExtension() - // ?? new KafkaOptionsExtension(); - - // extension = extension.WithProducerByEntity(producerByEntity); - - // ((IDbContextOptionsBuilderInfrastructure)OptionsBuilder).AddOrUpdateExtension(extension); - - // return this; - //} + /// + /// Enables delete cleanup policy when the topic is created the first time + /// + /// + /// See Using DbContextOptions, and + /// The EF Core Kafka database provider for more information and examples. + /// + /// If , then will be used delete cleanup policy when the topic is created the first time. + /// The same builder instance so that multiple calls can be chained. + public virtual KafkaDbContextOptionsBuilder WithUseDeletePolicyForTopic(bool useDeletePolicyForTopic = false) + { + var extension = OptionsBuilder.Options.FindExtension() + ?? new KafkaOptionsExtension(); + + extension = extension.WithUseDeletePolicyForTopic(useDeletePolicyForTopic); + + ((IDbContextOptionsBuilderInfrastructure)OptionsBuilder).AddOrUpdateExtension(extension); + + return this; + } /// /// Enables use of diff --git a/src/net/KEFCore/Storage/Internal/KafkaCluster.cs b/src/net/KEFCore/Storage/Internal/KafkaCluster.cs index b4402c13..ff2f9254 100644 --- a/src/net/KEFCore/Storage/Internal/KafkaCluster.cs +++ b/src/net/KEFCore/Storage/Internal/KafkaCluster.cs @@ -197,7 +197,9 @@ private string CreateTable(IEntityType entityType, int cycle) try { using var topic = new NewTopic(topicName, entityType.NumPartitions(Options), entityType.ReplicationFactor(Options)); - Options.TopicConfig.CleanupPolicy = MASES.KNet.Common.TopicConfigBuilder.CleanupPolicyTypes.Compact | MASES.KNet.Common.TopicConfigBuilder.CleanupPolicyTypes.Delete; + Options.TopicConfig.CleanupPolicy = Options.UseDeletePolicyForTopic + ? MASES.KNet.Common.TopicConfigBuilder.CleanupPolicyTypes.Compact | MASES.KNet.Common.TopicConfigBuilder.CleanupPolicyTypes.Delete + : MASES.KNet.Common.TopicConfigBuilder.CleanupPolicyTypes.Compact; Options.TopicConfig.RetentionBytes = 1024 * 1024 * 1024; using var map = Options.TopicConfig.ToMap(); topic.Configs(map);