Skip to content

Commit

Permalink
Merge branch 'masesgroup:master' into 118-add-some-use-cases-before-r…
Browse files Browse the repository at this point in the history
…elease-the-first-major-release
  • Loading branch information
masesdevelopers authored Oct 20, 2023
2 parents 7088c2a + f749390 commit f414072
Show file tree
Hide file tree
Showing 13 changed files with 242 additions and 16 deletions.
1 change: 1 addition & 0 deletions src/documentation/articles/kafkadbcontext.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
- **ProducerConfig**: parameters to use for Producer
- **StreamsConfig**: parameters to use for Apche Kafka Streams application
- **TopicConfig**: parameters to use on topic creation for each entity
- **OnChangeEvent**: handler to receive change events from back-end

## How to use `KafkaDbContext` class

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,6 @@ public interface IKafkaSingletonOptions : ISingletonOptions
StreamsConfigBuilder? StreamsConfig { get; }

TopicConfigBuilder? TopicConfig { get; }

Action<IEntityType, bool, object> OnChangeEvent { get; }
}
13 changes: 13 additions & 0 deletions src/net/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public class KafkaOptionsExtension : IDbContextOptionsExtension
private ProducerConfigBuilder? _producerConfigBuilder;
private StreamsConfigBuilder? _streamsConfigBuilder;
private TopicConfigBuilder? _topicConfigBuilder;
private Action<IEntityType, bool, object>? _onChangeEvent = null;
private DbContextOptionsExtensionInfo? _info;

static Java.Lang.ClassLoader _loader = Java.Lang.ClassLoader.SystemClassLoader;
Expand Down Expand Up @@ -83,6 +84,7 @@ protected KafkaOptionsExtension(KafkaOptionsExtension copyFrom)
_producerConfigBuilder = ProducerConfigBuilder.CreateFrom(copyFrom._producerConfigBuilder);
_streamsConfigBuilder = StreamsConfigBuilder.CreateFrom(copyFrom._streamsConfigBuilder);
_topicConfigBuilder = TopicConfigBuilder.CreateFrom(copyFrom._topicConfigBuilder);
_onChangeEvent = copyFrom._onChangeEvent;
}

public virtual DbContextOptionsExtensionInfo Info => _info ??= new ExtensionInfo(this);
Expand Down Expand Up @@ -125,6 +127,8 @@ protected KafkaOptionsExtension(KafkaOptionsExtension copyFrom)

public virtual TopicConfigBuilder TopicConfig => _topicConfigBuilder!;

public virtual Action<IEntityType, bool, object> OnChangeEvent => _onChangeEvent!;

public virtual KafkaOptionsExtension WithKeySerializationType(Type serializationType)
{
if (!serializationType.IsGenericTypeDefinition) throw new InvalidOperationException($"{serializationType.Name} shall be a generic type and shall be defined using \"<>\"");
Expand Down Expand Up @@ -284,6 +288,15 @@ public virtual KafkaOptionsExtension WithTopicConfig(TopicConfigBuilder topicCon
return clone;
}

public virtual KafkaOptionsExtension WithOnChangeEvent(Action<IEntityType, bool, object> onChangeEvent)
{
var clone = Clone();

clone._onChangeEvent = onChangeEvent;

return clone;
}

public virtual Properties StreamsOptions(IEntityType entityType)
{
return StreamsOptions(entityType.ApplicationIdForTable(this));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public virtual void Initialize(IDbContextOptions options)
ProducerConfig = ProducerConfigBuilder.CreateFrom(kafkaOptions.ProducerConfig);
StreamsConfig = StreamsConfigBuilder.CreateFrom(kafkaOptions.StreamsConfig);
TopicConfig = TopicConfigBuilder.CreateFrom(kafkaOptions.TopicConfig);
OnChangeEvent = kafkaOptions.OnChangeEvent;
}
}

Expand Down Expand Up @@ -103,4 +104,6 @@ public virtual void Validate(IDbContextOptions options)
public virtual StreamsConfigBuilder? StreamsConfig { get; private set; }

public virtual TopicConfigBuilder? TopicConfig { get; private set; }

public virtual Action<IEntityType, bool, object> OnChangeEvent { get; private set; }
}
15 changes: 11 additions & 4 deletions src/net/KEFCore/Infrastructure/KafkaDbContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,12 @@ public KafkaDbContext(DbContextOptions options) : base(options)
/// The optional <see cref="TopicConfigBuilder"/> used when topics shall be created
/// </summary>
public virtual TopicConfigBuilder? TopicConfig { get; set; }
/// <summary>
/// The optional handler to be used to receive notification when the back-end triggers a data change.
/// </summary>
/// <remarks>Works if <see cref="UseCompactedReplicator"/> is <see langword="true"/></remarks>
public virtual Action<IEntityType, bool, object>? OnChangeEvent { get; set; } = null;

