Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#60: added KNetCompactedReplicator as an option #62

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/net/KEFCore/Extensions/KafkaEntityTypeExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -117,4 +117,10 @@ public static int NumPartitions(this IEntityType entityType, KafkaOptionsExtensi
var numPartitions = options.DefaultNumPartitions;
return numPartitions;
}

public static int? ConsumerInstances(this IEntityType entityType, KafkaOptionsExtension options)
{
var consumerInstances = options.DefaultConsumerInstances;
return consumerInstances;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,14 @@ public interface IKafkaSingletonOptions : ISingletonOptions

bool ProducerByEntity { get; }

bool UseCompactedReplicator { get; }

bool UsePersistentStorage { get; }

int DefaultNumPartitions { get; }

int? DefaultConsumerInstances { get; }

int DefaultReplicationFactor { get; }

ProducerConfigBuilder? ProducerConfigBuilder { get; }
Expand Down
28 changes: 26 additions & 2 deletions src/net/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,13 @@

using Java.Lang;
using Java.Util;
using MASES.JCOBridge.C2JBridge;
using MASES.KNet.Common;
using MASES.KNet.Producer;
using MASES.KNet.Streams;
using Org.Apache.Kafka.Clients.Consumer;
using Org.Apache.Kafka.Clients.Producer;
using Org.Apache.Kafka.Streams;
using System.Globalization;
using System.Text;

namespace MASES.EntityFrameworkCore.KNet.Infrastructure.Internal;

Expand All @@ -39,8 +37,10 @@ public class KafkaOptionsExtension : IDbContextOptionsExtension
private string? _applicationId;
private string? _bootstrapServers;
private bool _producerByEntity = false;
private bool _useCompactedReplicator = false;
private bool _usePersistentStorage = false;
private int _defaultNumPartitions = 1;
private int? _defaultConsumerInstances = null;
private short _defaultReplicationFactor = 1;
private ProducerConfigBuilder? _producerConfigBuilder;
private StreamsConfigBuilder? _streamsConfigBuilder;
Expand All @@ -61,8 +61,10 @@ protected KafkaOptionsExtension(KafkaOptionsExtension copyFrom)
_applicationId = copyFrom._applicationId;
_bootstrapServers = copyFrom._bootstrapServers;
_producerByEntity = copyFrom._producerByEntity;
_useCompactedReplicator = copyFrom._useCompactedReplicator;
_usePersistentStorage = copyFrom._usePersistentStorage;
_defaultNumPartitions = copyFrom._defaultNumPartitions;
_defaultConsumerInstances = copyFrom._defaultConsumerInstances;
_defaultReplicationFactor = copyFrom._defaultReplicationFactor;
_producerConfigBuilder = ProducerConfigBuilder.CreateFrom(copyFrom._producerConfigBuilder);
_streamsConfigBuilder = StreamsConfigBuilder.CreateFrom(copyFrom._streamsConfigBuilder);
Expand All @@ -85,10 +87,14 @@ protected KafkaOptionsExtension(KafkaOptionsExtension copyFrom)

public virtual bool ProducerByEntity => _producerByEntity;

public virtual bool UseCompactedReplicator => _useCompactedReplicator;

public virtual bool UsePersistentStorage => _usePersistentStorage;

public virtual int DefaultNumPartitions => _defaultNumPartitions;

public virtual int? DefaultConsumerInstances => _defaultConsumerInstances;

public virtual short DefaultReplicationFactor => _defaultReplicationFactor;

public virtual ProducerConfigBuilder ProducerConfigBuilder => _producerConfigBuilder!;
Expand Down Expand Up @@ -142,6 +148,15 @@ public virtual KafkaOptionsExtension WithProducerByEntity(bool producerByEntity
return clone;
}

public virtual KafkaOptionsExtension WithCompactedReplicator(bool useCompactedReplicator = false)
{
var clone = Clone();

clone._useCompactedReplicator = useCompactedReplicator;

return clone;
}

public virtual KafkaOptionsExtension WithUsePersistentStorage(bool usePersistentStorage = false)
{
var clone = Clone();
Expand All @@ -160,6 +175,15 @@ public virtual KafkaOptionsExtension WithDefaultNumPartitions(int defaultNumPart
return clone;
}

public virtual KafkaOptionsExtension WithDefaultConsumerInstances(int? defaultConsumerInstances = null)
{
var clone = Clone();

clone._defaultConsumerInstances = defaultConsumerInstances;

return clone;
}

public virtual KafkaOptionsExtension WithDefaultReplicationFactor(short defaultReplicationFactor = 1)
{
var clone = Clone();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ public virtual void Initialize(IDbContextOptions options)
ApplicationId = kafkaOptions.ApplicationId;
BootstrapServers = kafkaOptions.BootstrapServers;
ProducerByEntity = kafkaOptions.ProducerByEntity;
UseCompactedReplicator = kafkaOptions.UseCompactedReplicator;
UsePersistentStorage = kafkaOptions.UsePersistentStorage;
DefaultNumPartitions = kafkaOptions.DefaultNumPartitions;
DefaultConsumerInstances = kafkaOptions.DefaultConsumerInstances;
DefaultReplicationFactor = kafkaOptions.DefaultReplicationFactor;
ProducerConfigBuilder = ProducerConfigBuilder.CreateFrom(kafkaOptions.ProducerConfigBuilder);
StreamsConfigBuilder = StreamsConfigBuilder.CreateFrom(kafkaOptions.StreamsConfigBuilder);
Expand Down Expand Up @@ -68,10 +70,14 @@ public virtual void Validate(IDbContextOptions options)

public virtual bool ProducerByEntity { get; private set; }

public virtual bool UseCompactedReplicator { get; private set; }

public virtual bool UsePersistentStorage { get; private set; }

public virtual int DefaultNumPartitions { get; private set; }

public virtual int? DefaultConsumerInstances { get; private set; }

public virtual int DefaultReplicationFactor { get; private set; }

public virtual ProducerConfigBuilder? ProducerConfigBuilder { get; private set; }
Expand Down
55 changes: 41 additions & 14 deletions src/net/KEFCore/Infrastructure/KafkaDbContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
* Refer to LICENSE for more information.
*/

using MASES.KNet;
using MASES.KNet.Common;
using MASES.KNet.Producer;
using MASES.KNet.Streams;
Expand All @@ -30,41 +31,66 @@ namespace MASES.EntityFrameworkCore.KNet.Infrastructure;
/// </summary>
public class KafkaDbContext : DbContext
{
/// <inheritdoc cref="DbContext.DbContext()"/>
public KafkaDbContext()
{

}
/// <inheritdoc cref="DbContext.DbContext(DbContextOptions)"/>
public KafkaDbContext(DbContextOptions options) : base(options)
{

}

/// <summary>
/// The bootstrap servers of the Apache Kafka cluster
/// </summary>
public string? BootstrapServers { get; set; }
public virtual string? BootstrapServers { get; set; }
/// <summary>
/// The application id
/// </summary>
public string ApplicationId { get; set; } = Guid.NewGuid().ToString();
public virtual string ApplicationId { get; set; } = Guid.NewGuid().ToString();
/// <summary>
/// Database name
/// </summary>
public string? DbName { get; set; }
public virtual string? DbName { get; set; }
/// <summary>
/// Database number of partitions
/// </summary>
public int DefaultNumPartitions { get; set; } = 10;
public virtual int DefaultNumPartitions { get; set; } = 10;
/// <summary>
/// Database replication factor
/// </summary>
public short DefaultReplicationFactor { get; set; } = 1;
public virtual short DefaultReplicationFactor { get; set; } = 1;
/// <summary>
/// Database consumr instances used in conjunction with <see cref="UseCompactedReplicator"/>
/// </summary>
public virtual int? DefaultConsumerInstances { get; set; } = null;
/// <summary>
/// Use persistent storage
/// </summary>
public bool UsePersistentStorage { get; set; } = false;
public virtual bool UsePersistentStorage { get; set; } = false;
/// <summary>
/// Use a producer for each Entity
/// </summary>
public bool UseProducerByEntity { get; set; } = false;

public ProducerConfigBuilder? ProducerConfigBuilder { get; set; }

public StreamsConfigBuilder? StreamsConfigBuilder { get; set; }

public TopicConfigBuilder? TopicConfigBuilder { get; set; }

/// <summary>
/// Use <see cref="MASES.KNet.Replicator.KNetCompactedReplicator{TKey, TValue}"/> instead of Apache Kafka Streams
/// </summary>
public virtual bool UseCompactedReplicator { get; set; } = false;
/// <summary>
/// The optional <see cref="ProducerConfigBuilder"/>
/// </summary>
public virtual ProducerConfigBuilder? ProducerConfigBuilder { get; set; }
/// <summary>
/// The optional <see cref="StreamsConfigBuilder"/>
/// </summary>
public virtual StreamsConfigBuilder? StreamsConfigBuilder { get; set; }
/// <summary>
/// The optional <see cref="TopicConfigBuilder"/>
/// </summary>
public virtual TopicConfigBuilder? TopicConfigBuilder { get; set; }
/// <inheritdoc cref="DbContext.OnConfiguring(DbContextOptionsBuilder)"/>
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
if (BootstrapServers == null)
Expand All @@ -76,9 +102,10 @@ protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)

optionsBuilder.UseKafkaDatabase(ApplicationId, DbName, BootstrapServers, (o) =>
{
o.StreamsConfig(StreamsConfigBuilder??o.EmptyStreamsConfigBuilder).WithDefaultNumPartitions(DefaultNumPartitions);
o.StreamsConfig(StreamsConfigBuilder ?? o.EmptyStreamsConfigBuilder).WithDefaultNumPartitions(DefaultNumPartitions);
o.WithUsePersistentStorage(UsePersistentStorage);
o.WithProducerByEntity(UseProducerByEntity);
o.WithCompactedReplicator(UseCompactedReplicator);
o.WithDefaultReplicationFactor(DefaultReplicationFactor);
});
}
Expand Down
42 changes: 42 additions & 0 deletions src/net/KEFCore/Infrastructure/KafkaDbContextOptionsBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,27 @@ public virtual KafkaDbContextOptionsBuilder WithProducerByEntity(bool producerBy
return this;
}

/// <summary>
/// Enables use of <see cref="MASES.KNet.Replicator.KNetCompactedReplicator{TKey, TValue}"/>
/// </summary>
/// <remarks>
/// See <see href="https://aka.ms/efcore-docs-dbcontext-options">Using DbContextOptions</see>, and
/// <see href="https://github.com/masesgroup/KEFCore">The EF Core Kafka database provider</see> for more information and examples.
/// </remarks>
/// <param name="useCompactedReplicator">If <see langword="true" /> then <see cref="MASES.KNet.Replicator.KNetCompactedReplicator{TKey, TValue}"/> will be used instead of Apache Kafka Streams.</param>
/// <returns>The same builder instance so that multiple calls can be chained.</returns>
public virtual KafkaDbContextOptionsBuilder WithCompactedReplicator(bool useCompactedReplicator = false)
{
var extension = OptionsBuilder.Options.FindExtension<KafkaOptionsExtension>()
?? new KafkaOptionsExtension();

extension = extension.WithCompactedReplicator(useCompactedReplicator);

((IDbContextOptionsBuilderInfrastructure)OptionsBuilder).AddOrUpdateExtension(extension);

return this;
}

/// <summary>
/// Enables use of persistent storage, otherwise a <see cref="Materialized"/> storage will be in-memory
/// </summary>
Expand Down Expand Up @@ -156,6 +177,27 @@ public virtual KafkaDbContextOptionsBuilder WithDefaultNumPartitions(int default
return this;
}

/// <summary>
/// Defines the default number of consumer instances to be used in conjunction with <see cref="WithCompactedReplicator(bool)"/>
/// </summary>
/// <remarks>
/// See <see href="https://aka.ms/efcore-docs-dbcontext-options">Using DbContextOptions</see>, and
/// <see href="https://github.com/masesgroup/KEFCore">The EF Core Kafka database provider</see> for more information and examples.
/// </remarks>
/// <param name="defaultConsumerInstances">The default number of consumer instances to be used in conjunction with <see cref="WithCompactedReplicator(bool)"/></param>
/// <returns>The same builder instance so that multiple calls can be chained.</returns>
public virtual KafkaDbContextOptionsBuilder WithDefaultConsumerInstances(int? defaultConsumerInstances = null)
{
var extension = OptionsBuilder.Options.FindExtension<KafkaOptionsExtension>()
?? new KafkaOptionsExtension();

extension = extension.WithDefaultConsumerInstances(defaultConsumerInstances);

((IDbContextOptionsBuilderInfrastructure)OptionsBuilder).AddOrUpdateExtension(extension);

return this;
}

/// <summary>
/// Defines the default replication factor to use when a new topic is created
/// </summary>
Expand Down
6 changes: 3 additions & 3 deletions src/net/KEFCore/KEFCore.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
<PackageReadmeFile>README.md</PackageReadmeFile>
<Nullable>enable</Nullable>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<RunAnalyzersDuringLiveAnalysis>False</RunAnalyzersDuringLiveAnalysis>
<RunAnalyzersDuringLiveAnalysis>True</RunAnalyzersDuringLiveAnalysis>
<RunAnalyzersDuringBuild>False</RunAnalyzersDuringBuild>
</PropertyGroup>

Expand Down Expand Up @@ -66,10 +66,10 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="MASES.KNet" Version="2.0.2">
<PackageReference Include="MASES.KNet" Version="2.1.0">
<IncludeAssets>All</IncludeAssets>
<PrivateAssets>None</PrivateAssets>
</PackageReference>
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="7.0.9" PrivateAssets="none" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="7.0.11" PrivateAssets="none" />
</ItemGroup>
</Project>
8 changes: 8 additions & 0 deletions src/net/KEFCore/Serdes/Internal/IKafkaSerdesEntityType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,20 @@ namespace MASES.EntityFrameworkCore.KNet.Serdes.Internal
{
public interface IKafkaSerdesEntityType
{
string Serialize(params object?[]? args);

string Serialize(Headers headers, params object?[]? args);

string Serialize<TKey>(TKey key);

string Serialize<TKey>(Headers headers, TKey key);

object[] Deserialize(string arg);

object[] Deserialize(Headers headers, string arg);

TKey Deserialize<TKey>(string arg);

TKey Deserialize<TKey>(Headers headers, string arg);

object[] ConvertData(object[]? input);
Expand Down
12 changes: 12 additions & 0 deletions src/net/KEFCore/Serdes/Internal/KafkaSerdesEntityType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
[JsonSerializable(typeof(KafkaSerdesEntityTypeData))]
public class KafkaSerdesEntityTypeData
{
public KafkaSerdesEntityTypeData() { }

Check warning on line 29 in src/net/KEFCore/Serdes/Internal/KafkaSerdesEntityType.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

Non-nullable field 'typeName' must contain a non-null value when exiting constructor. Consider declaring the field as nullable.

Check warning on line 29 in src/net/KEFCore/Serdes/Internal/KafkaSerdesEntityType.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

Non-nullable field 'data' must contain a non-null value when exiting constructor. Consider declaring the field as nullable.

public KafkaSerdesEntityTypeData(string tName, object[] rData)
{
Expand All @@ -50,23 +50,35 @@
_properties = _type.GetProperties().ToArray();
}

public object[] Deserialize(string arg)
{
var des = GetFullType(arg);
return ConvertData(des!.data);
}

public object[] Deserialize(Headers headers, string arg)
{
var des = GetFullType(arg);
return ConvertData(des!.data);
}

public TKey Deserialize<TKey>(string arg) => System.Text.Json.JsonSerializer.Deserialize<TKey>(arg)!;

public TKey Deserialize<TKey>(Headers headers, string arg) => System.Text.Json.JsonSerializer.Deserialize<TKey>(arg)!;

public string Serialize(params object?[]? args) => System.Text.Json.JsonSerializer.Serialize(new KafkaSerdesEntityTypeData(_type.Name, args!));

public string Serialize(Headers headers, params object?[]? args) => System.Text.Json.JsonSerializer.Serialize(new KafkaSerdesEntityTypeData(_type.Name, args!));

public string Serialize<TKey>(TKey key) => System.Text.Json.JsonSerializer.Serialize(key);

public string Serialize<TKey>(Headers headers, TKey key) => System.Text.Json.JsonSerializer.Serialize(key);

public static KafkaSerdesEntityTypeData? GetFullType(string arg) => System.Text.Json.JsonSerializer.Deserialize<KafkaSerdesEntityTypeData>(arg);

public object[] ConvertData(object[]? input)
{
if (input == null) return null;

Check warning on line 81 in src/net/KEFCore/Serdes/Internal/KafkaSerdesEntityType.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

Possible null reference return.
List<object> data = new List<object>();

for (int i = 0; i < input!.Length; i++)
Expand Down
6 changes: 5 additions & 1 deletion src/net/KEFCore/Storage/Internal/IKafkaCluster.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
using MASES.EntityFrameworkCore.KNet.Infrastructure.Internal;
using MASES.EntityFrameworkCore.KNet.Serdes.Internal;
using MASES.EntityFrameworkCore.KNet.ValueGeneration.Internal;
using MASES.KNet;
using MASES.KNet.Producer;
using MASES.KNet.Replicator;
using Org.Apache.Kafka.Clients.Producer;

namespace MASES.EntityFrameworkCore.KNet.Storage.Internal;

public interface IKafkaCluster
public interface IKafkaCluster :IDisposable
{
bool EnsureDeleted(
IUpdateAdapterFactory updateAdapterFactory,
Expand All @@ -46,6 +48,8 @@ bool EnsureConnected(

IKafkaSerdesEntityType CreateSerdes(IEntityType entityType);

IKNetCompactedReplicator<string, string> CreateCompactedReplicator(IEntityType entityType);

IProducer<string, string> CreateProducer(IEntityType entityType);

IEnumerable<ValueBuffer> GetData(IEntityType entityType);
Expand Down
2 changes: 1 addition & 1 deletion src/net/KEFCore/Storage/Internal/IKafkaDatabase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

namespace MASES.EntityFrameworkCore.KNet.Storage.Internal;

public interface IKafkaDatabase : IDatabase
public interface IKafkaDatabase : IDatabase, IDisposable
{
IKafkaCluster Cluster { get; }

Expand Down
Loading