Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added new extension in EntityExtractor, update data stored in Headers, reviewed documentation and changed test programs #107

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@ This project adheres to the Contributor [Covenant code of conduct](CODE_OF_CONDU
## Summary

* [Getting started](src/documentation/articles/gettingstarted.md)
* [KEFCore usage](src/documentation/articles/usage.md)
* [KEFCore serialization](src/documentation/articles/serialization.md)
* [KEFCore external application](src/documentation/articles/externalapplication.md)
* [Usage](src/documentation/articles/usage.md)
* [Use cases](src/documentation/articles/usecases.md)
* [Serialization](src/documentation/articles/serialization.md)
* [External application](src/documentation/articles/externalapplication.md)
* [Roadmap](src/documentation/articles/roadmap.md)
* [Current state](src/documentation/articles/currentstate.md)

Expand Down
12 changes: 6 additions & 6 deletions src/documentation/articles/externalapplication.md
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
# KEFCore: external application

[Entity Framework Core](https://learn.microsoft.com/it-it/ef/core/) provider for [Apache Kafka](https://kafka.apache.org/) shall convert the entities used within the model in something viable from the backend.
Continuing from the concepts introduced in [serialization](serialization.md), an external application can use the data stored in the topics in a way it decide: [Entity Framework Core](https://learn.microsoft.com/it-it/ef/core/) provider for [Apache Kafka](https://kafka.apache.org/) gives some helpers to get back the CLR Entity objects stored in the topics.
Continuing from the concepts introduced in [serialization](serialization.md), an external application can use the data stored in the topics in a way it decides: [Entity Framework Core](https://learn.microsoft.com/it-it/ef/core/) provider for [Apache Kafka](https://kafka.apache.org/) gives some helpers to get back the CLR Entity objects stored in the topics.

> IMPORTANT NOTE: till the first major version, all releases shall be considered not stable: this means the API public, or internal, can be change without notice.

## Basic concepts

An external application may want to be informed about data changes in the topics and want to analyze the Entity was previously managed from the EFCore application.
Within the core packages there is the `EntityExtractor` class which contains, till now, a method accepting a raw `ConsumerRecord<byte[], byte[]>` from Apache Kafka.
The method reads the info stored in it and returns the Entity object with the filled properties.
Within the core packages there is the `EntityExtractor` class which contains, till now, few methods and one accepts a raw `ConsumerRecord<byte[], byte[]>` from Apache Kafka.
The method reads the info stored in the `ConsumerRecord<byte[], byte[]>` and returns the Entity object with the filled properties.

It is possible to build a new application which subscribe to a topic created from the EFCore application.
The following is a possible snippet of the logic can be applied:
Expand All @@ -33,11 +33,11 @@ A full working example can be found under test folder of the [repository](https:

### Mandatory information

The method `EntityExtractor.FromRecord` use the reflection to get back the types referring to serializer and model whih were stored in the topics.
The method `EntityExtractor.FromTopic`, and then `EntityExtractor.FromRecord`, use the reflection to get back the types referring to serializer and types of the model which were stored in the topics.
To work properly it needs, to be loaded in memory, at least:
- The assembly containing the serializer: if the serializer used was the default, this inforation is available
- The assembly containing the serializer: if the serializer are the default this information is intrisecally available
- The model types (i.e. the types used to build the `DbContext` or `KafkaDbContext`)

## Possible usages

TDB
For possible usages of [Entity Framework Core](https://learn.microsoft.com/it-it/ef/core/) provider for [Apache Kafka](https://kafka.apache.org/), and this feature, see [use cases](usecases.md)
63 changes: 62 additions & 1 deletion src/documentation/articles/serialization.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,16 @@ A custom **Key SerDes** class shall follow the following rules:
- must implements the `IKNetSerDes<T>` interface or extend `KNetSerDes<T>`
- must be a generic type
- must have a parameterless constructor
- can store serialization information using Headers of Apache Kafka record (this information will be used from `EntityExtractor`)

An example snippet is the follow based on JSON serializer:

```C#
public class CustomSerDes<T> : KNetSerDes<T>
public class CustomKeySerDes<T> : KNetSerDes<T>
{
readonly byte[] keyTypeName = Encoding.UTF8.GetBytes(typeof(T).FullName!);
readonly byte[] customSerDesName = Encoding.UTF8.GetBytes(typeof(CustomKeySerDes<>).FullName!);

/// <inheritdoc cref="KNetSerDes{T}.Serialize(string, T)"/>
public override byte[] Serialize(string topic, T data)
{
Expand All @@ -150,6 +154,63 @@ public class CustomSerDes<T> : KNetSerDes<T>
/// <inheritdoc cref="KNetSerDes{T}.SerializeWithHeaders(string, Headers, T)"/>
public override byte[] SerializeWithHeaders(string topic, Headers headers, T data)
{
headers?.Add(KEFCoreSerDesNames.KeyTypeIdentifier, keyTypeName);
headers?.Add(KEFCoreSerDesNames.KeySerializerIdentifier, customSerDesName);

var jsonStr = System.Text.Json.JsonSerializer.Serialize<T>(data);
return Encoding.UTF8.GetBytes(jsonStr);
}
/// <inheritdoc cref="KNetSerDes{T}.Deserialize(string, byte[])"/>
public override T Deserialize(string topic, byte[] data)
{
return DeserializeWithHeaders(topic, null, data);
}
/// <inheritdoc cref="KNetSerDes{T}.DeserializeWithHeaders(string, Headers, byte[])"/>
public override T DeserializeWithHeaders(string topic, Headers headers, byte[] data)
{
if (data == null) return default;
return System.Text.Json.JsonSerializer.Deserialize<T>(data)!;
}
}
```

```C#
public class CustomValueContainerSerDes<T> : KNetSerDes<T>
{
readonly byte[] valueContainerSerDesName = Encoding.UTF8.GetBytes(typeof(CustomValueContainerSerDes<>).FullName!);
readonly byte[] valueContainerName = null!;
/// <summary>
/// Default initializer
/// </summary>
public CustomValueContainerSerDes()
{
var tt = typeof(T);
if (tt.IsGenericType)
{
var keyT = tt.GetGenericArguments();
if (keyT.Length != 1) { throw new ArgumentException($"{typeof(T).Name} does not contains a single generic argument and cannot be used because it is not a valid ValueContainer type"); }
var t = tt.GetGenericTypeDefinition();
if (t.GetInterface(typeof(IValueContainer<>).Name) != null)
{
valueContainerName = Encoding.UTF8.GetBytes(t.FullName!);
return;
}
else throw new ArgumentException($"{typeof(T).Name} does not implement IValueContainer<> and cannot be used because it is not a valid ValueContainer type");
}
throw new ArgumentException($"{typeof(T).Name} is not a generic type and cannot be used as a valid ValueContainer type");
}

/// <inheritdoc cref="KNetSerDes{T}.Serialize(string, T)"/>
public override byte[] Serialize(string topic, T data)
{
return SerializeWithHeaders(topic, null, data);
}
/// <inheritdoc cref="KNetSerDes{T}.SerializeWithHeaders(string, Headers, T)"/>
public override byte[] SerializeWithHeaders(string topic, Headers headers, T data)
{
headers?.Add(KEFCoreSerDesNames.ValueContainerSerializerIdentifier, valueContainerSerDesName);
headers?.Add(KEFCoreSerDesNames.ValueContainerIdentifier, valueContainerName);

var jsonStr = System.Text.Json.JsonSerializer.Serialize<T>(data);
return Encoding.UTF8.GetBytes(jsonStr);
}
Expand Down
2 changes: 2 additions & 0 deletions src/documentation/articles/toc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
href: gettingstarted.md
- name: Usage
href: usage.md
- name: Use cases
href: usecases.md
- name: Serialization
href: serialization.md
- name: External application
Expand Down
4 changes: 4 additions & 0 deletions src/documentation/articles/usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,7 @@ public class Post
public Blog Blog { get; set; }
}
```

## Possible usages

For possible usages of [Entity Framework Core](https://learn.microsoft.com/it-it/ef/core/) provider for [Apache Kafka](https://kafka.apache.org/), and this feature, see [use cases](usecases.md)
6 changes: 6 additions & 0 deletions src/documentation/articles/usecases.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# KEFCore: use cases

[Entity Framework Core](https://learn.microsoft.com/it-it/ef/core/) provider for [Apache Kafka](https://kafka.apache.org/) can be used in some operative conditions.
Here a possible, non exausthive list, of use cases.

TBD
7 changes: 4 additions & 3 deletions src/documentation/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ This project adheres to the Contributor [Covenant code of conduct](CODE_OF_CONDU
## Summary

* [Getting started](articles/gettingstarted.md)
* [KEFCore usage](articles/usage.md)
* [KEFCore serialization](articles/serialization.md)
* [KEFCore external application](articles/externalapplication.md)
* [Usage](articles/usage.md)
* [Use cases](articles/usecases.md)
* [Serialization](articles/serialization.md)
* [External application](articles/externalapplication.md)
* [Roadmap](articles/roadmap.md)
* [Current state](articles/currentstate.md)

Expand Down
22 changes: 10 additions & 12 deletions src/net/KEFCore.SerDes.Avro/AvroKEFCoreSerDes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@
/// <typeparam name="T"></typeparam>
public class Binary<T> : KNetSerDes<T>
{
readonly byte[] keySerDesName = Encoding.ASCII.GetBytes(typeof(Binary<>).FullName!);
readonly byte[] keyTypeName = Encoding.UTF8.GetBytes(typeof(T).FullName!);
readonly byte[] keySerDesName = Encoding.UTF8.GetBytes(typeof(Binary<>).ToAssemblyQualified());
readonly SpecificDefaultWriter SpecificWriter = new(AvroKeyContainer._SCHEMA);
readonly SpecificDefaultReader SpecificReader = new(AvroKeyContainer._SCHEMA, AvroKeyContainer._SCHEMA);
readonly IKNetSerDes<T> _defaultSerDes = default!;
Expand Down Expand Up @@ -72,6 +73,7 @@
/// <inheritdoc cref="KNetSerDes{T}.SerializeWithHeaders(string, Headers, T)"/>
public override byte[] SerializeWithHeaders(string topic, Headers headers, T data)
{
headers?.Add(KEFCoreSerDesNames.KeyTypeIdentifier, keyTypeName);
headers?.Add(KEFCoreSerDesNames.KeySerializerIdentifier, keySerDesName);

if (_defaultSerDes != null) return _defaultSerDes.SerializeWithHeaders(topic, headers, data);
Expand All @@ -79,7 +81,7 @@
using MemoryStream memStream = new();
BinaryEncoder encoder = new(memStream);
var container = new AvroKeyContainer();
container.PrimaryKey = new List<object>(data as object[]);

Check warning on line 84 in src/net/KEFCore.SerDes.Avro/AvroKEFCoreSerDes.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

Possible null reference argument for parameter 'collection' in 'List<object>.List(IEnumerable<object> collection)'.
SpecificWriter.Write(container, encoder);
return memStream.ToArray();
}
Expand Down Expand Up @@ -107,7 +109,8 @@
/// <typeparam name="T"></typeparam>
public class Json<T> : KNetSerDes<T>
{
readonly byte[] keySerDesName = Encoding.ASCII.GetBytes(typeof(Json<>).FullName!);
readonly byte[] keyTypeName = Encoding.UTF8.GetBytes(typeof(T).FullName!);
readonly byte[] keySerDesName = Encoding.UTF8.GetBytes(typeof(Json<>).ToAssemblyQualified());
readonly SpecificDefaultWriter SpecificWriter = new(AvroKeyContainer._SCHEMA);
readonly SpecificDefaultReader SpecificReader = new(AvroKeyContainer._SCHEMA, AvroKeyContainer._SCHEMA);
readonly IKNetSerDes<T> _defaultSerDes = default!;
Expand Down Expand Up @@ -136,6 +139,7 @@
/// <inheritdoc cref="KNetSerDes{T}.SerializeWithHeaders(string, Headers, T)"/>
public override byte[] SerializeWithHeaders(string topic, Headers headers, T data)
{
headers?.Add(KEFCoreSerDesNames.KeyTypeIdentifier, keyTypeName);
headers?.Add(KEFCoreSerDesNames.KeySerializerIdentifier, keySerDesName);

if (_defaultSerDes != null) return _defaultSerDes.SerializeWithHeaders(topic, headers, data);
Expand Down Expand Up @@ -176,8 +180,7 @@
/// <typeparam name="T"></typeparam>
public class Binary<T> : KNetSerDes<T>
{
readonly byte[] valueContainerSerDesName = Encoding.ASCII.GetBytes(typeof(Binary<>).FullName!);
readonly byte[] keyTypeName = null!;
readonly byte[] valueContainerSerDesName = Encoding.UTF8.GetBytes(typeof(Binary<>).ToAssemblyQualified());
readonly byte[] valueContainerName = null!;
readonly SpecificDefaultWriter SpecificWriter = new(AvroValueContainer._SCHEMA);
readonly SpecificDefaultReader SpecificReader = new(AvroValueContainer._SCHEMA, AvroValueContainer._SCHEMA);
Expand All @@ -196,8 +199,7 @@
var t = tt.GetGenericTypeDefinition();
if (t.GetInterface(typeof(IValueContainer<>).Name) != null)
{
keyTypeName = Encoding.UTF8.GetBytes(keyT[0].FullName!);
valueContainerName = Encoding.UTF8.GetBytes(t.FullName!);
valueContainerName = Encoding.UTF8.GetBytes(t.ToAssemblyQualified());
return;
}
else throw new ArgumentException($"{typeof(T).Name} does not implement IValueContainer<> and cannot be used because it is not a valid ValueContainer type");
Expand All @@ -214,7 +216,6 @@
public override byte[] SerializeWithHeaders(string topic, Headers headers, T data)
{
headers?.Add(KEFCoreSerDesNames.ValueContainerSerializerIdentifier, valueContainerSerDesName);
headers?.Add(KEFCoreSerDesNames.KeyTypeIdentifier, keyTypeName);
headers?.Add(KEFCoreSerDesNames.ValueContainerIdentifier, valueContainerName);

using MemoryStream memStream = new();
Expand Down Expand Up @@ -244,8 +245,7 @@
/// <typeparam name="T"></typeparam>
public class Json<T> : KNetSerDes<T>
{
readonly byte[] valueContainerSerDesName = Encoding.UTF8.GetBytes(typeof(Json<>).FullName!);
readonly byte[] keyTypeName = null!;
readonly byte[] valueContainerSerDesName = Encoding.UTF8.GetBytes(typeof(Json<>).ToAssemblyQualified());
readonly byte[] valueContainerName = null!;
readonly SpecificDefaultWriter SpecificWriter = new(AvroValueContainer._SCHEMA);
readonly SpecificDefaultReader SpecificReader = new(AvroValueContainer._SCHEMA, AvroValueContainer._SCHEMA);
Expand All @@ -264,8 +264,7 @@
var t = tt.GetGenericTypeDefinition();
if (t.GetInterface(typeof(IValueContainer<>).Name) != null)
{
keyTypeName = Encoding.UTF8.GetBytes(keyT[0].FullName!);
valueContainerName = Encoding.UTF8.GetBytes(t.FullName!);
valueContainerName = Encoding.UTF8.GetBytes(t.ToAssemblyQualified());
return;
}
else throw new ArgumentException($"{typeof(T).Name} does not implement IValueContainer<> and cannot be used because it is not a valid ValueContainer type");
Expand All @@ -282,7 +281,6 @@
public override byte[] SerializeWithHeaders(string topic, Headers headers, T data)
{
headers?.Add(KEFCoreSerDesNames.ValueContainerSerializerIdentifier, valueContainerSerDesName);
headers?.Add(KEFCoreSerDesNames.KeyTypeIdentifier, keyTypeName);
headers?.Add(KEFCoreSerDesNames.ValueContainerIdentifier, valueContainerName);

using MemoryStream memStream = new();
Expand Down
11 changes: 5 additions & 6 deletions src/net/KEFCore.SerDes/DefaultKEFCoreSerDes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ public static class Key
/// <typeparam name="T">The type to be serialized or deserialized. It can be a Primary Key or a ValueContainer like <see cref="DefaultValueContainer{TKey}"/></typeparam>
public class Json<T> : KNetSerDes<T>
{
readonly byte[] keySerDesName = Encoding.ASCII.GetBytes(typeof(Json<>).FullName!);
readonly byte[] keySerDesName = Encoding.UTF8.GetBytes(typeof(Json<>).ToAssemblyQualified());
readonly byte[] keyTypeName = Encoding.UTF8.GetBytes(typeof(T).FullName!);
readonly IKNetSerDes<T> _defaultSerDes = default!;
/// <inheritdoc/>
public override bool UseHeaders => true;
Expand All @@ -67,6 +68,7 @@ public override byte[] Serialize(string topic, T data)
/// <inheritdoc cref="KNetSerDes{T}.SerializeWithHeaders(string, Headers, T)"/>
public override byte[] SerializeWithHeaders(string topic, Headers headers, T data)
{
headers?.Add(KEFCoreSerDesNames.KeyTypeIdentifier, keyTypeName);
headers?.Add(KEFCoreSerDesNames.KeySerializerIdentifier, keySerDesName);

if (_defaultSerDes != null) return _defaultSerDes.SerializeWithHeaders(topic, headers, data);
Expand Down Expand Up @@ -101,8 +103,7 @@ public static class ValueContainer
/// <typeparam name="T">The type to be serialized or deserialized. It can be a Primary Key or a ValueContainer like <see cref="DefaultValueContainer{TKey}"/></typeparam>
public class Json<T> : KNetSerDes<T>
{
readonly byte[] valueContainerSerDesName = Encoding.ASCII.GetBytes(typeof(Json<>).FullName!);
readonly byte[] keyTypeName = null!;
readonly byte[] valueContainerSerDesName = Encoding.UTF8.GetBytes(typeof(Json<>).ToAssemblyQualified());
readonly byte[] valueContainerName = null!;
/// <inheritdoc/>
public override bool UseHeaders => true;
Expand All @@ -119,8 +120,7 @@ public Json()
var t = tt.GetGenericTypeDefinition();
if (t.GetInterface(typeof(IValueContainer<>).Name) != null)
{
keyTypeName = Encoding.UTF8.GetBytes(keyT[0].FullName!);
valueContainerName = Encoding.UTF8.GetBytes(t.FullName!);
valueContainerName = Encoding.UTF8.GetBytes(t.ToAssemblyQualified());
return;
}
else throw new ArgumentException($"{typeof(T).Name} does not implement IValueContainer<> and cannot be used because it is not a valid ValueContainer type");
Expand All @@ -137,7 +137,6 @@ public override byte[] Serialize(string topic, T data)
public override byte[] SerializeWithHeaders(string topic, Headers headers, T data)
{
headers?.Add(KEFCoreSerDesNames.ValueContainerSerializerIdentifier, valueContainerSerDesName);
headers?.Add(KEFCoreSerDesNames.KeyTypeIdentifier, keyTypeName);
headers?.Add(KEFCoreSerDesNames.ValueContainerIdentifier, valueContainerName);

var jsonStr = System.Text.Json.JsonSerializer.Serialize<T>(data);
Expand Down
Loading