From dc63baabf96856cb99e17543d4a5d6d3ebb21977 Mon Sep 17 00:00:00 2001 From: masesdevelopers <94312179+masesdevelopers@users.noreply.github.com> Date: Fri, 20 Oct 2023 14:38:11 +0200 Subject: [PATCH] Added change event handler to receive notifications when something change on an IEntityType, it works only if Compacted Replicator is in use --- src/documentation/articles/kafkadbcontext.md | 1 + .../Internal/IKafkaSingletonOptions.cs | 2 ++ .../Internal/KafkaOptionsExtension.cs | 13 +++++++++ .../Internal/KafkaSingletonOptions.cs | 3 ++ .../KEFCore/Infrastructure/KafkaDbContext.cs | 15 +++++++--- .../KafkaDbContextOptionsBuilder.cs | 29 ++++++++++++++++--- .../Storage/Internal/EntityTypeProducer.cs | 24 ++++++++++++++- 7 files changed, 78 insertions(+), 9 deletions(-) diff --git a/src/documentation/articles/kafkadbcontext.md b/src/documentation/articles/kafkadbcontext.md index 29e97390..3f5dd8ab 100644 --- a/src/documentation/articles/kafkadbcontext.md +++ b/src/documentation/articles/kafkadbcontext.md @@ -19,6 +19,7 @@ - **ProducerConfig**: parameters to use for Producer - **StreamsConfig**: parameters to use for Apche Kafka Streams application - **TopicConfig**: parameters to use on topic creation for each entity + - **OnChangeEvent**: handler to receive change events from back-end ## How to use `KafkaDbContext` class diff --git a/src/net/KEFCore/Infrastructure/Internal/IKafkaSingletonOptions.cs b/src/net/KEFCore/Infrastructure/Internal/IKafkaSingletonOptions.cs index 20d62ca5..ab6aecae 100644 --- a/src/net/KEFCore/Infrastructure/Internal/IKafkaSingletonOptions.cs +++ b/src/net/KEFCore/Infrastructure/Internal/IKafkaSingletonOptions.cs @@ -65,4 +65,6 @@ public interface IKafkaSingletonOptions : ISingletonOptions StreamsConfigBuilder? StreamsConfig { get; } TopicConfigBuilder? TopicConfig { get; } + + Action OnChangeEvent { get; } } diff --git a/src/net/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs b/src/net/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs index 09dc56fe..f3dd184c 100644 --- a/src/net/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs +++ b/src/net/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs @@ -55,6 +55,7 @@ public class KafkaOptionsExtension : IDbContextOptionsExtension private ProducerConfigBuilder? _producerConfigBuilder; private StreamsConfigBuilder? _streamsConfigBuilder; private TopicConfigBuilder? _topicConfigBuilder; + private Action? _onChangeEvent = null; private DbContextOptionsExtensionInfo? _info; static Java.Lang.ClassLoader _loader = Java.Lang.ClassLoader.SystemClassLoader; @@ -83,6 +84,7 @@ protected KafkaOptionsExtension(KafkaOptionsExtension copyFrom) _producerConfigBuilder = ProducerConfigBuilder.CreateFrom(copyFrom._producerConfigBuilder); _streamsConfigBuilder = StreamsConfigBuilder.CreateFrom(copyFrom._streamsConfigBuilder); _topicConfigBuilder = TopicConfigBuilder.CreateFrom(copyFrom._topicConfigBuilder); + _onChangeEvent = copyFrom._onChangeEvent; } public virtual DbContextOptionsExtensionInfo Info => _info ??= new ExtensionInfo(this); @@ -125,6 +127,8 @@ protected KafkaOptionsExtension(KafkaOptionsExtension copyFrom) public virtual TopicConfigBuilder TopicConfig => _topicConfigBuilder!; + public virtual Action OnChangeEvent => _onChangeEvent!; + public virtual KafkaOptionsExtension WithKeySerializationType(Type serializationType) { if (!serializationType.IsGenericTypeDefinition) throw new InvalidOperationException($"{serializationType.Name} shall be a generic type and shall be defined using \"<>\""); @@ -284,6 +288,15 @@ public virtual KafkaOptionsExtension WithTopicConfig(TopicConfigBuilder topicCon return clone; } + public virtual KafkaOptionsExtension WithOnChangeEvent(Action onChangeEvent) + { + var clone = Clone(); + + clone._onChangeEvent = onChangeEvent; + + return clone; + } + public virtual Properties StreamsOptions(IEntityType entityType) { return StreamsOptions(entityType.ApplicationIdForTable(this)); diff --git a/src/net/KEFCore/Infrastructure/Internal/KafkaSingletonOptions.cs b/src/net/KEFCore/Infrastructure/Internal/KafkaSingletonOptions.cs index 4b25c1e9..48cf7439 100644 --- a/src/net/KEFCore/Infrastructure/Internal/KafkaSingletonOptions.cs +++ b/src/net/KEFCore/Infrastructure/Internal/KafkaSingletonOptions.cs @@ -53,6 +53,7 @@ public virtual void Initialize(IDbContextOptions options) ProducerConfig = ProducerConfigBuilder.CreateFrom(kafkaOptions.ProducerConfig); StreamsConfig = StreamsConfigBuilder.CreateFrom(kafkaOptions.StreamsConfig); TopicConfig = TopicConfigBuilder.CreateFrom(kafkaOptions.TopicConfig); + OnChangeEvent = kafkaOptions.OnChangeEvent; } } @@ -103,4 +104,6 @@ public virtual void Validate(IDbContextOptions options) public virtual StreamsConfigBuilder? StreamsConfig { get; private set; } public virtual TopicConfigBuilder? TopicConfig { get; private set; } + + public virtual Action OnChangeEvent { get; private set; } } diff --git a/src/net/KEFCore/Infrastructure/KafkaDbContext.cs b/src/net/KEFCore/Infrastructure/KafkaDbContext.cs index 6bb9a265..18626fb8 100644 --- a/src/net/KEFCore/Infrastructure/KafkaDbContext.cs +++ b/src/net/KEFCore/Infrastructure/KafkaDbContext.cs @@ -219,6 +219,12 @@ public KafkaDbContext(DbContextOptions options) : base(options) /// The optional used when topics shall be created /// public virtual TopicConfigBuilder? TopicConfig { get; set; } + /// + /// The optional handler to be used to receive notification when the back-end triggers a data change. + /// + /// Works if is + public virtual Action? OnChangeEvent { get; set; } = null; + /// protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder) { @@ -227,10 +233,10 @@ protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder) optionsBuilder.UseKafkaCluster(ApplicationId, DbName, BootstrapServers, (o) => { - o.ConsumerConfig(ConsumerConfig ?? DefaultConsumerConfig); - o.ProducerConfig(ProducerConfig ?? DefaultProducerConfig); - o.StreamsConfig(StreamsConfig ?? DefaultStreamsConfig).WithDefaultNumPartitions(DefaultNumPartitions); - o.TopicConfig(TopicConfig ?? DefaultTopicConfig); + o.WithConsumerConfig(ConsumerConfig ?? DefaultConsumerConfig); + o.WithProducerConfig(ProducerConfig ?? DefaultProducerConfig); + o.WithStreamsConfig(StreamsConfig ?? DefaultStreamsConfig).WithDefaultNumPartitions(DefaultNumPartitions); + o.WithTopicConfig(TopicConfig ?? DefaultTopicConfig); o.WithUsePersistentStorage(UsePersistentStorage); o.WithUseDeletePolicyForTopic(UseDeletePolicyForTopic); o.WithCompactedReplicator(UseCompactedReplicator); @@ -238,6 +244,7 @@ protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder) if (KeySerializationType != null) o.WithKeySerializationType(KeySerializationType); if (ValueSerializationType != null) o.WithValueSerializationType(ValueSerializationType); if (ValueContainerType != null) o.WithValueContainerType(ValueContainerType); + if (OnChangeEvent != null) o.WithOnChangeEvent(OnChangeEvent); }); } } diff --git a/src/net/KEFCore/Infrastructure/KafkaDbContextOptionsBuilder.cs b/src/net/KEFCore/Infrastructure/KafkaDbContextOptionsBuilder.cs index b7115bcd..934895b0 100644 --- a/src/net/KEFCore/Infrastructure/KafkaDbContextOptionsBuilder.cs +++ b/src/net/KEFCore/Infrastructure/KafkaDbContextOptionsBuilder.cs @@ -292,7 +292,7 @@ public virtual KafkaDbContextOptionsBuilder WithDefaultReplicationFactor(short d /// /// The where options are stored. /// The same builder instance so that multiple calls can be chained. - public virtual KafkaDbContextOptionsBuilder ConsumerConfig(ConsumerConfigBuilder consumerConfigBuilder) + public virtual KafkaDbContextOptionsBuilder WithConsumerConfig(ConsumerConfigBuilder consumerConfigBuilder) { var extension = OptionsBuilder.Options.FindExtension() ?? new KafkaOptionsExtension(); @@ -313,7 +313,7 @@ public virtual KafkaDbContextOptionsBuilder ConsumerConfig(ConsumerConfigBuilder /// /// The where options are stored. /// The same builder instance so that multiple calls can be chained. - public virtual KafkaDbContextOptionsBuilder ProducerConfig(ProducerConfigBuilder producerConfigBuilder) + public virtual KafkaDbContextOptionsBuilder WithProducerConfig(ProducerConfigBuilder producerConfigBuilder) { var extension = OptionsBuilder.Options.FindExtension() ?? new KafkaOptionsExtension(); @@ -334,7 +334,7 @@ public virtual KafkaDbContextOptionsBuilder ProducerConfig(ProducerConfigBuilder /// /// The where options are stored. /// The same builder instance so that multiple calls can be chained. - public virtual KafkaDbContextOptionsBuilder StreamsConfig(StreamsConfigBuilder streamsConfigBuilder) + public virtual KafkaDbContextOptionsBuilder WithStreamsConfig(StreamsConfigBuilder streamsConfigBuilder) { var extension = OptionsBuilder.Options.FindExtension() ?? new KafkaOptionsExtension(); @@ -355,7 +355,7 @@ public virtual KafkaDbContextOptionsBuilder StreamsConfig(StreamsConfigBuilder s /// /// The where options are stored. /// The same builder instance so that multiple calls can be chained. - public virtual KafkaDbContextOptionsBuilder TopicConfig(TopicConfigBuilder topicConfig) + public virtual KafkaDbContextOptionsBuilder WithTopicConfig(TopicConfigBuilder topicConfig) { var extension = OptionsBuilder.Options.FindExtension() ?? new KafkaOptionsExtension(); @@ -367,6 +367,27 @@ public virtual KafkaDbContextOptionsBuilder TopicConfig(TopicConfigBuilder topic return this; } + /// + /// Set the optional handler to be used to receive notification when the back-end triggers a data change. Works if is invoked with + /// + /// + /// See Using DbContextOptions, and + /// The EF Core Kafka database provider for more information and examples. + /// + /// The will be used to report change event. + /// The same builder instance so that multiple calls can be chained. + public virtual KafkaDbContextOptionsBuilder WithOnChangeEvent(Action onChangeEvent) + { + var extension = OptionsBuilder.Options.FindExtension() + ?? new KafkaOptionsExtension(); + + extension = extension.WithOnChangeEvent(onChangeEvent); + + ((IDbContextOptionsBuilderInfrastructure)OptionsBuilder).AddOrUpdateExtension(extension); + + return this; + } + #region Hidden System.Object members /// diff --git a/src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs b/src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs index 7af0426d..f49e03a0 100644 --- a/src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs +++ b/src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs @@ -50,6 +50,7 @@ public class EntityTypeProducer _keySerdes; private readonly IKNetSerDes _valueSerdes; + private readonly Action? _onChangeEvent; #region KNetCompactedReplicatorEnumerable class KNetCompactedReplicatorEnumerable : IEnumerable @@ -188,6 +189,7 @@ public EntityTypeProducer(IEntityType entityType, IKafkaCluster cluster) _entityType = entityType; _cluster = cluster; _useCompactedReplicator = _cluster.Options.UseCompactedReplicator; + _onChangeEvent = _cluster.Options.OnChangeEvent; var tTValueContainer = typeof(TValueContainer); TValueContainerConstructor = tTValueContainer.GetConstructors().Single(ci => ci.GetParameters().Length == 2); @@ -211,6 +213,11 @@ public EntityTypeProducer(IEntityType entityType, IKafkaCluster cluster) KeySerDes = _keySerdes, ValueSerDes = _valueSerdes, }; + if (_onChangeEvent != null) + { + _kafkaCompactedReplicator.OnRemoteUpdate += KafkaCompactedReplicator_OnRemoteUpdate; + _kafkaCompactedReplicator.OnRemoteRemove += KafkaCompactedReplicator_OnRemoteRemove; + } #if DEBUG_PERFORMANCE Stopwatch sw = Stopwatch.StartNew(); #endif @@ -258,8 +265,13 @@ public IEnumerable> Commit(IEnumerable reco public void Dispose() { - if (_useCompactedReplicator) + if (_kafkaCompactedReplicator != null) { + if (_onChangeEvent != null) + { + _kafkaCompactedReplicator.OnRemoteUpdate -= KafkaCompactedReplicator_OnRemoteUpdate; + _kafkaCompactedReplicator.OnRemoteRemove -= KafkaCompactedReplicator_OnRemoteRemove; + } _kafkaCompactedReplicator?.Dispose(); } else @@ -278,4 +290,14 @@ public IEnumerable ValueBuffers return new KNetCompactedReplicatorEnumerable(_entityType, _kafkaCompactedReplicator); } } + + private void KafkaCompactedReplicator_OnRemoteUpdate(IKNetCompactedReplicator arg1, KeyValuePair arg2) + { + _onChangeEvent?.Invoke(_entityType, false, arg2.Key); + } + + private void KafkaCompactedReplicator_OnRemoteRemove(IKNetCompactedReplicator arg1, KeyValuePair arg2) + { + _onChangeEvent?.Invoke(_entityType, true, arg2.Key); + } }