Skip to content

Commit

Permalink
Update event handler and documentation (#131)
Browse files Browse the repository at this point in the history
* Added protection and update templates

* Reviewed the way the event is reported

* Added some images in documentation
  • Loading branch information
masesdevelopers authored Oct 21, 2023
1 parent 4f562d7 commit 7478325
Show file tree
Hide file tree
Showing 17 changed files with 172 additions and 26 deletions.
28 changes: 19 additions & 9 deletions src/documentation/articles/howitworks.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
# KEFCore: how it works

[Entity Framework Core](https://learn.microsoft.com/it-it/ef/core/) provider for [Apache Kafka](https://kafka.apache.org/) can be used in some operative conditions.
[Entity Framework Core](https://learn.microsoft.com/it-it/ef/core/) provider for [Apache Kafka](https://kafka.apache.org/) can be used in some [operative conditions](usecases.md).

However it is important to start with a simple description on how it works.
It is important to start with a simple description on how it works.
In the following chapters sometime it is used the term back-end and sometime Apache Kafka cluster: they shall be considered the same thing int the [Entity Framework Core](https://learn.microsoft.com/it-it/ef/core/) provider for [Apache Kafka](https://kafka.apache.org/) context.

## Basic concepts

Expand Down Expand Up @@ -42,7 +43,8 @@ All CRUD operations are helped, behind the scene, from [`KNetCompactedReplicator

### First-level cache

[`KNetCompactedReplicator`](https://github.com/masesgroup/KNet/blob/master/src/net/KNet/Specific/Replicator/KNetCompactedReplicator.cs) or [Apache Kafka Streams](https://kafka.apache.org/documentation/streams/) act as first-level cache of [Entity Framework Core](https://learn.microsoft.com/it-it/ef/core/): **data coming from the Apache Kafka cluster updates their content while the system is running without a specific request**.
[`KNetCompactedReplicator`](https://github.com/masesgroup/KNet/blob/master/src/net/KNet/Specific/Replicator/KNetCompactedReplicator.cs) or [Apache Kafka Streams](https://kafka.apache.org/documentation/streams/) act as first-level cache of [Entity Framework Core](https://learn.microsoft.com/it-it/ef/core/): **data coming from the Apache Kafka cluster updates their content while the system is running**.
The behavior is intrinsic and does not need any extra call to the back-end.

### Data storage

Expand All @@ -52,16 +54,17 @@ The conversion is done using serializers that converts the Entities (data in the
## [Entity Framework Core](https://learn.microsoft.com/it-it/ef/core/) provider for [Apache Kafka](https://kafka.apache.org/) compared to other providers

In the previous chapter was described how [Entity Framework Core](https://learn.microsoft.com/it-it/ef/core/) provider for [Apache Kafka](https://kafka.apache.org/) permits to reproduce the CRUD operations.
Starting from the model defined in the code, the data will be stored in the topics and each topic can be seen as a table of a database filled in with the same data.
Starting from the model defined in the code, the data are stored in the topics and each topic can be seen as a table of a database filled in with the same data.
From the point of view of an application, the use of [Entity Framework Core](https://learn.microsoft.com/it-it/ef/core/) provider for [Apache Kafka](https://kafka.apache.org/) is similar to the use of the InMemory provider.

### A note on [migrations](https://learn.microsoft.com/en-us/ef/core/managing-schemas/migrations)

The current version of [Entity Framework Core](https://learn.microsoft.com/it-it/ef/core/) provider for [Apache Kafka](https://kafka.apache.org/) does not support [migrations](https://learn.microsoft.com/en-us/ef/core/managing-schemas/migrations).
The current version of [Entity Framework Core](https://learn.microsoft.com/it-it/ef/core/) provider for [Apache Kafka](https://kafka.apache.org/) does not support [migrations](https://learn.microsoft.com/en-us/ef/core/managing-schemas/migrations) explicitly.

## [Entity Framework Core](https://learn.microsoft.com/it-it/ef/core/) provider for [Apache Kafka](https://kafka.apache.org/) features not available in other providers

Here a list of features [Entity Framework Core](https://learn.microsoft.com/it-it/ef/core/) provider for [Apache Kafka](https://kafka.apache.org/) gives to its user and useful in some use cases.
The features below are strictly correlated with the consumers receiving back the record from Apache Kafka cluster described above.

### Distributed cache

Expand All @@ -74,18 +77,25 @@ This implies that, virtually, there is a distributed cache between the applicati

If an application restarts it will be able to retrieve latest data (latest cache) and aligns to the shared state.

![Alt text](../images/cache.gif "Distributed cache")

### Events

Generally, an application based on [Entity Framework Core](https://learn.microsoft.com/it-it/ef/core/), executes queries to the back-end to store, or retrieve, information on demand.
The alignment (record consumed) can be considered a change event: so any change in the backend produces an event used in different mode.
These change events are used from [`KNetCompactedReplicator`](https://github.com/masesgroup/KNet/blob/master/src/net/KNet/Specific/Replicator/KNetCompactedReplicator.cs) and/or [Apache Kafka Streams](https://kafka.apache.org/documentation/streams/) to align the local state.
Moreover [Entity Framework Core](https://learn.microsoft.com/it-it/ef/core/) provider for [Apache Kafka](https://kafka.apache.org/) can inform, using callbacks and at zero cost, the registered application about these events.
Then the application can use the reported events to execute some actions:
The alignment (record consumed) can be considered a change event: so any change in the backend produces an event used in different mode:
- Mainly these change events are used from [`KNetCompactedReplicator`](https://github.com/masesgroup/KNet/blob/master/src/net/KNet/Specific/Replicator/KNetCompactedReplicator.cs) and/or [Apache Kafka Streams](https://kafka.apache.org/documentation/streams/) to align the local state;
- Moreover [Entity Framework Core](https://learn.microsoft.com/it-it/ef/core/) provider for [Apache Kafka](https://kafka.apache.org/) can inform, using callbacks and at zero cost, the registered application about these events.

Then the application can use the reported events in many modes:
- execute a query
- write something to disk
- execute a REST call
- and so on

![Alt text](../images/events.gif "Distributed cache")

> **IMPORTANT NOTE**: the events are raised from external threads and this can lead to [concurrent exceptions](https://learn.microsoft.com/en-us/ef/core/dbcontext-configuration/#avoiding-dbcontext-threading-issues) if the `KafkaDbContext` is used to retrieve information.
### Applications not based on [Entity Framework Core](https://learn.microsoft.com/it-it/ef/core/)

Till now was spoken about applications based on [Entity Framework Core](https://learn.microsoft.com/it-it/ef/core/), however this provider can be used to feed applications not based on [Entity Framework Core](https://learn.microsoft.com/it-it/ef/core/).
Expand Down
2 changes: 2 additions & 0 deletions src/documentation/articles/usecases.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ Sharing it between multiple applications and allocating the `CachingContext` in
Continuing from the previous use case, using the events reported from [Entity Framework Core](https://learn.microsoft.com/it-it/ef/core/) provider for [Apache Kafka](https://kafka.apache.org/) it is possible to write a reactive application.
When a change event is triggered the application can react to it and take an action.

![Alt text](../images/triggeredcache.gif "Triggered distributed cache")

### SignalR

The triggered distributed cache can be used side-by-side with [SignalR](https://learn.microsoft.com/it-it/aspnet/signalr/overview/getting-started/introduction-to-signalr): combining [Entity Framework Core](https://learn.microsoft.com/it-it/ef/core/) provider for [Apache Kafka](https://kafka.apache.org/) and [SignalR](https://learn.microsoft.com/it-it/aspnet/signalr/overview/getting-started/introduction-to-signalr) in an application, subscribing to the change events, it is possible to feed the connected applications to [SignalR](https://learn.microsoft.com/it-it/aspnet/signalr/overview/getting-started/introduction-to-signalr).
Expand Down
Binary file added src/documentation/images/cache.gif
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added src/documentation/images/events.gif
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added src/documentation/images/triggeredcache.gif
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#nullable enable

using MASES.EntityFrameworkCore.KNet.Storage;
using MASES.KNet.Common;
using MASES.KNet.Consumer;
using MASES.KNet.Producer;
Expand Down Expand Up @@ -67,5 +68,5 @@ public interface IKafkaSingletonOptions : ISingletonOptions
/// <inheritdoc cref="KafkaDbContext.TopicConfig"/>
TopicConfigBuilder? TopicConfig { get; }
/// <inheritdoc cref="KafkaDbContext.OnChangeEvent"/>
Action<IEntityType, bool, object>? OnChangeEvent { get; }
Action<EntityTypeChanged>? OnChangeEvent { get; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
using Java.Util;
using MASES.EntityFrameworkCore.KNet.Serialization.Json;
using MASES.EntityFrameworkCore.KNet.Serialization.Json.Storage;
using MASES.EntityFrameworkCore.KNet.Storage;
using MASES.KNet.Common;
using MASES.KNet.Consumer;
using MASES.KNet.Producer;
Expand Down Expand Up @@ -54,7 +55,7 @@ public class KafkaOptionsExtension : IDbContextOptionsExtension
private ProducerConfigBuilder? _producerConfigBuilder;
private StreamsConfigBuilder? _streamsConfigBuilder;
private TopicConfigBuilder? _topicConfigBuilder;
private Action<IEntityType, bool, object>? _onChangeEvent = null;
private Action<EntityTypeChanged>? _onChangeEvent = null;
private DbContextOptionsExtensionInfo? _info;

static Java.Lang.ClassLoader _loader = Java.Lang.ClassLoader.SystemClassLoader;
Expand Down Expand Up @@ -132,7 +133,7 @@ protected KafkaOptionsExtension(KafkaOptionsExtension copyFrom)
/// <inheritdoc cref="KafkaDbContext.TopicConfig"/>
public virtual TopicConfigBuilder TopicConfig => _topicConfigBuilder!;
/// <inheritdoc cref="KafkaDbContext.OnChangeEvent"/>
public virtual Action<IEntityType, bool, object> OnChangeEvent => _onChangeEvent!;
public virtual Action<EntityTypeChanged> OnChangeEvent => _onChangeEvent!;
/// <inheritdoc cref="KafkaDbContext.KeySerializationType"/>
public virtual KafkaOptionsExtension WithKeySerializationType(Type serializationType)
{
Expand Down Expand Up @@ -293,7 +294,7 @@ public virtual KafkaOptionsExtension WithTopicConfig(TopicConfigBuilder topicCon
return clone;
}
/// <inheritdoc cref="KafkaDbContext.OnChangeEvent"/>
public virtual KafkaOptionsExtension WithOnChangeEvent(Action<IEntityType, bool, object> onChangeEvent)
public virtual KafkaOptionsExtension WithOnChangeEvent(Action<EntityTypeChanged> onChangeEvent)
{
var clone = Clone();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
* Refer to LICENSE for more information.
*/

using MASES.EntityFrameworkCore.KNet.Storage;
using MASES.KNet.Common;
using MASES.KNet.Consumer;
using MASES.KNet.Producer;
Expand Down Expand Up @@ -106,5 +107,5 @@ public virtual void Validate(IDbContextOptions options)
/// <inheritdoc/>
public virtual TopicConfigBuilder? TopicConfig { get; private set; }
/// <inheritdoc/>
public virtual Action<IEntityType, bool, object>? OnChangeEvent { get; private set; }
public virtual Action<EntityTypeChanged>? OnChangeEvent { get; private set; }
}
3 changes: 2 additions & 1 deletion src/net/KEFCore/Infrastructure/KafkaDbContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
using MASES.EntityFrameworkCore.KNet.Serialization;
using MASES.EntityFrameworkCore.KNet.Serialization.Json;
using MASES.EntityFrameworkCore.KNet.Serialization.Json.Storage;
using MASES.EntityFrameworkCore.KNet.Storage;
using MASES.KNet.Common;
using MASES.KNet.Consumer;
using MASES.KNet.Producer;
Expand Down Expand Up @@ -227,7 +228,7 @@ public KafkaDbContext(DbContextOptions options) : base(options)
/// The optional handler to be used to receive notification when the back-end triggers a data change.
/// </summary>
/// <remarks>Works if <see cref="UseCompactedReplicator"/> is <see langword="true"/></remarks>
public virtual Action<IEntityType, bool, object>? OnChangeEvent { get; set; } = null;
public virtual Action<EntityTypeChanged>? OnChangeEvent { get; set; } = null;

/// <inheritdoc cref="DbContext.OnConfiguring(DbContextOptionsBuilder)"/>
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/

using MASES.EntityFrameworkCore.KNet.Infrastructure.Internal;
using MASES.EntityFrameworkCore.KNet.Storage;
using MASES.KNet.Common;
using MASES.KNet.Consumer;
using MASES.KNet.Producer;
Expand Down Expand Up @@ -374,7 +375,7 @@ public virtual KafkaDbContextOptionsBuilder WithTopicConfig(TopicConfigBuilder t
/// </remarks>
/// <param name="onChangeEvent">The <see cref="Action{IEntityType, Boolean, Object}"/> will be used to report change event.</param>
/// <returns>The same builder instance so that multiple calls can be chained.</returns>
public virtual KafkaDbContextOptionsBuilder WithOnChangeEvent(Action<IEntityType, bool, object> onChangeEvent)
public virtual KafkaDbContextOptionsBuilder WithOnChangeEvent(Action<EntityTypeChanged> onChangeEvent)
{
var extension = OptionsBuilder.Options.FindExtension<KafkaOptionsExtension>()
?? new KafkaOptionsExtension();
Expand Down
85 changes: 85 additions & 0 deletions src/net/KEFCore/Storage/EntityTypeChanged.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright 2023 MASES s.r.l.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Refer to LICENSE for more information.
*/

using MASES.EntityFrameworkCore.KNet.Infrastructure;

namespace MASES.EntityFrameworkCore.KNet.Storage;
/// <summary>
/// The event data informing about changes on an <see cref="IEntityType"/> using the <see cref="KafkaDbContext.OnChangeEvent"/> event handler
/// </summary>
public readonly struct EntityTypeChanged
{
/// <summary>
/// The change occurred
/// </summary>
[Flags]
public enum ChangeKindType
{
/// <summary>
/// The <see cref="Key"/> was added
/// </summary>
Added = 1,
/// <summary>
/// The <see cref="Key"/> was updated
/// </summary>
Updated = 2,
/// <summary>
/// The <see cref="Key"/> was removed
/// </summary>
Removed = 4,
/// <summary>
/// The <see cref="Key"/> was added or updated
/// </summary>
Upserted = Added | Updated,
}

internal EntityTypeChanged(IEntityType entityType, ChangeKindType changeKind, object key)
{
EntityType = entityType;
ChangeKind = changeKind;
Key = key;
}
/// <summary>
/// The <see cref="IEntityType"/> with changes
/// </summary>
public IEntityType EntityType { get; }
/// <summary>
/// The <see cref="ChangeKindType.Removed"/> if <see cref="Key"/> was deleted, otherwise <see cref="Key"/> was added or updated
/// </summary>
public ChangeKindType ChangeKind { get; }
/// <summary>
/// The key removed if <see cref="ChangeKind"/> is <see cref="ChangeKindType.Removed"/>, otherwise it was added or updated
/// </summary>
public object? Key { get; }
/// <summary>
/// Helper to understand if the <see cref="Key"/> was added
/// </summary>
public bool KeyAdded => ChangeKind.HasFlag(ChangeKindType.Added);
/// <summary>
/// Helper to understand if the <see cref="Key"/> was updated
/// </summary>
public bool KeyUpdated => ChangeKind.HasFlag(ChangeKindType.Updated);
/// <summary>
/// Helper to understand if the <see cref="Key"/> was removed
/// </summary>
public bool KeyRemoved => ChangeKind.HasFlag(ChangeKindType.Removed);
/// <summary>
/// Helper to understand if the <see cref="Key"/> was added or updated
/// </summary>
public bool KeyUpserted => ChangeKind.HasFlag(ChangeKindType.Added) | ChangeKind.HasFlag(ChangeKindType.Updated);
}
14 changes: 11 additions & 3 deletions src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class EntityTypeProducer<TKey, TValueContainer, TKeySerializer, TValueSer
private readonly IKafkaStreamsBaseRetriever? _streamData;
private readonly IKNetSerDes<TKey>? _keySerdes;
private readonly IKNetSerDes<TValueContainer>? _valueSerdes;
private readonly Action<IEntityType, bool, object>? _onChangeEvent;
private readonly Action<EntityTypeChanged>? _onChangeEvent;

#region KNetCompactedReplicatorEnumerable
class KNetCompactedReplicatorEnumerable : IEnumerable<ValueBuffer>
Expand Down Expand Up @@ -298,11 +298,19 @@ public IEnumerable<ValueBuffer> ValueBuffers

private void KafkaCompactedReplicator_OnRemoteUpdate(IKNetCompactedReplicator<TKey, TValueContainer> arg1, KeyValuePair<TKey, TValueContainer> arg2)
{
_onChangeEvent?.Invoke(_entityType, false, arg2.Key);
try
{
_onChangeEvent?.Invoke(new EntityTypeChanged(_entityType, EntityTypeChanged.ChangeKindType.Upserted, arg2.Key));
}
catch { }
}

private void KafkaCompactedReplicator_OnRemoteRemove(IKNetCompactedReplicator<TKey, TValueContainer> arg1, KeyValuePair<TKey, TValueContainer> arg2)
{
_onChangeEvent?.Invoke(_entityType, true, arg2.Key);
try
{
_onChangeEvent?.Invoke(new EntityTypeChanged(_entityType, EntityTypeChanged.ChangeKindType.Removed, arg2.Key));
}
catch { }
}
}
16 changes: 12 additions & 4 deletions src/net/templates/templates/kefcoreAppWithEvents/Program.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
using MASES.EntityFrameworkCore.KNet.Infrastructure;
using MASES.EntityFrameworkCore.KNet.Storage;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Metadata;
using System;
using System.Collections.Generic;
using System.Linq;
Expand All @@ -9,14 +9,22 @@ namespace MASES.EntityFrameworkCore.KNet.Templates
{
partial class Program
{
static void OnEvent(IEntityType entity, bool state, object key)
static BloggingContext context = null;
static void OnEvent(EntityTypeChanged change)
{
Console.WriteLine($"Entity {entity.Name} has {(state ? "removed" : "added/updated")} the key {key}");
object value = null;
try
{
value = context?.Find(change.EntityType.ClrType, change.Key);
}
catch (ObjectDisposedException) { } // the context can be disposed if the program exited
catch (InvalidOperationException) { } // there are multiple concurrent operations on context https://learn.microsoft.com/en-us/ef/core/dbcontext-configuration/#avoiding-dbcontext-threading-issues

Console.WriteLine($"Entity {change.EntityType.Name} has {(change.KeyRemoved ? "removed" : "added/updated")} the key {change.Key} with value {value}");
}

static void Main(string[] args)
{
BloggingContext context = null;
try
{
context = new BloggingContext()
Expand Down
1 change: 1 addition & 0 deletions test/Common/ProgramConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,6 @@ public class ProgramConfig
public int NumberOfElements { get; set; } = 1000;
public int NumberOfExecutions { get; set; } = 1;
public int NumberOfExtraElements { get; set; } = 100;
public bool WithEvents { get; set; } = false;
}
}
3 changes: 3 additions & 0 deletions test/KEFCore.Test/KEFCore.Test.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@
<None Update="KafkaStreamsTest.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
<None Update="Test.KNetReplicatorWithEvents.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
<None Update="TestAvro.KNetReplicatorModelBuilder.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
Expand Down
Loading

0 comments on commit 7478325

Please sign in to comment.