Skip to content

Commit

Permalink
Added new extension in EntityExtractor, update data stored in Headers…
Browse files Browse the repository at this point in the history
…, reviewed documentation and changed test programs
  • Loading branch information
masesdevelopers committed Oct 17, 2023
1 parent 62442a5 commit 05b28d9
Show file tree
Hide file tree
Showing 14 changed files with 220 additions and 73 deletions.
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@ This project adheres to the Contributor [Covenant code of conduct](CODE_OF_CONDU
## Summary

* [Getting started](src/documentation/articles/gettingstarted.md)
* [KEFCore usage](src/documentation/articles/usage.md)
* [KEFCore serialization](src/documentation/articles/serialization.md)
* [KEFCore external application](src/documentation/articles/externalapplication.md)
* [Usage](src/documentation/articles/usage.md)
* [Use cases](src/documentation/articles/usecases.md)
* [Serialization](src/documentation/articles/serialization.md)
* [External application](src/documentation/articles/externalapplication.md)
* [Roadmap](src/documentation/articles/roadmap.md)
* [Current state](src/documentation/articles/currentstate.md)

Expand Down
12 changes: 6 additions & 6 deletions src/documentation/articles/externalapplication.md
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
# KEFCore: external application

[Entity Framework Core](https://learn.microsoft.com/it-it/ef/core/) provider for [Apache Kafka](https://kafka.apache.org/) shall convert the entities used within the model in something viable from the backend.
Continuing from the concepts introduced in [serialization](serialization.md), an external application can use the data stored in the topics in a way it decide: [Entity Framework Core](https://learn.microsoft.com/it-it/ef/core/) provider for [Apache Kafka](https://kafka.apache.org/) gives some helpers to get back the CLR Entity objects stored in the topics.
Continuing from the concepts introduced in [serialization](serialization.md), an external application can use the data stored in the topics in a way it decides: [Entity Framework Core](https://learn.microsoft.com/it-it/ef/core/) provider for [Apache Kafka](https://kafka.apache.org/) gives some helpers to get back the CLR Entity objects stored in the topics.

> IMPORTANT NOTE: till the first major version, all releases shall be considered not stable: this means the API public, or internal, can be change without notice.
## Basic concepts

An external application may want to be informed about data changes in the topics and want to analyze the Entity was previously managed from the EFCore application.
Within the core packages there is the `EntityExtractor` class which contains, till now, a method accepting a raw `ConsumerRecord<byte[], byte[]>` from Apache Kafka.
The method reads the info stored in it and returns the Entity object with the filled properties.
Within the core packages there is the `EntityExtractor` class which contains, till now, few methods and one accepts a raw `ConsumerRecord<byte[], byte[]>` from Apache Kafka.
The method reads the info stored in the `ConsumerRecord<byte[], byte[]>` and returns the Entity object with the filled properties.

It is possible to build a new application which subscribe to a topic created from the EFCore application.
The following is a possible snippet of the logic can be applied:
Expand All @@ -33,11 +33,11 @@ A full working example can be found under test folder of the [repository](https:

### Mandatory information

The method `EntityExtractor.FromRecord` use the reflection to get back the types referring to serializer and model whih were stored in the topics.
The method `EntityExtractor.FromTopic`, and then `EntityExtractor.FromRecord`, use the reflection to get back the types referring to serializer and types of the model which were stored in the topics.
To work properly it needs, to be loaded in memory, at least:
- The assembly containing the serializer: if the serializer used was the default, this inforation is available
- The assembly containing the serializer: if the serializer are the default this information is intrisecally available
- The model types (i.e. the types used to build the `DbContext` or `KafkaDbContext`)

## Possible usages

TDB
For possible usages of [Entity Framework Core](https://learn.microsoft.com/it-it/ef/core/) provider for [Apache Kafka](https://kafka.apache.org/), and this feature, see [use cases](usecases.md)
63 changes: 62 additions & 1 deletion src/documentation/articles/serialization.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,16 @@ A custom **Key SerDes** class shall follow the following rules:
- must implements the `IKNetSerDes<T>` interface or extend `KNetSerDes<T>`
- must be a generic type
- must have a parameterless constructor
- can store serialization information using Headers of Apache Kafka record (this information will be used from `EntityExtractor`)

An example snippet is the follow based on JSON serializer:

```C#
public class CustomSerDes<T> : KNetSerDes<T>
public class CustomKeySerDes<T> : KNetSerDes<T>
{
readonly byte[] keyTypeName = Encoding.UTF8.GetBytes(typeof(T).FullName!);
readonly byte[] customSerDesName = Encoding.UTF8.GetBytes(typeof(CustomKeySerDes<>).FullName!);

/// <inheritdoc cref="KNetSerDes{T}.Serialize(string, T)"/>
public override byte[] Serialize(string topic, T data)
{
Expand All @@ -150,6 +154,63 @@ public class CustomSerDes<T> : KNetSerDes<T>
/// <inheritdoc cref="KNetSerDes{T}.SerializeWithHeaders(string, Headers, T)"/>
public override byte[] SerializeWithHeaders(string topic, Headers headers, T data)
{
headers?.Add(KEFCoreSerDesNames.KeyTypeIdentifier, keyTypeName);
headers?.Add(KEFCoreSerDesNames.KeySerializerIdentifier, customSerDesName);

var jsonStr = System.Text.Json.JsonSerializer.Serialize<T>(data);
return Encoding.UTF8.GetBytes(jsonStr);
}
/// <inheritdoc cref="KNetSerDes{T}.Deserialize(string, byte[])"/>
public override T Deserialize(string topic, byte[] data)
{
return DeserializeWithHeaders(topic, null, data);
}
/// <inheritdoc cref="KNetSerDes{T}.DeserializeWithHeaders(string, Headers, byte[])"/>
public override T DeserializeWithHeaders(string topic, Headers headers, byte[] data)
{
if (data == null) return default;
return System.Text.Json.JsonSerializer.Deserialize<T>(data)!;
}
}
```

```C#
public class CustomValueContainerSerDes<T> : KNetSerDes<T>
{
readonly byte[] valueContainerSerDesName = Encoding.UTF8.GetBytes(typeof(CustomValueContainerSerDes<>).FullName!);
readonly byte[] valueContainerName = null!;
/// <summary>
/// Default initializer
/// </summary>
public CustomValueContainerSerDes()
{
var tt = typeof(T);
if (tt.IsGenericType)
{
var keyT = tt.GetGenericArguments();
if (keyT.Length != 1) { throw new ArgumentException($"{typeof(T).Name} does not contains a single generic argument and cannot be used because it is not a valid ValueContainer type"); }
var t = tt.GetGenericTypeDefinition();
if (t.GetInterface(typeof(IValueContainer<>).Name) != null)
{
valueContainerName = Encoding.UTF8.GetBytes(t.FullName!);
return;
}
else throw new ArgumentException($"{typeof(T).Name} does not implement IValueContainer<> and cannot be used because it is not a valid ValueContainer type");
}
throw new ArgumentException($"{typeof(T).Name} is not a generic type and cannot be used as a valid ValueContainer type");
}

/// <inheritdoc cref="KNetSerDes{T}.Serialize(string, T)"/>
public override byte[] Serialize(string topic, T data)
{
return SerializeWithHeaders(topic, null, data);
}
/// <inheritdoc cref="KNetSerDes{T}.SerializeWithHeaders(string, Headers, T)"/>
public override byte[] SerializeWithHeaders(string topic, Headers headers, T data)
{
headers?.Add(KEFCoreSerDesNames.ValueContainerSerializerIdentifier, valueContainerSerDesName);
headers?.Add(KEFCoreSerDesNames.ValueContainerIdentifier, valueContainerName);

var jsonStr = System.Text.Json.JsonSerializer.Serialize<T>(data);
return Encoding.UTF8.GetBytes(jsonStr);
}
Expand Down
2 changes: 2 additions & 0 deletions src/documentation/articles/toc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
href: gettingstarted.md
- name: Usage
href: usage.md
- name: Use cases
href: usecases.md
- name: Serialization
href: serialization.md
- name: External application
Expand Down
4 changes: 4 additions & 0 deletions src/documentation/articles/usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,7 @@ public class Post
public Blog Blog { get; set; }
}
```

## Possible usages

For possible usages of [Entity Framework Core](https://learn.microsoft.com/it-it/ef/core/) provider for [Apache Kafka](https://kafka.apache.org/), and this feature, see [use cases](usecases.md)
6 changes: 6 additions & 0 deletions src/documentation/articles/usecases.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# KEFCore: use cases

[Entity Framework Core](https://learn.microsoft.com/it-it/ef/core/) provider for [Apache Kafka](https://kafka.apache.org/) can be used in some operative conditions.
Here a possible, non exausthive list, of use cases.

TBD
7 changes: 4 additions & 3 deletions src/documentation/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ This project adheres to the Contributor [Covenant code of conduct](CODE_OF_CONDU
## Summary

* [Getting started](articles/gettingstarted.md)
* [KEFCore usage](articles/usage.md)
* [KEFCore serialization](articles/serialization.md)
* [KEFCore external application](articles/externalapplication.md)
* [Usage](articles/usage.md)
* [Use cases](articles/usecases.md)
* [Serialization](articles/serialization.md)
* [External application](articles/externalapplication.md)
* [Roadmap](articles/roadmap.md)
* [Current state](articles/currentstate.md)

Expand Down
22 changes: 10 additions & 12 deletions src/net/KEFCore.SerDes.Avro/AvroKEFCoreSerDes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ public static class Key
/// <typeparam name="T"></typeparam>
public class Binary<T> : KNetSerDes<T>
{
readonly byte[] keySerDesName = Encoding.ASCII.GetBytes(typeof(Binary<>).FullName!);
readonly byte[] keyTypeName = Encoding.UTF8.GetBytes(typeof(T).FullName!);
readonly byte[] keySerDesName = Encoding.UTF8.GetBytes(typeof(Binary<>).ToAssemblyQualified());
readonly SpecificDefaultWriter SpecificWriter = new(AvroKeyContainer._SCHEMA);
readonly SpecificDefaultReader SpecificReader = new(AvroKeyContainer._SCHEMA, AvroKeyContainer._SCHEMA);
readonly IKNetSerDes<T> _defaultSerDes = default!;
Expand Down Expand Up @@ -72,6 +73,7 @@ public override byte[] Serialize(string topic, T data)
/// <inheritdoc cref="KNetSerDes{T}.SerializeWithHeaders(string, Headers, T)"/>
public override byte[] SerializeWithHeaders(string topic, Headers headers, T data)
{
headers?.Add(KEFCoreSerDesNames.KeyTypeIdentifier, keyTypeName);
headers?.Add(KEFCoreSerDesNames.KeySerializerIdentifier, keySerDesName);

if (_defaultSerDes != null) return _defaultSerDes.SerializeWithHeaders(topic, headers, data);
Expand Down Expand Up @@ -107,7 +109,8 @@ public override T DeserializeWithHeaders(string topic, Headers headers, byte[] d
/// <typeparam name="T"></typeparam>
public class Json<T> : KNetSerDes<T>
{
readonly byte[] keySerDesName = Encoding.ASCII.GetBytes(typeof(Json<>).FullName!);
readonly byte[] keyTypeName = Encoding.UTF8.GetBytes(typeof(T).FullName!);
readonly byte[] keySerDesName = Encoding.UTF8.GetBytes(typeof(Json<>).ToAssemblyQualified());
readonly SpecificDefaultWriter SpecificWriter = new(AvroKeyContainer._SCHEMA);
readonly SpecificDefaultReader SpecificReader = new(AvroKeyContainer._SCHEMA, AvroKeyContainer._SCHEMA);
readonly IKNetSerDes<T> _defaultSerDes = default!;
Expand Down Expand Up @@ -136,6 +139,7 @@ public override byte[] Serialize(string topic, T data)
/// <inheritdoc cref="KNetSerDes{T}.SerializeWithHeaders(string, Headers, T)"/>
public override byte[] SerializeWithHeaders(string topic, Headers headers, T data)
{
headers?.Add(KEFCoreSerDesNames.KeyTypeIdentifier, keyTypeName);
headers?.Add(KEFCoreSerDesNames.KeySerializerIdentifier, keySerDesName);

if (_defaultSerDes != null) return _defaultSerDes.SerializeWithHeaders(topic, headers, data);
Expand Down Expand Up @@ -176,8 +180,7 @@ public static class ValueContainer
/// <typeparam name="T"></typeparam>
public class Binary<T> : KNetSerDes<T>
{
readonly byte[] valueContainerSerDesName = Encoding.ASCII.GetBytes(typeof(Binary<>).FullName!);
readonly byte[] keyTypeName = null!;
readonly byte[] valueContainerSerDesName = Encoding.UTF8.GetBytes(typeof(Binary<>).ToAssemblyQualified());
readonly byte[] valueContainerName = null!;
readonly SpecificDefaultWriter SpecificWriter = new(AvroValueContainer._SCHEMA);
readonly SpecificDefaultReader SpecificReader = new(AvroValueContainer._SCHEMA, AvroValueContainer._SCHEMA);
Expand All @@ -196,8 +199,7 @@ public Binary()
var t = tt.GetGenericTypeDefinition();
if (t.GetInterface(typeof(IValueContainer<>).Name) != null)
{
keyTypeName = Encoding.UTF8.GetBytes(keyT[0].FullName!);
valueContainerName = Encoding.UTF8.GetBytes(t.FullName!);
valueContainerName = Encoding.UTF8.GetBytes(t.ToAssemblyQualified());
return;
}
else throw new ArgumentException($"{typeof(T).Name} does not implement IValueContainer<> and cannot be used because it is not a valid ValueContainer type");
Expand All @@ -214,7 +216,6 @@ public override byte[] Serialize(string topic, T data)
public override byte[] SerializeWithHeaders(string topic, Headers headers, T data)
{
headers?.Add(KEFCoreSerDesNames.ValueContainerSerializerIdentifier, valueContainerSerDesName);
headers?.Add(KEFCoreSerDesNames.KeyTypeIdentifier, keyTypeName);
headers?.Add(KEFCoreSerDesNames.ValueContainerIdentifier, valueContainerName);

using MemoryStream memStream = new();
Expand Down Expand Up @@ -244,8 +245,7 @@ public override T DeserializeWithHeaders(string topic, Headers headers, byte[] d
/// <typeparam name="T"></typeparam>
public class Json<T> : KNetSerDes<T>
{
readonly byte[] valueContainerSerDesName = Encoding.UTF8.GetBytes(typeof(Json<>).FullName!);
readonly byte[] keyTypeName = null!;
readonly byte[] valueContainerSerDesName = Encoding.UTF8.GetBytes(typeof(Json<>).ToAssemblyQualified());
readonly byte[] valueContainerName = null!;
readonly SpecificDefaultWriter SpecificWriter = new(AvroValueContainer._SCHEMA);
readonly SpecificDefaultReader SpecificReader = new(AvroValueContainer._SCHEMA, AvroValueContainer._SCHEMA);
Expand All @@ -264,8 +264,7 @@ public Json()
var t = tt.GetGenericTypeDefinition();
if (t.GetInterface(typeof(IValueContainer<>).Name) != null)
{
keyTypeName = Encoding.UTF8.GetBytes(keyT[0].FullName!);
valueContainerName = Encoding.UTF8.GetBytes(t.FullName!);
valueContainerName = Encoding.UTF8.GetBytes(t.ToAssemblyQualified());
return;
}
else throw new ArgumentException($"{typeof(T).Name} does not implement IValueContainer<> and cannot be used because it is not a valid ValueContainer type");
Expand All @@ -282,7 +281,6 @@ public override byte[] Serialize(string topic, T data)
public override byte[] SerializeWithHeaders(string topic, Headers headers, T data)
{
headers?.Add(KEFCoreSerDesNames.ValueContainerSerializerIdentifier, valueContainerSerDesName);
headers?.Add(KEFCoreSerDesNames.KeyTypeIdentifier, keyTypeName);
headers?.Add(KEFCoreSerDesNames.ValueContainerIdentifier, valueContainerName);

using MemoryStream memStream = new();
Expand Down
11 changes: 5 additions & 6 deletions src/net/KEFCore.SerDes/DefaultKEFCoreSerDes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ public static class Key
/// <typeparam name="T">The type to be serialized or deserialized. It can be a Primary Key or a ValueContainer like <see cref="DefaultValueContainer{TKey}"/></typeparam>
public class Json<T> : KNetSerDes<T>
{
readonly byte[] keySerDesName = Encoding.ASCII.GetBytes(typeof(Json<>).FullName!);
readonly byte[] keySerDesName = Encoding.UTF8.GetBytes(typeof(Json<>).ToAssemblyQualified());
readonly byte[] keyTypeName = Encoding.UTF8.GetBytes(typeof(T).FullName!);
readonly IKNetSerDes<T> _defaultSerDes = default!;
/// <inheritdoc/>
public override bool UseHeaders => true;
Expand All @@ -67,6 +68,7 @@ public override byte[] Serialize(string topic, T data)
/// <inheritdoc cref="KNetSerDes{T}.SerializeWithHeaders(string, Headers, T)"/>
public override byte[] SerializeWithHeaders(string topic, Headers headers, T data)
{
headers?.Add(KEFCoreSerDesNames.KeyTypeIdentifier, keyTypeName);
headers?.Add(KEFCoreSerDesNames.KeySerializerIdentifier, keySerDesName);

if (_defaultSerDes != null) return _defaultSerDes.SerializeWithHeaders(topic, headers, data);
Expand Down Expand Up @@ -101,8 +103,7 @@ public static class ValueContainer
/// <typeparam name="T">The type to be serialized or deserialized. It can be a Primary Key or a ValueContainer like <see cref="DefaultValueContainer{TKey}"/></typeparam>
public class Json<T> : KNetSerDes<T>
{
readonly byte[] valueContainerSerDesName = Encoding.ASCII.GetBytes(typeof(Json<>).FullName!);
readonly byte[] keyTypeName = null!;
readonly byte[] valueContainerSerDesName = Encoding.UTF8.GetBytes(typeof(Json<>).ToAssemblyQualified());
readonly byte[] valueContainerName = null!;
/// <inheritdoc/>
public override bool UseHeaders => true;
Expand All @@ -119,8 +120,7 @@ public Json()
var t = tt.GetGenericTypeDefinition();
if (t.GetInterface(typeof(IValueContainer<>).Name) != null)
{
keyTypeName = Encoding.UTF8.GetBytes(keyT[0].FullName!);
valueContainerName = Encoding.UTF8.GetBytes(t.FullName!);
valueContainerName = Encoding.UTF8.GetBytes(t.ToAssemblyQualified());
return;
}
else throw new ArgumentException($"{typeof(T).Name} does not implement IValueContainer<> and cannot be used because it is not a valid ValueContainer type");
Expand All @@ -137,7 +137,6 @@ public override byte[] Serialize(string topic, T data)
public override byte[] SerializeWithHeaders(string topic, Headers headers, T data)
{
headers?.Add(KEFCoreSerDesNames.ValueContainerSerializerIdentifier, valueContainerSerDesName);
headers?.Add(KEFCoreSerDesNames.KeyTypeIdentifier, keyTypeName);
headers?.Add(KEFCoreSerDesNames.ValueContainerIdentifier, valueContainerName);

var jsonStr = System.Text.Json.JsonSerializer.Serialize<T>(data);
Expand Down
Loading

0 comments on commit 05b28d9

Please sign in to comment.