Skip to content

Commit

Permalink
Updates on Protobuf serializer adding properties to simplify retrievi…
Browse files Browse the repository at this point in the history
…ng of default types (#144)

* Updates on Protobuf serializer adding properties to simplify retrieving of default types

* General review of test program
  • Loading branch information
masesdevelopers authored Oct 25, 2023
1 parent a2c47bb commit 1127e6c
Show file tree
Hide file tree
Showing 24 changed files with 231 additions and 679 deletions.
18 changes: 9 additions & 9 deletions src/documentation/articles/serialization.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ The code is based on three elements shall be available to [Entity Framework Core
### Default types

[Entity Framework Core](https://learn.microsoft.com/it-it/ef/core/) provider for [Apache Kafka](https://kafka.apache.org/) comes with some default values:
- **ValueContainer** class: KEFCore uses `DefaultValueContainer<T>` which stores the CLR type of Entity, the properties ordered by their index with associated CLT type, name and JSON serializaed value; the class is marked for JSON serialization and it is used from the **ValueContainer SerDes**;
- **Key SerDes** class: KEFCore uses `DefaultKEFCoreSerDes.Key.Json<T>`, the type automatically manages simple or complex Primary Key
- **ValueContainer SerDes** class: KEFCore uses `DefaultKEFCoreSerDes.ValueContainer.Json<>`
- **ValueContainer** class: KEFCore uses `DefaultValueContainer<T>` (i.e. `DefaultKEFCoreSerDes.DefaultValueContainer`) which stores the CLR type of Entity, the properties ordered by their index with associated CLT type, name and JSON serializaed value; the class is marked for JSON serialization and it is used from the **ValueContainer SerDes**;
- **Key SerDes** class: KEFCore uses `DefaultKEFCoreSerDes.Key.Json<T>` (i.e. `DefaultKEFCoreSerDes.DefaultKeySerialization`), the type automatically manages simple or complex Primary Key
- **ValueContainer SerDes** class: KEFCore uses `DefaultKEFCoreSerDes.ValueContainer.Json<>` (i.e. `DefaultKEFCoreSerDes.DefaultValueContainerSerialization`)

### User override

Expand Down Expand Up @@ -338,9 +338,9 @@ The extension converted this schema into code to speedup the exection of seriali
### How to use Avro

`KafkaDbContext` contains three properties can be used to override the default types:
- **KeySerializationType**: set this value to `AvroKEFCoreSerDes.Key.Binary<>` or `AvroKEFCoreSerDes.Key.Json<>`, both types automatically manages simple or complex Primary Key
- **ValueSerializationType**: set this value to `AvroKEFCoreSerDes.ValueContainer.Binary<>` or `AvroKEFCoreSerDes.ValueContainer.Json<>`
- **ValueContainerType**: set this value to `AvroValueContainer<>`
- **KeySerializationType**: set this value to `AvroKEFCoreSerDes.Key.Binary<>` or `AvroKEFCoreSerDes.Key.Json<>` or use `AvroKEFCoreSerDes.DefaultKeySerialization` (defaults to `AvroKEFCoreSerDes.Key.Binary<>`), both types automatically manages simple or complex Primary Key
- **ValueSerializationType**: set this value to `AvroKEFCoreSerDes.ValueContainer.Binary<>` or `AvroKEFCoreSerDes.ValueContainer.Json<>` or use `AvroKEFCoreSerDes.DefaultValueContainerSerialization` (defaults to `AvroKEFCoreSerDes.ValueContainer.Binary<>`)
- **ValueContainerType**: set this value to `AvroValueContainer<>` or use `AvroKEFCoreSerDes.DefaultValueContainer`

An example is:

Expand Down Expand Up @@ -494,9 +494,9 @@ The extension converted this schema into code to speedup the exection of seriali
### How to use Protobuf

`KafkaDbContext` contains three properties can be used to override the default types:
- **KeySerializationType**: set this value to `ProtobufKEFCoreSerDes.Key<>`, the type automatically manages simple or complex Primary Key
- **ValueSerializationType**: set this value to `ProtobufKEFCoreSerDes.ValueContainer<>`
- **ValueContainerType**: set this value to `ProtobufValueContainer<>`
- **KeySerializationType**: set this value to `ProtobufKEFCoreSerDes..Key.Binary<>` or use `ProtobufKEFCoreSerDes.DefaultKeySerialization`, the type automatically manages simple or complex Primary Key
- **ValueSerializationType**: set this value to `ProtobufKEFCoreSerDes.ValueContainer.Binary<>` or use `ProtobufKEFCoreSerDes.DefaultValueContainerSerialization`
- **ValueContainerType**: set this value to `ProtobufValueContainer<>` or use `ProtobufKEFCoreSerDes.DefaultValueContainer`

An example is:

Expand Down
12 changes: 12 additions & 0 deletions src/net/KEFCore.SerDes.Avro/AvroKEFCoreSerDes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,18 @@ namespace MASES.EntityFrameworkCore.KNet.Serialization.Avro;
/// </summary>
public static class AvroKEFCoreSerDes
{
/// <summary>
/// Returns the default serializer <see cref="Type"/> for keys
/// </summary>
public static readonly Type DefaultKeySerialization = typeof(Key.Binary<>);
/// <summary>
/// Returns the default serializer <see cref="Type"/> for value containers
/// </summary>
public static readonly Type DefaultValueContainerSerialization = typeof(ValueContainer.Binary<>);
/// <summary>
/// Returns the default <see cref="Type"/> for value containers
/// </summary>
public static readonly Type DefaultValueContainer = typeof(AvroValueContainer<>);
/// <summary>
/// Base class to define key extensions of <see cref="KNetSerDes{T}"/>, for example <see href="https://masesgroup.github.io/KNet/articles/usageSerDes.html"/>
/// </summary>
Expand Down
206 changes: 115 additions & 91 deletions src/net/KEFCore.SerDes.Protobuf/ProtobufKEFCoreSerDes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,130 +32,154 @@ namespace MASES.EntityFrameworkCore.KNet.Serialization.Protobuf;
public static class ProtobufKEFCoreSerDes
{
/// <summary>
/// Avro Key Binary encoder extension of <see cref="KNetSerDes{T}"/>, for example <see href="https://masesgroup.github.io/KNet/articles/usageSerDes.html"/>
/// Returns the default serializer <see cref="Type"/> for keys
/// </summary>
/// <typeparam name="T"></typeparam>
public class Key<T> : KNetSerDes<T>
public static readonly Type DefaultKeySerialization = typeof(Key.Binary<>);
/// <summary>
/// Returns the default serializer <see cref="Type"/> for value containers
/// </summary>
public static readonly Type DefaultValueContainerSerialization = typeof(ValueContainer.Binary<>);
/// <summary>
/// Returns the default <see cref="Type"/> for value containers
/// </summary>
public static readonly Type DefaultValueContainer = typeof(ProtobufValueContainer<>);
/// <summary>
/// Base class to define key extensions of <see cref="KNetSerDes{T}"/>, for example <see href="https://masesgroup.github.io/KNet/articles/usageSerDes.html"/>
/// </summary>
public static class Key
{
readonly byte[] keyTypeName = Encoding.UTF8.GetBytes(typeof(T).FullName!);
readonly byte[] keySerDesName = Encoding.UTF8.GetBytes(typeof(Key<>).ToAssemblyQualified());
readonly IKNetSerDes<T> _defaultSerDes = default!;
/// <inheritdoc/>
public override bool UseHeaders => true;
/// <summary>
/// Default initializer
/// Protobuf Key Binary encoder extension of <see cref="KNetSerDes{T}"/>, for example <see href="https://masesgroup.github.io/KNet/articles/usageSerDes.html"/>
/// </summary>
public Key()
/// <typeparam name="T"></typeparam>
public class Binary<T> : KNetSerDes<T>
{
if (KNetSerialization.IsInternalManaged<T>())
readonly byte[] keyTypeName = Encoding.UTF8.GetBytes(typeof(T).FullName!);
readonly byte[] keySerDesName = Encoding.UTF8.GetBytes(typeof(Binary<>).ToAssemblyQualified());
readonly IKNetSerDes<T> _defaultSerDes = default!;
/// <inheritdoc/>
public override bool UseHeaders => true;
/// <summary>
/// Default initializer
/// </summary>
public Binary()
{
_defaultSerDes = new KNetSerDes<T>();
if (KNetSerialization.IsInternalManaged<T>())
{
_defaultSerDes = new KNetSerDes<T>();
}
else if (!typeof(T).IsArray)
{
throw new InvalidOperationException($"{typeof(Binary<>).ToAssemblyQualified()} cannot manage {typeof(T).Name}, override or build a new serializaer");
}
}
else if (!typeof(T).IsArray)

/// <inheritdoc cref="KNetSerDes{T}.Serialize(string, T)"/>
public override byte[] Serialize(string topic, T data)
{
throw new InvalidOperationException($"{typeof(Key<>).ToAssemblyQualified()} cannot manage {typeof(T).Name}, override or build a new serializaer");
return SerializeWithHeaders(topic, null!, data);
}
}
/// <inheritdoc cref="KNetSerDes{T}.SerializeWithHeaders(string, Headers, T)"/>
public override byte[] SerializeWithHeaders(string topic, Headers headers, T data)
{
headers?.Add(KNetSerialization.KeyTypeIdentifier, keyTypeName);
headers?.Add(KNetSerialization.KeySerializerIdentifier, keySerDesName);

/// <inheritdoc cref="KNetSerDes{T}.Serialize(string, T)"/>
public override byte[] Serialize(string topic, T data)
{
return SerializeWithHeaders(topic, null!, data);
}
/// <inheritdoc cref="KNetSerDes{T}.SerializeWithHeaders(string, Headers, T)"/>
public override byte[] SerializeWithHeaders(string topic, Headers headers, T data)
{
headers?.Add(KNetSerialization.KeyTypeIdentifier, keyTypeName);
headers?.Add(KNetSerialization.KeySerializerIdentifier, keySerDesName);
if (_defaultSerDes != null) return _defaultSerDes.SerializeWithHeaders(topic, headers, data);
KeyContainer keyContainer = null!;
if (data is object[] dataArray)
{
keyContainer = new KeyContainer(dataArray);
}

if (_defaultSerDes != null) return _defaultSerDes.SerializeWithHeaders(topic, headers, data);
KeyContainer keyContainer = null!;
if (data is object[] dataArray)
using MemoryStream stream = new();
keyContainer.WriteTo(stream);
return stream.ToArray();
}
/// <inheritdoc cref="KNetSerDes{T}.Deserialize(string, byte[])"/>
public override T Deserialize(string topic, byte[] data)
{
keyContainer = new KeyContainer(dataArray);
return DeserializeWithHeaders(topic, null!, data);
}

using MemoryStream stream = new();
keyContainer.WriteTo(stream);
return stream.ToArray();
}
/// <inheritdoc cref="KNetSerDes{T}.Deserialize(string, byte[])"/>
public override T Deserialize(string topic, byte[] data)
{
return DeserializeWithHeaders(topic, null!, data);
}
/// <inheritdoc cref="KNetSerDes{T}.DeserializeWithHeaders(string, Headers, byte[])"/>
public override T DeserializeWithHeaders(string topic, Headers headers, byte[] data)
{
if (_defaultSerDes != null) return _defaultSerDes.DeserializeWithHeaders(topic, headers, data);
/// <inheritdoc cref="KNetSerDes{T}.DeserializeWithHeaders(string, Headers, byte[])"/>
public override T DeserializeWithHeaders(string topic, Headers headers, byte[] data)
{
if (_defaultSerDes != null) return _defaultSerDes.DeserializeWithHeaders(topic, headers, data);

if (data == null) return default!;
if (data == null) return default!;

KeyContainer container = KeyContainer.Parser.ParseFrom(data);
KeyContainer container = KeyContainer.Parser.ParseFrom(data);

return (T)container.GetContent();
return (T)container.GetContent();
}
}
}

/// <summary>
/// Avro ValueContainer Binary encoder extension of <see cref="KNetSerDes{T}"/>, for example <see href="https://masesgroup.github.io/KNet/articles/usageSerDes.html"/>
/// Base class to define ValueContainer extensions of <see cref="KNetSerDes{T}"/>, for example <see href="https://masesgroup.github.io/KNet/articles/usageSerDes.html"/>
/// </summary>
/// <typeparam name="T"></typeparam>
public class ValueContainer<T> : KNetSerDes<T> where T : class, IMessage<T>
public static class ValueContainer
{
readonly byte[] valueContainerSerDesName = Encoding.UTF8.GetBytes(typeof(ValueContainer<>).ToAssemblyQualified());
readonly byte[] valueContainerName = null!;
/// <inheritdoc/>
public override bool UseHeaders => true;
/// <summary>
/// Default initializer
/// Protobuf ValueContainer Binary encoder extension of <see cref="KNetSerDes{T}"/>, for example <see href="https://masesgroup.github.io/KNet/articles/usageSerDes.html"/>
/// </summary>
public ValueContainer()
/// <typeparam name="T"></typeparam>
public class Binary<T> : KNetSerDes<T> where T : class, IMessage<T>
{
var tt = typeof(T);
if (tt.IsGenericType)
readonly byte[] valueContainerSerDesName = Encoding.UTF8.GetBytes(typeof(Binary<>).ToAssemblyQualified());
readonly byte[] valueContainerName = null!;
/// <inheritdoc/>
public override bool UseHeaders => true;
/// <summary>
/// Default initializer
/// </summary>
public Binary()
{
var keyT = tt.GetGenericArguments();
if (keyT.Length != 1) { throw new ArgumentException($"{typeof(T).Name} does not contains a single generic argument and cannot be used because it is not a valid ValueContainer type"); }
var t = tt.GetGenericTypeDefinition();
if (t.GetInterface(typeof(IValueContainer<>).Name) != null)
var tt = typeof(T);
if (tt.IsGenericType)
{
valueContainerName = Encoding.UTF8.GetBytes(t.ToAssemblyQualified());
return;
var keyT = tt.GetGenericArguments();
if (keyT.Length != 1) { throw new ArgumentException($"{typeof(T).Name} does not contains a single generic argument and cannot be used because it is not a valid ValueContainer type"); }
var t = tt.GetGenericTypeDefinition();
if (t.GetInterface(typeof(IValueContainer<>).Name) != null)
{
valueContainerName = Encoding.UTF8.GetBytes(t.ToAssemblyQualified());
return;
}
else throw new ArgumentException($"{typeof(T).Name} does not implement IValueContainer<> and cannot be used because it is not a valid ValueContainer type");
}
else throw new ArgumentException($"{typeof(T).Name} does not implement IValueContainer<> and cannot be used because it is not a valid ValueContainer type");
throw new ArgumentException($"{typeof(T).Name} is not a generic type and cannot be used as a valid ValueContainer type");
}
throw new ArgumentException($"{typeof(T).Name} is not a generic type and cannot be used as a valid ValueContainer type");
}

/// <inheritdoc cref="KNetSerDes{T}.Serialize(string, T)"/>
public override byte[] Serialize(string topic, T data)
{
return SerializeWithHeaders(topic, null!, data);
}
/// <inheritdoc cref="KNetSerDes{T}.SerializeWithHeaders(string, Headers, T)"/>
public override byte[] SerializeWithHeaders(string topic, Headers headers, T data)
{
headers?.Add(KNetSerialization.ValueSerializerIdentifier, valueContainerSerDesName);
headers?.Add(KNetSerialization.ValueTypeIdentifier, valueContainerName);
/// <inheritdoc cref="KNetSerDes{T}.Serialize(string, T)"/>
public override byte[] Serialize(string topic, T data)
{
return SerializeWithHeaders(topic, null!, data);
}
/// <inheritdoc cref="KNetSerDes{T}.SerializeWithHeaders(string, Headers, T)"/>
public override byte[] SerializeWithHeaders(string topic, Headers headers, T data)
{
headers?.Add(KNetSerialization.ValueSerializerIdentifier, valueContainerSerDesName);
headers?.Add(KNetSerialization.ValueTypeIdentifier, valueContainerName);

using (MemoryStream stream = new())
using (MemoryStream stream = new())
{
data.WriteTo(stream);
return stream.ToArray();
}
}
/// <inheritdoc cref="KNetSerDes{T}.Deserialize(string, byte[])"/>
public override T Deserialize(string topic, byte[] data)
{
data.WriteTo(stream);
return stream.ToArray();
return DeserializeWithHeaders(topic, null!, data);
}
/// <inheritdoc cref="KNetSerDes{T}.DeserializeWithHeaders(string, Headers, byte[])"/>
public override T DeserializeWithHeaders(string topic, Headers headers, byte[] data)
{
if (data == null) return default!;
var container = Storage.ValueContainer.Parser.ParseFrom(data);
return (Activator.CreateInstance(typeof(T), container) as T)!;
}
}
/// <inheritdoc cref="KNetSerDes{T}.Deserialize(string, byte[])"/>
public override T Deserialize(string topic, byte[] data)
{
return DeserializeWithHeaders(topic, null!, data);
}
/// <inheritdoc cref="KNetSerDes{T}.DeserializeWithHeaders(string, Headers, byte[])"/>
public override T DeserializeWithHeaders(string topic, Headers headers, byte[] data)
{
if (data == null) return default!;
var container = ValueContainer.Parser.ParseFrom(data);
return (Activator.CreateInstance(typeof(T), container) as T)!;
}
}
}
12 changes: 12 additions & 0 deletions src/net/KEFCore.SerDes/DefaultKEFCoreSerDes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,18 @@ namespace MASES.EntityFrameworkCore.KNet.Serialization.Json;
/// </summary>
public static class DefaultKEFCoreSerDes
{
/// <summary>
/// Returns the default serializer <see cref="Type"/> for keys
/// </summary>
public static readonly Type DefaultKeySerialization = typeof(Key.Json<>);
/// <summary>
/// Returns the default serializer <see cref="Type"/> for value containers
/// </summary>
public static readonly Type DefaultValueContainerSerialization = typeof(ValueContainer.Json<>);
/// <summary>
/// Returns the default <see cref="Type"/> for value containers
/// </summary>
public static readonly Type DefaultValueContainer = typeof(DefaultValueContainer<>);
/// <summary>
/// Base class to define key extensions of <see cref="KNetSerDes{T}"/>, for example <see href="https://masesgroup.github.io/KNet/articles/usageSerDes.html"/>
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ namespace MASES.EntityFrameworkCore.KNet.Infrastructure.Internal;
/// </summary>
public class KafkaOptionsExtension : IDbContextOptionsExtension
{
private Type _keySerializationType = typeof(DefaultKEFCoreSerDes.Key.Json<>);
private Type _valueSerializationType = typeof(DefaultKEFCoreSerDes.ValueContainer.Json<>);
private Type _valueContainerType = typeof(DefaultValueContainer<>);
private Type _keySerializationType = DefaultKEFCoreSerDes.DefaultKeySerialization;
private Type _valueSerializationType = DefaultKEFCoreSerDes.DefaultValueContainerSerialization;
private Type _valueContainerType = DefaultKEFCoreSerDes.DefaultValueContainer;
private bool _useNameMatching = true;
private string? _databaseName;
private string? _applicationId;
Expand Down
Loading

0 comments on commit 1127e6c

Please sign in to comment.