Added new UseDeletePolicyForTopic property and disclaimer about stability (#104)
masesdevelopers authored Oct 15, 2023
1 parent 08250f4 commit 9e13e82
Showing 14 changed files with 66 additions and 41 deletions.
2 changes: 2 additions & 0 deletions README.md
@@ -12,6 +12,8 @@ Based on [KNet](https://github.com/masesgroup/KNet), it allows using [Apache Kafka](https://kafka.apache.org/) as a distributed database and more.
[![CI_BUILD](https://github.com/masesgroup/KEFCore/actions/workflows/build.yaml/badge.svg)](https://github.com/masesgroup/KEFCore/actions/workflows/build.yaml)
[![CI_RELEASE](https://github.com/masesgroup/KEFCore/actions/workflows/release.yaml/badge.svg)](https://github.com/masesgroup/KEFCore/actions/workflows/release.yaml)

> IMPORTANT NOTE: until the first major version, all releases shall be considered unstable: the public or internal APIs can change without notice.
---

## Scope of the project
3 changes: 2 additions & 1 deletion src/documentation/articles/currentstate.md
@@ -4,4 +4,5 @@ The latest release implements these features:

* [x] A working provider based on Apache Kafka Streams
* [x] The provider can use KNetCompactedReplicator
* [x] A first external package for serialization based on previous available serializers
* [x] An external package for serialization based on .NET 6 Json serializers
* [x] An external package for serialization based on Apache Avro serializers
2 changes: 2 additions & 0 deletions src/documentation/articles/gettingstarted.md
@@ -4,6 +4,8 @@ To use [Entity Framework Core](https://learn.microsoft.com/it-it/ef/core/) provi
- an installed JRE/JDK (11+)
- an accessible Apache Kafka broker (a full cluster or a local Dockerized version)

> IMPORTANT NOTE: until the first major version, all releases shall be considered unstable: the public or internal APIs can change without notice.

## First project setup

- Create a new simple empty project:
2 changes: 2 additions & 0 deletions src/documentation/articles/intro.md
@@ -4,3 +4,5 @@ KEFCore is the [Entity Framework Core](https://learn.microsoft.com/it-it/ef/core/) provider for [Apache Kafka](https://kafka.apache.org/).
Based on [KNet](https://github.com/masesgroup/KNet), it allows using [Apache Kafka](https://kafka.apache.org/) as a distributed database and more.

This project adheres to the [Contributor Covenant code of conduct](CODE_OF_CONDUCT.md). By participating, you are expected to uphold this code. Please report unacceptable behavior to [email protected].

> IMPORTANT NOTE: until the first major version, all releases shall be considered unstable: the public or internal APIs can change without notice.
3 changes: 2 additions & 1 deletion src/documentation/articles/kafkadbcontext.md
@@ -12,7 +12,8 @@
- **DefaultNumPartitions**: the default number of partitions used when topics are created for each entity
- **DefaultReplicationFactor**: the replication factor to use when data are stored in Apache Kafka
- **DefaultConsumerInstances**: the consumer instances to be allocated when UseCompactedReplicator is **true**
- **UsePersistentStorage**: set to **true** to use a persintent storage between multiple application startup
- **UsePersistentStorage**: set to **true** to use a persistent storage between multiple application startup
- **UseDeletePolicyForTopic**: set to **true** to enable [delete cleanup policy](https://kafka.apache.org/documentation/#topicconfigs_cleanup.policy)
- **UseCompactedReplicator**: Use `KNetCompactedReplicator` instead of Apache Kafka Streams to manage data to or from topics
- **ConsumerConfig**: parameters to use for Consumer
- **ProducerConfig**: parameters to use for Producer
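Tying the options above together, a minimal context setup might look like the following sketch. The boolean and numeric property names (`UseDeletePolicyForTopic`, `UseCompactedReplicator`, `UsePersistentStorage`, `DefaultNumPartitions`, `DefaultReplicationFactor`) come from this changeset; `BloggingContext`, the `Blog` entity, the namespace, and the `BootstrapServers`/`ApplicationId`/`DbName` names and values are illustrative assumptions, not confirmed by this commit:

```csharp
using Microsoft.EntityFrameworkCore;
using MASES.EntityFrameworkCore.KNet.Infrastructure; // assumed KEFCore namespace

// Illustrative entity and context; not part of KEFCore itself.
public class Blog
{
    public int BlogId { get; set; }
    public string Url { get; set; } = string.Empty;
}

public class BloggingContext : KafkaDbContext
{
    public DbSet<Blog> Blogs { get; set; }
}

public static class Program
{
    public static void Main()
    {
        using var context = new BloggingContext()
        {
            BootstrapServers = "localhost:9092", // assumed property: reachable Apache Kafka broker(s)
            ApplicationId = "TestApplication",   // assumed property: Kafka Streams application id
            DbName = "TestDB",                   // assumed property: logical database name
            DefaultNumPartitions = 1,
            DefaultReplicationFactor = 1,
            UseDeletePolicyForTopic = true,      // enable the delete cleanup policy on created topics
            UseCompactedReplicator = true,       // KNetCompactedReplicator instead of Kafka Streams
            UsePersistentStorage = false,
        };
        // Standard EF Core API: creates the backing topics for each entity.
        context.Database.EnsureCreated();
    }
}
```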
1 change: 1 addition & 0 deletions src/documentation/articles/roadmap.md
@@ -6,3 +6,4 @@ The roadmap can be summarized in the following points:
* [ ] Extend the first provider with new features able to create an Apache Kafka Streams topology to retrieve information
* [x] Use KNetCompactedReplicator alongside Apache Kafka Streams
* [x] Add external package to manage data serialization
* [x] Add Avro external package to manage data serialization
2 changes: 2 additions & 0 deletions src/documentation/articles/serialization.md
@@ -3,6 +3,8 @@
[Entity Framework Core](https://learn.microsoft.com/it-it/ef/core/) provider for [Apache Kafka](https://kafka.apache.org/) shall convert the entities used within the model into something the backend can manage.
Each backend has its own schema for representing entities: database providers convert entities into a database schema, or into blobs in Cosmos.

> IMPORTANT NOTE: until the first major version, all releases shall be considered unstable: the public or internal APIs can change without notice.

## Basic concepts

[Entity Framework Core](https://learn.microsoft.com/it-it/ef/core/) provider for [Apache Kafka](https://kafka.apache.org/) shall convert the entities into records that will be stored in the topics of the Apache Kafka cluster.
14 changes: 13 additions & 1 deletion src/documentation/index.md
@@ -5,13 +5,25 @@
KEFCore is the [Entity Framework Core](https://learn.microsoft.com/it-it/ef/core/) provider for [Apache Kafka](https://kafka.apache.org/).
Based on [KNet](https://github.com/masesgroup/KNet), it allows using [Apache Kafka](https://kafka.apache.org/) as a distributed database and more.

This project adheres to the Contributor [Covenant code of conduct](CODE_OF_CONDUCT.md). By participating, you are expected to uphold this code. Please report unacceptable behavior to [email protected].
> IMPORTANT NOTE: until the first major version, all releases shall be considered unstable: the public or internal APIs can change without notice.

## Scope of the project

This project aims to create a provider to access the information stored within an Apache Kafka cluster using the paradigm behind Entity Framework.
The project is based on available information within the official [EntityFrameworkCore repository](https://github.com/dotnet/efcore); many classes were copied from there, as reported in the official documentation on the Microsoft website at https://docs.microsoft.com/en-us/ef/core/providers/writing-a-provider.

### Community and Contribution

Do you like the project?
- Request your free [community subscription](https://www.jcobridge.com/pricing-25/).

Do you want to help us?
- put a :star: on this project
- open [issues](https://github.com/masesgroup/KEFCore/issues) to request features or report bugs :bug:
- improve the project with Pull Requests

This project adheres to the [Contributor Covenant code of conduct](CODE_OF_CONDUCT.md). By participating, you are expected to uphold this code. Please report unacceptable behavior to [email protected].

---
## Summary

@@ -46,7 +46,7 @@ public interface IKafkaSingletonOptions : ISingletonOptions

string? BootstrapServers { get; }

//bool ProducerByEntity { get; }
bool UseDeletePolicyForTopic { get; }

bool UseCompactedReplicator { get; }

18 changes: 9 additions & 9 deletions src/net/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs
@@ -45,7 +45,7 @@ public class KafkaOptionsExtension : IDbContextOptionsExtension
private string? _databaseName;
private string? _applicationId;
private string? _bootstrapServers;
//private bool _producerByEntity = false;
private bool _useDeletePolicyForTopic = false;
private bool _useCompactedReplicator = true;
private bool _usePersistentStorage = false;
private int _defaultNumPartitions = 1;
@@ -73,7 +73,7 @@ protected KafkaOptionsExtension(KafkaOptionsExtension copyFrom)
_databaseName = copyFrom._databaseName;
_applicationId = copyFrom._applicationId;
_bootstrapServers = copyFrom._bootstrapServers;
//_producerByEntity = copyFrom._producerByEntity;
_useDeletePolicyForTopic = copyFrom._useDeletePolicyForTopic;
_useCompactedReplicator = copyFrom._useCompactedReplicator;
_usePersistentStorage = copyFrom._usePersistentStorage;
_defaultNumPartitions = copyFrom._defaultNumPartitions;
@@ -105,7 +105,7 @@ protected KafkaOptionsExtension(KafkaOptionsExtension copyFrom)

public virtual string BootstrapServers => _bootstrapServers!;

//public virtual bool ProducerByEntity => _producerByEntity;
public virtual bool UseDeletePolicyForTopic => _useDeletePolicyForTopic;

public virtual bool UseCompactedReplicator => _useCompactedReplicator;

@@ -194,14 +194,14 @@ public virtual KafkaOptionsExtension WithBootstrapServers(string bootstrapServer
return clone;
}

//public virtual KafkaOptionsExtension WithProducerByEntity(bool producerByEntity = false)
//{
// var clone = Clone();
public virtual KafkaOptionsExtension WithUseDeletePolicyForTopic(bool useDeletePolicyForTopic = false)
{
var clone = Clone();

// clone._producerByEntity = producerByEntity;
clone._useDeletePolicyForTopic = useDeletePolicyForTopic;

// return clone;
//}
return clone;
}

public virtual KafkaOptionsExtension WithCompactedReplicator(bool useCompactedReplicator = true)
{
@@ -43,7 +43,7 @@ public virtual void Initialize(IDbContextOptions options)
DatabaseName = kafkaOptions.DatabaseName;
ApplicationId = kafkaOptions.ApplicationId;
BootstrapServers = kafkaOptions.BootstrapServers;
//ProducerByEntity = kafkaOptions.ProducerByEntity;
UseDeletePolicyForTopic = kafkaOptions.UseDeletePolicyForTopic;
UseCompactedReplicator = kafkaOptions.UseCompactedReplicator;
UsePersistentStorage = kafkaOptions.UsePersistentStorage;
DefaultNumPartitions = kafkaOptions.DefaultNumPartitions;
@@ -84,7 +84,7 @@ public virtual void Validate(IDbContextOptions options)

public virtual string? BootstrapServers { get; private set; }

public virtual bool ProducerByEntity { get; private set; }
public virtual bool UseDeletePolicyForTopic { get; private set; }

public virtual bool UseCompactedReplicator { get; private set; }

10 changes: 5 additions & 5 deletions src/net/KEFCore/Infrastructure/KafkaDbContext.cs
@@ -195,10 +195,10 @@ public KafkaDbContext(DbContextOptions options) : base(options)
/// Use persistent storage when Apache Kafka Streams is in use
/// </summary>
public virtual bool UsePersistentStorage { get; set; } = false;
///// <summary>
///// Use a producer for each Entity
///// </summary>
//public bool UseProducerByEntity { get; set; } = false;
/// <summary>
/// Use <see href="https://kafka.apache.org/documentation/#topicconfigs_cleanup.policy">delete cleanup policy</see> when a topic is created
/// </summary>
public bool UseDeletePolicyForTopic { get; set; } = false;
/// <summary>
/// Use <see cref="MASES.KNet.Replicator.KNetCompactedReplicator{TKey, TValue}"/> instead of Apache Kafka Streams
/// </summary>
@@ -232,7 +232,7 @@ protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
o.StreamsConfig(StreamsConfig ?? DefaultStreamsConfig).WithDefaultNumPartitions(DefaultNumPartitions);
o.TopicConfig(TopicConfig ?? DefaultTopicConfig);
o.WithUsePersistentStorage(UsePersistentStorage);
//o.WithProducerByEntity(UseProducerByEntity);
o.WithUseDeletePolicyForTopic(UseDeletePolicyForTopic);
o.WithCompactedReplicator(UseCompactedReplicator);
o.WithDefaultReplicationFactor(DefaultReplicationFactor);
if (KeySerializationType != null) o.WithKeySerializationType(KeySerializationType);
40 changes: 20 additions & 20 deletions src/net/KEFCore/Infrastructure/KafkaDbContextOptionsBuilder.cs
@@ -157,26 +157,26 @@ public virtual KafkaDbContextOptionsBuilder WithUseNameMatching(bool useNameMatc
return this;
}

///// <summary>
///// Enables creation of producer for each <see cref="IEntity"/>
///// </summary>
///// <remarks>
///// See <see href="https://aka.ms/efcore-docs-dbcontext-options">Using DbContextOptions</see>, and
///// <see href="https://github.com/masesgroup/KEFCore">The EF Core Kafka database provider</see> for more information and examples.
///// </remarks>
///// <param name="producerByEntity">If <see langword="true" />, then each entity will have its own <see cref="KafkaProducer"/>.</param>
///// <returns>The same builder instance so that multiple calls can be chained.</returns>
//public virtual KafkaDbContextOptionsBuilder WithProducerByEntity(bool producerByEntity = false)
//{
// var extension = OptionsBuilder.Options.FindExtension<KafkaOptionsExtension>()
// ?? new KafkaOptionsExtension();

// extension = extension.WithProducerByEntity(producerByEntity);

// ((IDbContextOptionsBuilderInfrastructure)OptionsBuilder).AddOrUpdateExtension(extension);

// return this;
//}
/// <summary>
/// Enables the <see href="https://kafka.apache.org/documentation/#topicconfigs_cleanup.policy">delete cleanup policy</see> when the topic is first created
/// </summary>
/// <remarks>
/// See <see href="https://aka.ms/efcore-docs-dbcontext-options">Using DbContextOptions</see>, and
/// <see href="https://github.com/masesgroup/KEFCore">The EF Core Kafka database provider</see> for more information and examples.
/// </remarks>
/// <param name="useDeletePolicyForTopic">If <see langword="true" />, the <see href="https://kafka.apache.org/documentation/#topicconfigs_cleanup.policy">delete cleanup policy</see> will be used when the topic is first created.</param>
/// <returns>The same builder instance so that multiple calls can be chained.</returns>
public virtual KafkaDbContextOptionsBuilder WithUseDeletePolicyForTopic(bool useDeletePolicyForTopic = false)
{
var extension = OptionsBuilder.Options.FindExtension<KafkaOptionsExtension>()
?? new KafkaOptionsExtension();

extension = extension.WithUseDeletePolicyForTopic(useDeletePolicyForTopic);

((IDbContextOptionsBuilderInfrastructure)OptionsBuilder).AddOrUpdateExtension(extension);

return this;
}
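The new method chains like the other `With…` builder methods. A hedged wiring sketch follows: the three `With…` calls appear in this changeset, while `UseKafkaCluster`, its argument order, and the `MyKafkaContext` placeholder are assumptions based on typical KEFCore usage, not confirmed by this commit:

```csharp
using Microsoft.EntityFrameworkCore;
using MASES.EntityFrameworkCore.KNet.Infrastructure; // assumed KEFCore namespace

// Placeholder context for illustration only.
public class MyKafkaContext : KafkaDbContext { }

public static class OptionsWiring
{
    public static DbContextOptions<MyKafkaContext> Build()
    {
        var builder = new DbContextOptionsBuilder<MyKafkaContext>();
        // UseKafkaCluster(...) is an assumed entry point; the lambda receives the
        // KafkaDbContextOptionsBuilder defined in this file.
        builder.UseKafkaCluster("TestApplication", "TestDB", "localhost:9092", o =>
            o.WithUseDeletePolicyForTopic(true)  // delete cleanup policy on first topic creation
             .WithCompactedReplicator(true)      // manage data with KNetCompactedReplicator
             .WithUsePersistentStorage(false));  // keep Kafka Streams stores in memory
        return builder.Options;
    }
}
```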

/// <summary>
/// Enables use of <see cref="MASES.KNet.Replicator.KNetCompactedReplicator{TKey, TValue}"/>
4 changes: 3 additions & 1 deletion src/net/KEFCore/Storage/Internal/KafkaCluster.cs
@@ -197,7 +197,9 @@ private string CreateTable(IEntityType entityType, int cycle)
try
{
using var topic = new NewTopic(topicName, entityType.NumPartitions(Options), entityType.ReplicationFactor(Options));
Options.TopicConfig.CleanupPolicy = MASES.KNet.Common.TopicConfigBuilder.CleanupPolicyTypes.Compact | MASES.KNet.Common.TopicConfigBuilder.CleanupPolicyTypes.Delete;
Options.TopicConfig.CleanupPolicy = Options.UseDeletePolicyForTopic
? MASES.KNet.Common.TopicConfigBuilder.CleanupPolicyTypes.Compact | MASES.KNet.Common.TopicConfigBuilder.CleanupPolicyTypes.Delete
: MASES.KNet.Common.TopicConfigBuilder.CleanupPolicyTypes.Compact;
Options.TopicConfig.RetentionBytes = 1024 * 1024 * 1024;
using var map = Options.TopicConfig.ToMap();
topic.Configs(map);
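The conditional above picks between a compact-only policy and a combined compact+delete policy using flag enums. A self-contained sketch of the same selection logic; the enum here is an illustrative stand-in for KNet's `TopicConfigBuilder.CleanupPolicyTypes`, not the real type:

```csharp
using System;

// Stand-in for MASES.KNet.Common.TopicConfigBuilder.CleanupPolicyTypes.
[Flags]
public enum CleanupPolicyTypes
{
    Compact = 1,
    Delete = 2,
}

public static class CleanupPolicyDemo
{
    // Mirrors the selection performed in KafkaCluster.CreateTable above:
    // compact+delete only when UseDeletePolicyForTopic is enabled.
    public static CleanupPolicyTypes SelectPolicy(bool useDeletePolicyForTopic) =>
        useDeletePolicyForTopic
            ? CleanupPolicyTypes.Compact | CleanupPolicyTypes.Delete
            : CleanupPolicyTypes.Compact;

    public static void Main()
    {
        Console.WriteLine(SelectPolicy(true));   // Compact, Delete
        Console.WriteLine(SelectPolicy(false));  // Compact
    }
}
```

Combining the two members with `|` yields a value whose `cleanup.policy` maps to `compact,delete`, so Kafka both compacts by key and expires segments past the retention limits.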