/// <inheritdoc cref="DbContext.OnConfiguring(DbContextOptionsBuilder)"/>
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
Expand All @@ -227,17 +233,18 @@ protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)

optionsBuilder.UseKafkaCluster(ApplicationId, DbName, BootstrapServers, (o) =>
{
o.ConsumerConfig(ConsumerConfig ?? DefaultConsumerConfig);
o.ProducerConfig(ProducerConfig ?? DefaultProducerConfig);
o.StreamsConfig(StreamsConfig ?? DefaultStreamsConfig).WithDefaultNumPartitions(DefaultNumPartitions);
o.TopicConfig(TopicConfig ?? DefaultTopicConfig);
o.WithConsumerConfig(ConsumerConfig ?? DefaultConsumerConfig);
o.WithProducerConfig(ProducerConfig ?? DefaultProducerConfig);
o.WithStreamsConfig(StreamsConfig ?? DefaultStreamsConfig).WithDefaultNumPartitions(DefaultNumPartitions);
o.WithTopicConfig(TopicConfig ?? DefaultTopicConfig);
o.WithUsePersistentStorage(UsePersistentStorage);
o.WithUseDeletePolicyForTopic(UseDeletePolicyForTopic);
o.WithCompactedReplicator(UseCompactedReplicator);
o.WithDefaultReplicationFactor(DefaultReplicationFactor);
if (KeySerializationType != null) o.WithKeySerializationType(KeySerializationType);
if (ValueSerializationType != null) o.WithValueSerializationType(ValueSerializationType);
if (ValueContainerType != null) o.WithValueContainerType(ValueContainerType);
if (OnChangeEvent != null) o.WithOnChangeEvent(OnChangeEvent);
});
}
}
29 changes: 25 additions & 4 deletions src/net/KEFCore/Infrastructure/KafkaDbContextOptionsBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ public virtual KafkaDbContextOptionsBuilder WithDefaultReplicationFactor(short d
/// </remarks>
/// <param name="consumerConfigBuilder">The <see cref="ConsumerConfigBuilder"/> where options are stored.</param>
/// <returns>The same builder instance so that multiple calls can be chained.</returns>
public virtual KafkaDbContextOptionsBuilder ConsumerConfig(ConsumerConfigBuilder consumerConfigBuilder)
public virtual KafkaDbContextOptionsBuilder WithConsumerConfig(ConsumerConfigBuilder consumerConfigBuilder)
{
var extension = OptionsBuilder.Options.FindExtension<KafkaOptionsExtension>()
?? new KafkaOptionsExtension();
Expand All @@ -313,7 +313,7 @@ public virtual KafkaDbContextOptionsBuilder ConsumerConfig(ConsumerConfigBuilder
/// </remarks>
/// <param name="producerConfigBuilder">The <see cref="ProducerConfigBuilder"/> where options are stored.</param>
/// <returns>The same builder instance so that multiple calls can be chained.</returns>
public virtual KafkaDbContextOptionsBuilder ProducerConfig(ProducerConfigBuilder producerConfigBuilder)
public virtual KafkaDbContextOptionsBuilder WithProducerConfig(ProducerConfigBuilder producerConfigBuilder)
{
var extension = OptionsBuilder.Options.FindExtension<KafkaOptionsExtension>()
?? new KafkaOptionsExtension();
Expand All @@ -334,7 +334,7 @@ public virtual KafkaDbContextOptionsBuilder ProducerConfig(ProducerConfigBuilder
/// </remarks>
/// <param name="streamsConfigBuilder">The <see cref="StreamsConfigBuilder"/> where options are stored.</param>
/// <returns>The same builder instance so that multiple calls can be chained.</returns>
public virtual KafkaDbContextOptionsBuilder StreamsConfig(StreamsConfigBuilder streamsConfigBuilder)
public virtual KafkaDbContextOptionsBuilder WithStreamsConfig(StreamsConfigBuilder streamsConfigBuilder)
{
var extension = OptionsBuilder.Options.FindExtension<KafkaOptionsExtension>()
?? new KafkaOptionsExtension();
Expand All @@ -355,7 +355,7 @@ public virtual KafkaDbContextOptionsBuilder StreamsConfig(StreamsConfigBuilder s
/// </remarks>
/// <param name="topicConfig">The <see cref="TopicConfigBuilder"/> where options are stored.</param>
/// <returns>The same builder instance so that multiple calls can be chained.</returns>
public virtual KafkaDbContextOptionsBuilder TopicConfig(TopicConfigBuilder topicConfig)
public virtual KafkaDbContextOptionsBuilder WithTopicConfig(TopicConfigBuilder topicConfig)
{
var extension = OptionsBuilder.Options.FindExtension<KafkaOptionsExtension>()
?? new KafkaOptionsExtension();
Expand All @@ -367,6 +367,27 @@ public virtual KafkaDbContextOptionsBuilder TopicConfig(TopicConfigBuilder topic
return this;
}

/// <summary>
/// Set the optional handler to be used to receive notification when the back-end triggers a data change. Works if <see cref="WithCompactedReplicator(bool)"/> is invoked with <see langword="true"/>
/// </summary>
/// <remarks>
/// See <see href="https://aka.ms/efcore-docs-dbcontext-options">Using DbContextOptions</see>, and
/// <see href="https://github.com/masesgroup/KEFCore">The EF Core Kafka database provider</see> for more information and examples.
/// </remarks>
/// <param name="onChangeEvent">The <see cref="Action{IEntityType, Boolean, Object}"/> will be used to report change event.</param>
/// <returns>The same builder instance so that multiple calls can be chained.</returns>
public virtual KafkaDbContextOptionsBuilder WithOnChangeEvent(Action<IEntityType, bool, object> onChangeEvent)
{
var extension = OptionsBuilder.Options.FindExtension<KafkaOptionsExtension>()
?? new KafkaOptionsExtension();

extension = extension.WithOnChangeEvent(onChangeEvent);

((IDbContextOptionsBuilderInfrastructure)OptionsBuilder).AddOrUpdateExtension(extension);

return this;
}

#region Hidden System.Object members

/// <summary>
Expand Down
24 changes: 23 additions & 1 deletion src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class EntityTypeProducer<TKey, TValueContainer, TKeySerializer, TValueSer
private readonly IKafkaStreamsBaseRetriever _streamData;
private readonly IKNetSerDes<TKey> _keySerdes;
private readonly IKNetSerDes<TValueContainer> _valueSerdes;
private readonly Action<IEntityType, bool, object>? _onChangeEvent;

#region KNetCompactedReplicatorEnumerable
class KNetCompactedReplicatorEnumerable : IEnumerable<ValueBuffer>
Expand Down Expand Up @@ -188,6 +189,7 @@ public EntityTypeProducer(IEntityType entityType, IKafkaCluster cluster)
_entityType = entityType;
_cluster = cluster;
_useCompactedReplicator = _cluster.Options.UseCompactedReplicator;
_onChangeEvent = _cluster.Options.OnChangeEvent;

var tTValueContainer = typeof(TValueContainer);
TValueContainerConstructor = tTValueContainer.GetConstructors().Single(ci => ci.GetParameters().Length == 2);
Expand All @@ -211,6 +213,11 @@ public EntityTypeProducer(IEntityType entityType, IKafkaCluster cluster)
KeySerDes = _keySerdes,
ValueSerDes = _valueSerdes,
};
if (_onChangeEvent != null)
{
_kafkaCompactedReplicator.OnRemoteUpdate += KafkaCompactedReplicator_OnRemoteUpdate;
_kafkaCompactedReplicator.OnRemoteRemove += KafkaCompactedReplicator_OnRemoteRemove;
}
#if DEBUG_PERFORMANCE
Stopwatch sw = Stopwatch.StartNew();
#endif
Expand Down Expand Up @@ -258,8 +265,13 @@ public IEnumerable<Future<RecordMetadata>> Commit(IEnumerable<IKafkaRowBag> reco

public void Dispose()
{
if (_useCompactedReplicator)
if (_kafkaCompactedReplicator != null)
{
if (_onChangeEvent != null)
{
_kafkaCompactedReplicator.OnRemoteUpdate -= KafkaCompactedReplicator_OnRemoteUpdate;
_kafkaCompactedReplicator.OnRemoteRemove -= KafkaCompactedReplicator_OnRemoteRemove;
}
_kafkaCompactedReplicator?.Dispose();
}
else
Expand All @@ -278,4 +290,14 @@ public IEnumerable<ValueBuffer> ValueBuffers
return new KNetCompactedReplicatorEnumerable(_entityType, _kafkaCompactedReplicator);
}
}

private void KafkaCompactedReplicator_OnRemoteUpdate(IKNetCompactedReplicator<TKey, TValueContainer> arg1, KeyValuePair<TKey, TValueContainer> arg2)
{
_onChangeEvent?.Invoke(_entityType, false, arg2.Key);
}

private void KafkaCompactedReplicator_OnRemoteRemove(IKNetCompactedReplicator<TKey, TValueContainer> arg1, KeyValuePair<TKey, TValueContainer> arg2)
{
_onChangeEvent?.Invoke(_entityType, true, arg2.Key);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"$schema": "http://json.schemastore.org/template",
"author": "MASES s.r.l.",
"classifications": [ "Common", "Library" ],
"identity": "MASES.EntityFrameworkCore.KNet.Templates",
"identity": "MASES.EntityFrameworkCore.KNet.Templates.Simple",
"name": "Executable template: Simple Console for EntityFrameworkCore provider for Apache Kafka project",
"shortName": "kefcoreApp",
"tags": {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"$schema": "http://json.schemastore.org/template",
"author": "MASES s.r.l.",
"classifications": [ "Common", "Library" ],
"identity": "MASES.EntityFrameworkCore.KNet.Templates.AppWithEvents",
"name": "Executable template: Simple Console with events for EntityFrameworkCore provider for Apache Kafka project",
"shortName": "kefcoreAppWithEvents",
"tags": {
"language": "C#",
"type": "project"
}
}
124 changes: 124 additions & 0 deletions src/net/templates/templates/kefcoreAppWithEvents/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
using MASES.EntityFrameworkCore.KNet.Infrastructure;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Metadata;
using System;
using System.Collections.Generic;
using System.Linq;

namespace MASES.EntityFrameworkCore.KNet.Templates
{
partial class Program
{
static void OnEvent(IEntityType entity, bool state, object key)
{
Console.WriteLine($"Entity {entity.Name} has {(state ? "removed" : "added/updated")} the key {key}");
}

static void Main(string[] args)
{
BloggingContext context = null;
try
{
context = new BloggingContext()
{
BootstrapServers = "KAFKA-BROKER:9092",
ApplicationId = "MyApplicationId",
DbName = "MyDB",
OnChangeEvent = OnEvent
};
// cleanup topics on Broker
context.Database.EnsureDeleted();
context.Database.EnsureCreated();

// prefill data
for (int i = 0; i < 1000; i++)
{
context.Add(new Blog
{
Url = "http://blogs.msdn.com/adonet" + i.ToString(),
Posts = new List<Post>()
{
new Post()
{
Title = "title",
Content = i.ToString()
}
},
Rating = i,
});
}
// save data
context.SaveChanges();

// make some queries
var selector = (from op in context.Blogs
join pg in context.Posts on op.BlogId equals pg.BlogId
where pg.BlogId == op.BlogId
select new { pg, op });
var pageObject = selector.FirstOrDefault();

var post = context.Posts.Single(b => b.BlogId == 2);

post = context.Posts.Single(b => b.BlogId == 1);

var all = context.Posts.All((o) => true);

var value = context.Blogs.AsQueryable().ToQueryString();
}
catch (Exception ex)
{
Console.WriteLine(ex.ToString());
}
finally
{
context?.Dispose();
}
}
}

public class BloggingContext : KafkaDbContext
{
// uncomment for persistent storage
// public override bool UsePersistentStorage { get; set; } = true;

// uncomment to disable compacted replicator
//public override bool UseCompactedReplicator { get; set; } = false;

public DbSet<Blog> Blogs { get; set; }
public DbSet<Post> Posts { get; set; }

/// uncomment for model builder
//protected override void OnModelCreating(ModelBuilder modelBuilder)
//{
// modelBuilder.Entity<Blog>().HasKey(c => new { c.BlogId, c.Rating });
//}
}

public class Blog
{
public int BlogId { get; set; }
public string Url { get; set; }
public int Rating { get; set; }
public List<Post> Posts { get; set; }

public override string ToString()
{
return $"BlogId: {BlogId} Url: {Url} Rating: {Rating}";
}
}

public class Post
{
public int PostId { get; set; }
public string Title { get; set; }
public string Content { get; set; }

public int BlogId { get; set; }
public Blog Blog { get; set; }

public override string ToString()
{
return $"PostId: {PostId} Title: {Title} Content: {Content} BlogId: {BlogId}";
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<LangVersion>latest</LangVersion>
<TargetFrameworks>net6.0;net7.0</TargetFrameworks>
</PropertyGroup>
<ItemGroup Condition="Exists('..\..\..\KEFCore\KEFCore.csproj')">
<!--Within GitHub repo: used for test purpose-->
<ProjectReference Include="..\..\..\KEFCore\KEFCore.csproj" />
</ItemGroup>
<ItemGroup Condition="!Exists('..\..\..\KEFCore\KEFCore.csproj')">
<!--Outside GitHub repo-->
<PackageReference Include="MASES.EntityFrameworkCore.KNet" Version="0.10.1" IncludeAssets="All" PrivateAssets="None" />
</ItemGroup>
</Project>
Loading

0 comments on commit f414072

Please sign in to comment.