diff --git a/src/documentation/articles/kafkadbcontext.md b/src/documentation/articles/kafkadbcontext.md index 29e97390..3f5dd8ab 100644 --- a/src/documentation/articles/kafkadbcontext.md +++ b/src/documentation/articles/kafkadbcontext.md @@ -19,6 +19,7 @@ - **ProducerConfig**: parameters to use for Producer - **StreamsConfig**: parameters to use for Apche Kafka Streams application - **TopicConfig**: parameters to use on topic creation for each entity + - **OnChangeEvent**: handler to receive change events from back-end ## How to use `KafkaDbContext` class diff --git a/src/net/KEFCore/Infrastructure/Internal/IKafkaSingletonOptions.cs b/src/net/KEFCore/Infrastructure/Internal/IKafkaSingletonOptions.cs index 20d62ca5..ab6aecae 100644 --- a/src/net/KEFCore/Infrastructure/Internal/IKafkaSingletonOptions.cs +++ b/src/net/KEFCore/Infrastructure/Internal/IKafkaSingletonOptions.cs @@ -65,4 +65,6 @@ public interface IKafkaSingletonOptions : ISingletonOptions StreamsConfigBuilder? StreamsConfig { get; } TopicConfigBuilder? TopicConfig { get; } + + Action OnChangeEvent { get; } } diff --git a/src/net/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs b/src/net/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs index 09dc56fe..f3dd184c 100644 --- a/src/net/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs +++ b/src/net/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs @@ -55,6 +55,7 @@ public class KafkaOptionsExtension : IDbContextOptionsExtension private ProducerConfigBuilder? _producerConfigBuilder; private StreamsConfigBuilder? _streamsConfigBuilder; private TopicConfigBuilder? _topicConfigBuilder; + private Action? _onChangeEvent = null; private DbContextOptionsExtensionInfo? _info; static Java.Lang.ClassLoader _loader = Java.Lang.ClassLoader.SystemClassLoader; @@ -83,6 +84,7 @@ protected KafkaOptionsExtension(KafkaOptionsExtension copyFrom) _producerConfigBuilder = ProducerConfigBuilder.CreateFrom(copyFrom._producerConfigBuilder); _streamsConfigBuilder = StreamsConfigBuilder.CreateFrom(copyFrom._streamsConfigBuilder); _topicConfigBuilder = TopicConfigBuilder.CreateFrom(copyFrom._topicConfigBuilder); + _onChangeEvent = copyFrom._onChangeEvent; } public virtual DbContextOptionsExtensionInfo Info => _info ??= new ExtensionInfo(this); @@ -125,6 +127,8 @@ protected KafkaOptionsExtension(KafkaOptionsExtension copyFrom) public virtual TopicConfigBuilder TopicConfig => _topicConfigBuilder!; + public virtual Action OnChangeEvent => _onChangeEvent!; + public virtual KafkaOptionsExtension WithKeySerializationType(Type serializationType) { if (!serializationType.IsGenericTypeDefinition) throw new InvalidOperationException($"{serializationType.Name} shall be a generic type and shall be defined using \"<>\""); @@ -284,6 +288,15 @@ public virtual KafkaOptionsExtension WithTopicConfig(TopicConfigBuilder topicCon return clone; } + public virtual KafkaOptionsExtension WithOnChangeEvent(Action onChangeEvent) + { + var clone = Clone(); + + clone._onChangeEvent = onChangeEvent; + + return clone; + } + public virtual Properties StreamsOptions(IEntityType entityType) { return StreamsOptions(entityType.ApplicationIdForTable(this)); diff --git a/src/net/KEFCore/Infrastructure/Internal/KafkaSingletonOptions.cs b/src/net/KEFCore/Infrastructure/Internal/KafkaSingletonOptions.cs index 4b25c1e9..48cf7439 100644 --- a/src/net/KEFCore/Infrastructure/Internal/KafkaSingletonOptions.cs +++ b/src/net/KEFCore/Infrastructure/Internal/KafkaSingletonOptions.cs @@ -53,6 +53,7 @@ public virtual void Initialize(IDbContextOptions options) ProducerConfig = ProducerConfigBuilder.CreateFrom(kafkaOptions.ProducerConfig); StreamsConfig = StreamsConfigBuilder.CreateFrom(kafkaOptions.StreamsConfig); TopicConfig = TopicConfigBuilder.CreateFrom(kafkaOptions.TopicConfig); + OnChangeEvent = kafkaOptions.OnChangeEvent; } } @@ -103,4 +104,6 @@ public virtual void Validate(IDbContextOptions options) public virtual StreamsConfigBuilder? StreamsConfig { get; private set; } public virtual TopicConfigBuilder? TopicConfig { get; private set; } + + public virtual Action OnChangeEvent { get; private set; } } diff --git a/src/net/KEFCore/Infrastructure/KafkaDbContext.cs b/src/net/KEFCore/Infrastructure/KafkaDbContext.cs index 6bb9a265..18626fb8 100644 --- a/src/net/KEFCore/Infrastructure/KafkaDbContext.cs +++ b/src/net/KEFCore/Infrastructure/KafkaDbContext.cs @@ -219,6 +219,12 @@ public KafkaDbContext(DbContextOptions options) : base(options) /// The optional used when topics shall be created /// public virtual TopicConfigBuilder? TopicConfig { get; set; } + /// + /// The optional handler to be used to receive notification when the back-end triggers a data change. + /// + /// Works if is + public virtual Action? OnChangeEvent { get; set; } = null; + /// protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder) { @@ -227,10 +233,10 @@ protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder) optionsBuilder.UseKafkaCluster(ApplicationId, DbName, BootstrapServers, (o) => { - o.ConsumerConfig(ConsumerConfig ?? DefaultConsumerConfig); - o.ProducerConfig(ProducerConfig ?? DefaultProducerConfig); - o.StreamsConfig(StreamsConfig ?? DefaultStreamsConfig).WithDefaultNumPartitions(DefaultNumPartitions); - o.TopicConfig(TopicConfig ?? DefaultTopicConfig); + o.WithConsumerConfig(ConsumerConfig ?? DefaultConsumerConfig); + o.WithProducerConfig(ProducerConfig ?? DefaultProducerConfig); + o.WithStreamsConfig(StreamsConfig ?? DefaultStreamsConfig).WithDefaultNumPartitions(DefaultNumPartitions); + o.WithTopicConfig(TopicConfig ?? DefaultTopicConfig); o.WithUsePersistentStorage(UsePersistentStorage); o.WithUseDeletePolicyForTopic(UseDeletePolicyForTopic); o.WithCompactedReplicator(UseCompactedReplicator); @@ -238,6 +244,7 @@ protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder) if (KeySerializationType != null) o.WithKeySerializationType(KeySerializationType); if (ValueSerializationType != null) o.WithValueSerializationType(ValueSerializationType); if (ValueContainerType != null) o.WithValueContainerType(ValueContainerType); + if (OnChangeEvent != null) o.WithOnChangeEvent(OnChangeEvent); }); } } diff --git a/src/net/KEFCore/Infrastructure/KafkaDbContextOptionsBuilder.cs b/src/net/KEFCore/Infrastructure/KafkaDbContextOptionsBuilder.cs index b7115bcd..934895b0 100644 --- a/src/net/KEFCore/Infrastructure/KafkaDbContextOptionsBuilder.cs +++ b/src/net/KEFCore/Infrastructure/KafkaDbContextOptionsBuilder.cs @@ -292,7 +292,7 @@ public virtual KafkaDbContextOptionsBuilder WithDefaultReplicationFactor(short d /// /// The where options are stored. /// The same builder instance so that multiple calls can be chained. - public virtual KafkaDbContextOptionsBuilder ConsumerConfig(ConsumerConfigBuilder consumerConfigBuilder) + public virtual KafkaDbContextOptionsBuilder WithConsumerConfig(ConsumerConfigBuilder consumerConfigBuilder) { var extension = OptionsBuilder.Options.FindExtension() ?? new KafkaOptionsExtension(); @@ -313,7 +313,7 @@ public virtual KafkaDbContextOptionsBuilder ConsumerConfig(ConsumerConfigBuilder /// /// The where options are stored. /// The same builder instance so that multiple calls can be chained. - public virtual KafkaDbContextOptionsBuilder ProducerConfig(ProducerConfigBuilder producerConfigBuilder) + public virtual KafkaDbContextOptionsBuilder WithProducerConfig(ProducerConfigBuilder producerConfigBuilder) { var extension = OptionsBuilder.Options.FindExtension() ?? new KafkaOptionsExtension(); @@ -334,7 +334,7 @@ public virtual KafkaDbContextOptionsBuilder ProducerConfig(ProducerConfigBuilder /// /// The where options are stored. /// The same builder instance so that multiple calls can be chained. - public virtual KafkaDbContextOptionsBuilder StreamsConfig(StreamsConfigBuilder streamsConfigBuilder) + public virtual KafkaDbContextOptionsBuilder WithStreamsConfig(StreamsConfigBuilder streamsConfigBuilder) { var extension = OptionsBuilder.Options.FindExtension() ?? new KafkaOptionsExtension(); @@ -355,7 +355,7 @@ public virtual KafkaDbContextOptionsBuilder StreamsConfig(StreamsConfigBuilder s /// /// The where options are stored. /// The same builder instance so that multiple calls can be chained. - public virtual KafkaDbContextOptionsBuilder TopicConfig(TopicConfigBuilder topicConfig) + public virtual KafkaDbContextOptionsBuilder WithTopicConfig(TopicConfigBuilder topicConfig) { var extension = OptionsBuilder.Options.FindExtension() ?? new KafkaOptionsExtension(); @@ -367,6 +367,27 @@ public virtual KafkaDbContextOptionsBuilder TopicConfig(TopicConfigBuilder topic return this; } + /// + /// Set the optional handler to be used to receive notification when the back-end triggers a data change. Works if is invoked with + /// + /// + /// See Using DbContextOptions, and + /// The EF Core Kafka database provider for more information and examples. + /// + /// The will be used to report change event. + /// The same builder instance so that multiple calls can be chained. + public virtual KafkaDbContextOptionsBuilder WithOnChangeEvent(Action onChangeEvent) + { + var extension = OptionsBuilder.Options.FindExtension() + ?? new KafkaOptionsExtension(); + + extension = extension.WithOnChangeEvent(onChangeEvent); + + ((IDbContextOptionsBuilderInfrastructure)OptionsBuilder).AddOrUpdateExtension(extension); + + return this; + } + #region Hidden System.Object members /// diff --git a/src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs b/src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs index 7af0426d..f49e03a0 100644 --- a/src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs +++ b/src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs @@ -50,6 +50,7 @@ public class EntityTypeProducer _keySerdes; private readonly IKNetSerDes _valueSerdes; + private readonly Action? _onChangeEvent; #region KNetCompactedReplicatorEnumerable class KNetCompactedReplicatorEnumerable : IEnumerable @@ -188,6 +189,7 @@ public EntityTypeProducer(IEntityType entityType, IKafkaCluster cluster) _entityType = entityType; _cluster = cluster; _useCompactedReplicator = _cluster.Options.UseCompactedReplicator; + _onChangeEvent = _cluster.Options.OnChangeEvent; var tTValueContainer = typeof(TValueContainer); TValueContainerConstructor = tTValueContainer.GetConstructors().Single(ci => ci.GetParameters().Length == 2); @@ -211,6 +213,11 @@ public EntityTypeProducer(IEntityType entityType, IKafkaCluster cluster) KeySerDes = _keySerdes, ValueSerDes = _valueSerdes, }; + if (_onChangeEvent != null) + { + _kafkaCompactedReplicator.OnRemoteUpdate += KafkaCompactedReplicator_OnRemoteUpdate; + _kafkaCompactedReplicator.OnRemoteRemove += KafkaCompactedReplicator_OnRemoteRemove; + } #if DEBUG_PERFORMANCE Stopwatch sw = Stopwatch.StartNew(); #endif @@ -258,8 +265,13 @@ public IEnumerable> Commit(IEnumerable reco public void Dispose() { - if (_useCompactedReplicator) + if (_kafkaCompactedReplicator != null) { + if (_onChangeEvent != null) + { + _kafkaCompactedReplicator.OnRemoteUpdate -= KafkaCompactedReplicator_OnRemoteUpdate; + _kafkaCompactedReplicator.OnRemoteRemove -= KafkaCompactedReplicator_OnRemoteRemove; + } _kafkaCompactedReplicator?.Dispose(); } else @@ -278,4 +290,14 @@ public IEnumerable ValueBuffers return new KNetCompactedReplicatorEnumerable(_entityType, _kafkaCompactedReplicator); } } + + private void KafkaCompactedReplicator_OnRemoteUpdate(IKNetCompactedReplicator arg1, KeyValuePair arg2) + { + _onChangeEvent?.Invoke(_entityType, false, arg2.Key); + } + + private void KafkaCompactedReplicator_OnRemoteRemove(IKNetCompactedReplicator arg1, KeyValuePair arg2) + { + _onChangeEvent?.Invoke(_entityType, true, arg2.Key); + } } diff --git a/src/net/templates/templates/kefcoreApp/.template.config/template.json b/src/net/templates/templates/kefcoreApp/.template.config/template.json index f17cdafa..3be4ac1b 100644 --- a/src/net/templates/templates/kefcoreApp/.template.config/template.json +++ b/src/net/templates/templates/kefcoreApp/.template.config/template.json @@ -2,7 +2,7 @@ "$schema": "http://json.schemastore.org/template", "author": "MASES s.r.l.", "classifications": [ "Common", "Library" ], - "identity": "MASES.EntityFrameworkCore.KNet.Templates", + "identity": "MASES.EntityFrameworkCore.KNet.Templates.Simple", "name": "Executable template: Simple Console for EntityFrameworkCore provider for Apache Kafka project", "shortName": "kefcoreApp", "tags": { diff --git a/src/net/templates/templates/kefcoreApp/knetConnectSink.csproj b/src/net/templates/templates/kefcoreApp/kefcoreApp.csproj similarity index 100% rename from src/net/templates/templates/kefcoreApp/knetConnectSink.csproj rename to src/net/templates/templates/kefcoreApp/kefcoreApp.csproj diff --git a/src/net/templates/templates/kefcoreAppWithEvents/.template.config/template.json b/src/net/templates/templates/kefcoreAppWithEvents/.template.config/template.json new file mode 100644 index 00000000..df017218 --- /dev/null +++ b/src/net/templates/templates/kefcoreAppWithEvents/.template.config/template.json @@ -0,0 +1,12 @@ +{ + "$schema": "http://json.schemastore.org/template", + "author": "MASES s.r.l.", + "classifications": [ "Common", "Library" ], + "identity": "MASES.EntityFrameworkCore.KNet.Templates.AppWithEvents", + "name": "Executable template: Simple Console with events for EntityFrameworkCore provider for Apache Kafka project", + "shortName": "kefcoreAppWithEvents", + "tags": { + "language": "C#", + "type": "project" + } +} \ No newline at end of file diff --git a/src/net/templates/templates/kefcoreAppWithEvents/Program.cs b/src/net/templates/templates/kefcoreAppWithEvents/Program.cs new file mode 100644 index 00000000..28bf2363 --- /dev/null +++ b/src/net/templates/templates/kefcoreAppWithEvents/Program.cs @@ -0,0 +1,124 @@ +using MASES.EntityFrameworkCore.KNet.Infrastructure; +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Metadata; +using System; +using System.Collections.Generic; +using System.Linq; + +namespace MASES.EntityFrameworkCore.KNet.Templates +{ + partial class Program + { + static void OnEvent(IEntityType entity, bool state, object key) + { + Console.WriteLine($"Entity {entity.Name} has {(state ? "removed" : "added/updated")} the key {key}"); + } + + static void Main(string[] args) + { + BloggingContext context = null; + try + { + context = new BloggingContext() + { + BootstrapServers = "KAFKA-BROKER:9092", + ApplicationId = "MyApplicationId", + DbName = "MyDB", + OnChangeEvent = OnEvent + }; + // cleanup topics on Broker + context.Database.EnsureDeleted(); + context.Database.EnsureCreated(); + + // prefill data + for (int i = 0; i < 1000; i++) + { + context.Add(new Blog + { + Url = "http://blogs.msdn.com/adonet" + i.ToString(), + Posts = new List() + { + new Post() + { + Title = "title", + Content = i.ToString() + } + }, + Rating = i, + }); + } + // save data + context.SaveChanges(); + + // make some queries + var selector = (from op in context.Blogs + join pg in context.Posts on op.BlogId equals pg.BlogId + where pg.BlogId == op.BlogId + select new { pg, op }); + var pageObject = selector.FirstOrDefault(); + + var post = context.Posts.Single(b => b.BlogId == 2); + + post = context.Posts.Single(b => b.BlogId == 1); + + var all = context.Posts.All((o) => true); + + var value = context.Blogs.AsQueryable().ToQueryString(); + } + catch (Exception ex) + { + Console.WriteLine(ex.ToString()); + } + finally + { + context?.Dispose(); + } + } + } + + public class BloggingContext : KafkaDbContext + { + // uncomment for persistent storage + // public override bool UsePersistentStorage { get; set; } = true; + + // uncomment to disable compacted replicator + //public override bool UseCompactedReplicator { get; set; } = false; + + public DbSet Blogs { get; set; } + public DbSet Posts { get; set; } + + /// uncomment for model builder + //protected override void OnModelCreating(ModelBuilder modelBuilder) + //{ + // modelBuilder.Entity().HasKey(c => new { c.BlogId, c.Rating }); + //} + } + + public class Blog + { + public int BlogId { get; set; } + public string Url { get; set; } + public int Rating { get; set; } + public List Posts { get; set; } + + public override string ToString() + { + return $"BlogId: {BlogId} Url: {Url} Rating: {Rating}"; + } + } + + public class Post + { + public int PostId { get; set; } + public string Title { get; set; } + public string Content { get; set; } + + public int BlogId { get; set; } + public Blog Blog { get; set; } + + public override string ToString() + { + return $"PostId: {PostId} Title: {Title} Content: {Content} BlogId: {BlogId}"; + } + } +} diff --git a/src/net/templates/templates/kefcoreAppWithEvents/kefcoreAppWithEvents.csproj b/src/net/templates/templates/kefcoreAppWithEvents/kefcoreAppWithEvents.csproj new file mode 100644 index 00000000..edcfce30 --- /dev/null +++ b/src/net/templates/templates/kefcoreAppWithEvents/kefcoreAppWithEvents.csproj @@ -0,0 +1,14 @@ + + + latest + net6.0;net7.0 + + + + + + + + + + diff --git a/test/KEFCore.Test.sln b/test/KEFCore.Test.sln index 0c8d960b..f6ced525 100644 --- a/test/KEFCore.Test.sln +++ b/test/KEFCore.Test.sln @@ -27,7 +27,9 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KEFCore.Extractor.Test", "K EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Templates", "Templates", "{68A75218-B566-4DF6-8BFF-586B7C3E7ED6}" EndProject -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "knetConnectSink", "..\src\net\templates\templates\kefcoreApp\knetConnectSink.csproj", "{FBB94BFE-4492-4366-914F-410C82F6AEB7}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "kefcoreApp", "..\src\net\templates\templates\kefcoreApp\kefcoreApp.csproj", "{ECB20F1B-A86D-4CB1-953B-B7C42E9FC1CD}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "kefcoreAppWithEvents", "..\src\net\templates\templates\kefcoreAppWithEvents\kefcoreAppWithEvents.csproj", "{EEF483DF-8DDB-474D-8135-2762014F9438}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -71,10 +73,14 @@ Global {E1BE56EB-4701-47E7-876A-D2146619C211}.Debug|Any CPU.Build.0 = Debug|Any CPU {E1BE56EB-4701-47E7-876A-D2146619C211}.Release|Any CPU.ActiveCfg = Release|Any CPU {E1BE56EB-4701-47E7-876A-D2146619C211}.Release|Any CPU.Build.0 = Release|Any CPU - {FBB94BFE-4492-4366-914F-410C82F6AEB7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {FBB94BFE-4492-4366-914F-410C82F6AEB7}.Debug|Any CPU.Build.0 = Debug|Any CPU - {FBB94BFE-4492-4366-914F-410C82F6AEB7}.Release|Any CPU.ActiveCfg = Release|Any CPU - {FBB94BFE-4492-4366-914F-410C82F6AEB7}.Release|Any CPU.Build.0 = Release|Any CPU + {ECB20F1B-A86D-4CB1-953B-B7C42E9FC1CD}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {ECB20F1B-A86D-4CB1-953B-B7C42E9FC1CD}.Debug|Any CPU.Build.0 = Debug|Any CPU + {ECB20F1B-A86D-4CB1-953B-B7C42E9FC1CD}.Release|Any CPU.ActiveCfg = Release|Any CPU + {ECB20F1B-A86D-4CB1-953B-B7C42E9FC1CD}.Release|Any CPU.Build.0 = Release|Any CPU + {EEF483DF-8DDB-474D-8135-2762014F9438}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {EEF483DF-8DDB-474D-8135-2762014F9438}.Debug|Any CPU.Build.0 = Debug|Any CPU + {EEF483DF-8DDB-474D-8135-2762014F9438}.Release|Any CPU.ActiveCfg = Release|Any CPU + {EEF483DF-8DDB-474D-8135-2762014F9438}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -89,7 +95,8 @@ Global {40D1001C-EC50-4994-BE40-F410DB20AF4F} = {4A0AD520-9BC4-4F92-893B-6F92BBC35BFA} {BED3DF6D-B60E-486D-96E9-AD069D0600D2} = {B35B16BB-890F-4385-AB20-7AA4DD6E9C01} {E1BE56EB-4701-47E7-876A-D2146619C211} = {4A0AD520-9BC4-4F92-893B-6F92BBC35BFA} - {FBB94BFE-4492-4366-914F-410C82F6AEB7} = {68A75218-B566-4DF6-8BFF-586B7C3E7ED6} + {ECB20F1B-A86D-4CB1-953B-B7C42E9FC1CD} = {68A75218-B566-4DF6-8BFF-586B7C3E7ED6} + {EEF483DF-8DDB-474D-8135-2762014F9438} = {68A75218-B566-4DF6-8BFF-586B7C3E7ED6} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {36C294ED-9ECE-42AA-8273-31E008749AF3}