Skip to content

Commit

Permalink
Update Avro, Json, MessagePack and ProtoBuf serializers (#265)
Browse files Browse the repository at this point in the history
* Changed SerDes structure with new types on Headers

* Added support of Avro serialization for both Binary and Json encoding

* Some refinements on documentation and serializers

* Removed management of simple types and forced constraints where applicable
  • Loading branch information
masesdevelopers authored Oct 17, 2023
1 parent d193952 commit bfe14a8
Show file tree
Hide file tree
Showing 8 changed files with 701 additions and 99 deletions.
28 changes: 28 additions & 0 deletions src/documentation/articles/usageSerDes.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,31 @@ KNetSerDes<TestType> serdes = new JsonSerDes<TestType>();
```
A single `JsonSerDes` can be used in serialization and deserialization, and produce Json serialized data.

## Specific cases

Some kind of serializers extension have specific needs will be listed below.

### Avro serializer

The Avro serializer is based on [Apache.Avro](https://www.nuget.org/packages/Apache.Avro) package. The types managed are:
- Avro types managed using the Avro library are Avro **record**s which:
- Shall have a parameterless constructor
- Shall conform to [ISpecificRecord](https://avro.apache.org/docs/1.11.1/api/csharp/html/interfaceAvro_1_1Specific_1_1ISpecificRecord.html)

**NOTE**: simple types (the one that have an Apche Kafka default serializer) are not managed and will be refused

### MessagePack serializer

The MessagePack serializer is based on [MessagePack](https://www.nuget.org/packages/MessagePack) package. The types managed are:
- MessagePack types managed using the MessagePack library shall be MessagePack types.

**NOTE**: simple types (the one that have an Apche Kafka default serializer) are not managed and will be refused

### Protobuf serializer

The Protobuf serializer is based on [Google.Protobuf](https://www.nuget.org/packages/Google.Protobuf) package. The types managed are:
- Protobuf types managed using the Protobuf library shall be messages types which:
- Shall have a parameterless constructor
- Shall conform to [`IMessage<T>`](https://cloud.google.com/dotnet/docs/reference/Google.Protobuf/latest/Google.Protobuf.IMessage-1)

**NOTE**: simple types (the one that have an Apche Kafka default serializer) are not managed and will be refused
347 changes: 327 additions & 20 deletions src/net/KNet.Serialization.Avro/AvroSerDes.cs

Large diffs are not rendered by default.

150 changes: 123 additions & 27 deletions src/net/KNet.Serialization.Json/JsonSerDes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,51 +17,147 @@
*/

using Org.Apache.Kafka.Common.Header;
using System;
using System.Text;

namespace MASES.KNet.Serialization.Json
{
/// <summary>
/// Json extension of <see cref="KNetSerDes{T}"/>, for example <see href="https://masesgroup.github.io/KNet/articles/usageSerDes.html"/>
/// Base class to define extensions of <see cref="KNetSerDes{T}"/> for Json, for example <see href="https://masesgroup.github.io/KNet/articles/usageSerDes.html"/>
/// </summary>
/// <typeparam name="T"></typeparam>
public class JsonSerDes<T> : KNetSerDes<T>
public static class JsonSerDes
{
/// <summary>
/// The extension uses <see cref="Headers"/>
/// Json extension of <see cref="KNetSerDes{T}"/> for Key, for example <see href="https://masesgroup.github.io/KNet/articles/usageSerDes.html"/>
/// </summary>
public override bool UseHeaders => true;
/// <inheritdoc cref="KNetSerDes{T}.Serialize(string, T)"/>
public override byte[] Serialize(string topic, T data)
{
return SerializeWithHeaders(topic, null, data);
}
/// <inheritdoc cref="KNetSerDes{T}.SerializeWithHeaders(string, Headers, T)"/>
public override byte[] SerializeWithHeaders(string topic, Headers headers, T data)
/// <typeparam name="T"></typeparam>
public class Key<T> : KNetSerDes<T>
{
readonly byte[] keySerDesName = Encoding.UTF8.GetBytes(typeof(Key<>).ToAssemblyQualified());
readonly byte[] keyTypeName = null;
readonly IKNetSerDes<T> _defaultSerDes = default!;
/// <inheritdoc/>
public override bool UseHeaders => true;
/// <summary>
/// Default initializer
/// </summary>
public Key()
{
if (KNetSerialization.IsInternalManaged<T>())
{
_defaultSerDes = new KNetSerDes<T>();
keyTypeName = Encoding.UTF8.GetBytes(typeof(T).FullName!);
}
else
{
keyTypeName = Encoding.UTF8.GetBytes(typeof(T).ToAssemblyQualified());
}
}
/// <inheritdoc cref="KNetSerDes{T}.Serialize(string, T)"/>
public override byte[] Serialize(string topic, T data)
{
return SerializeWithHeaders(topic, null, data);
}
/// <inheritdoc cref="KNetSerDes{T}.SerializeWithHeaders(string, Headers, T)"/>
public override byte[] SerializeWithHeaders(string topic, Headers headers, T data)
{
headers?.Add(KNetSerialization.KeyTypeIdentifier, keyTypeName);
headers?.Add(KNetSerialization.KeySerializerIdentifier, keySerDesName);

if (_defaultSerDes != null) return _defaultSerDes.SerializeWithHeaders(topic, headers, data);

#if NET462_OR_GREATER
var jsonStr = Newtonsoft.Json.JsonConvert.SerializeObject(data, Newtonsoft.Json.Formatting.None);
return Encoding.UTF8.GetBytes(jsonStr);
var jsonStr = Newtonsoft.Json.JsonConvert.SerializeObject(data, Newtonsoft.Json.Formatting.None);
return Encoding.UTF8.GetBytes(jsonStr);
#else
var jsonStr = System.Text.Json.JsonSerializer.Serialize<T>(data);
return Encoding.UTF8.GetBytes(jsonStr);
var jsonStr = System.Text.Json.JsonSerializer.Serialize<T>(data);
return Encoding.UTF8.GetBytes(jsonStr);
#endif
}
/// <inheritdoc cref="KNetSerDes{T}.Deserialize(string, byte[])"/>
public override T Deserialize(string topic, byte[] data)
{
return DeserializeWithHeaders(topic, null, data);
}
/// <inheritdoc cref="KNetSerDes{T}.DeserializeWithHeaders(string, Headers, byte[])"/>
public override T DeserializeWithHeaders(string topic, Headers headers, byte[] data)
{
if (_defaultSerDes != null) return _defaultSerDes.DeserializeWithHeaders(topic, headers, data);
if (data == null) return default;
#if NET462_OR_GREATER
var jsonStr = Encoding.UTF8.GetString(data);
return Newtonsoft.Json.JsonConvert.DeserializeObject<T>(jsonStr);
#else
return System.Text.Json.JsonSerializer.Deserialize<T>(data)!;
#endif
}
}
/// <inheritdoc cref="KNetSerDes{T}.Deserialize(string, byte[])"/>
public override T Deserialize(string topic, byte[] data)
{
return DeserializeWithHeaders(topic, null, data);
}
/// <inheritdoc cref="KNetSerDes{T}.DeserializeWithHeaders(string, Headers, byte[])"/>
public override T DeserializeWithHeaders(string topic, Headers headers, byte[] data)

/// <summary>
/// Json extension of <see cref="KNetSerDes{T}"/> for Value, for example <see href="https://masesgroup.github.io/KNet/articles/usageSerDes.html"/>
/// </summary>
/// <typeparam name="T"></typeparam>
public class Value<T> : KNetSerDes<T>
{
if (data == null) return default;
readonly byte[] valueSerDesName = Encoding.UTF8.GetBytes(typeof(Value<>).ToAssemblyQualified());
readonly byte[] valueTypeName = null!;
readonly IKNetSerDes<T> _defaultSerDes = default!;
/// <inheritdoc/>
public override bool UseHeaders => true;
/// <summary>
/// Default initializer
/// </summary>
public Value()
{
if (KNetSerialization.IsInternalManaged<T>())
{
_defaultSerDes = new KNetSerDes<T>();
valueTypeName = Encoding.UTF8.GetBytes(typeof(T).FullName!);
}
else
{
valueTypeName = Encoding.UTF8.GetBytes(typeof(T).ToAssemblyQualified());
}
}
/// <inheritdoc cref="KNetSerDes{T}.Serialize(string, T)"/>
public override byte[] Serialize(string topic, T data)
{
return SerializeWithHeaders(topic, null, data);
}
/// <inheritdoc cref="KNetSerDes{T}.SerializeWithHeaders(string, Headers, T)"/>
public override byte[] SerializeWithHeaders(string topic, Headers headers, T data)
{
headers?.Add(KNetSerialization.ValueSerializerIdentifier, valueSerDesName);
headers?.Add(KNetSerialization.ValueTypeIdentifier, valueTypeName);

if (_defaultSerDes != null) return _defaultSerDes.SerializeWithHeaders(topic, headers, data);

#if NET462_OR_GREATER
var jsonStr = Newtonsoft.Json.JsonConvert.SerializeObject(data, Newtonsoft.Json.Formatting.None);
return Encoding.UTF8.GetBytes(jsonStr);
#else
var jsonStr = System.Text.Json.JsonSerializer.Serialize<T>(data);
return Encoding.UTF8.GetBytes(jsonStr);
#endif
}
/// <inheritdoc cref="KNetSerDes{T}.Deserialize(string, byte[])"/>
public override T Deserialize(string topic, byte[] data)
{
return DeserializeWithHeaders(topic, null, data);
}
/// <inheritdoc cref="KNetSerDes{T}.DeserializeWithHeaders(string, Headers, byte[])"/>
public override T DeserializeWithHeaders(string topic, Headers headers, byte[] data)
{
if (_defaultSerDes != null) return _defaultSerDes.DeserializeWithHeaders(topic, headers, data);

if (data == null) return default;
#if NET462_OR_GREATER
var jsonStr = Encoding.UTF8.GetString(data);
return Newtonsoft.Json.JsonConvert.DeserializeObject<T>(jsonStr);
var jsonStr = Encoding.UTF8.GetString(data);
return Newtonsoft.Json.JsonConvert.DeserializeObject<T>(jsonStr);
#else
return System.Text.Json.JsonSerializer.Deserialize<T>(data)!;
return System.Text.Json.JsonSerializer.Deserialize<T>(data)!;
#endif
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<PackageReference Include="System.Memory" Version="4.5.5" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="MessagePack" Version="2.5.124" />
<PackageReference Include="MessagePack" Version="2.5.129" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.1.1">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
Expand Down
126 changes: 102 additions & 24 deletions src/net/KNet.Serialization.MessagePack/MessagePackSerDes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,43 +17,121 @@
*/

using Org.Apache.Kafka.Common.Header;
using MessagePack;
using global::MessagePack;
using System.IO;
using System.Text;
using System;

namespace MASES.KNet.Serialization.MessagePack
{
/// <summary>
/// MessagePack extension of <see cref="KNetSerDes{T}"/>, for example <see href="https://masesgroup.github.io/KNet/articles/usageSerDes.html"/>
/// Base class to define extensions of <see cref="KNetSerDes{T}"/> for MessagePack, for example <see href="https://masesgroup.github.io/KNet/articles/usageSerDes.html"/>
/// </summary>
/// <typeparam name="T"></typeparam>
public class MessagePackSerDes<T> : KNetSerDes<T>
public static class MessagePackSerDes
{
/// <summary>
/// The extension uses <see cref="Headers"/>
/// MessagePack extension of <see cref="KNetSerDes{T}"/> for Key, for example <see href="https://masesgroup.github.io/KNet/articles/usageSerDes.html"/>
/// </summary>
public override bool UseHeaders => true;
/// <inheritdoc cref="KNetSerDes{T}.Serialize(string, T)"/>
public override byte[] Serialize(string topic, T data)
/// <typeparam name="T"></typeparam>
public class Key<T> : KNetSerDes<T>
{
return SerializeWithHeaders(topic, null, data);
}
/// <inheritdoc cref="KNetSerDes{T}.SerializeWithHeaders(string, Headers, T)"/>
public override byte[] SerializeWithHeaders(string topic, Headers headers, T data)
{
return MessagePackSerializer.Serialize(data);
}
/// <inheritdoc cref="KNetSerDes{T}.Deserialize(string, byte[])"/>
public override T Deserialize(string topic, byte[] data)
{
return DeserializeWithHeaders(topic, null, data);
readonly byte[] keySerDesName = Encoding.UTF8.GetBytes(typeof(Key<>).ToAssemblyQualified());
readonly byte[] keyTypeName = Encoding.UTF8.GetBytes(typeof(T).ToAssemblyQualified());
/// <summary>
/// Get or set the <see cref="global::MessagePack.MessagePackSerializerOptions"/> to be used, default is <see langword="null"/>
/// </summary>
public MessagePackSerializerOptions MessagePackSerializerOptions { get; set; } = null;
/// <inheritdoc/>
public override bool UseHeaders => true;
/// <summary>
/// Default initializer
/// </summary>
public Key()
{
if (KNetSerialization.IsInternalManaged<T>())
{
throw new InvalidOperationException($"{typeof(T).Name} is a type managed from basic serializer, do not use {typeof(Key<T>).FullName}");
}
}
/// <inheritdoc cref="KNetSerDes{T}.Serialize(string, T)"/>
public override byte[] Serialize(string topic, T data)
{
return SerializeWithHeaders(topic, null, data);
}
/// <inheritdoc cref="KNetSerDes{T}.SerializeWithHeaders(string, Headers, T)"/>
public override byte[] SerializeWithHeaders(string topic, Headers headers, T data)
{
headers?.Add(KNetSerialization.KeyTypeIdentifier, keyTypeName);
headers?.Add(KNetSerialization.KeySerializerIdentifier, keySerDesName);

return MessagePackSerializer.Serialize(data, MessagePackSerializerOptions);
}
/// <inheritdoc cref="KNetSerDes{T}.Deserialize(string, byte[])"/>
public override T Deserialize(string topic, byte[] data)
{
return DeserializeWithHeaders(topic, null, data);
}
/// <inheritdoc cref="KNetSerDes{T}.DeserializeWithHeaders(string, Headers, byte[])"/>
public override T DeserializeWithHeaders(string topic, Headers headers, byte[] data)
{
if (data == null) return default;
using (MemoryStream stream = new MemoryStream(data))
{
return MessagePackSerializer.Deserialize<T>(stream, MessagePackSerializerOptions);
}
}
}
/// <inheritdoc cref="KNetSerDes{T}.DeserializeWithHeaders(string, Headers, byte[])"/>
public override T DeserializeWithHeaders(string topic, Headers headers, byte[] data)

/// <summary>
/// MessagePack extension of <see cref="KNetSerDes{T}"/> for Value, for example <see href="https://masesgroup.github.io/KNet/articles/usageSerDes.html"/>
/// </summary>
/// <typeparam name="T"></typeparam>
public class Value<T> : KNetSerDes<T>
{
if (data == null) return default;
using (MemoryStream stream = new MemoryStream(data))
readonly byte[] valueSerDesName = Encoding.UTF8.GetBytes(typeof(Value<>).ToAssemblyQualified());
readonly byte[] valueTypeName = Encoding.UTF8.GetBytes(typeof(T).ToAssemblyQualified());
/// <summary>
/// Get or set the <see cref="global::MessagePack.MessagePackSerializerOptions"/> to be used, default is <see langword="null"/>
/// </summary>
public MessagePackSerializerOptions MessagePackSerializerOptions { get; set; } = null;
/// <inheritdoc/>
public override bool UseHeaders => true;
/// <summary>
/// Default initializer
/// </summary>
public Value()
{
if (KNetSerialization.IsInternalManaged<T>())
{
throw new InvalidOperationException($"{typeof(T).Name} is a type managed from basic serializer, do not use {typeof(Key<T>).FullName}");
}
}
/// <inheritdoc cref="KNetSerDes{T}.Serialize(string, T)"/>
public override byte[] Serialize(string topic, T data)
{
return SerializeWithHeaders(topic, null, data);
}
/// <inheritdoc cref="KNetSerDes{T}.SerializeWithHeaders(string, Headers, T)"/>
public override byte[] SerializeWithHeaders(string topic, Headers headers, T data)
{
headers?.Add(KNetSerialization.ValueSerializerIdentifier, valueSerDesName);
headers?.Add(KNetSerialization.ValueTypeIdentifier, valueTypeName);

return MessagePackSerializer.Serialize(data, MessagePackSerializerOptions);
}
/// <inheritdoc cref="KNetSerDes{T}.Deserialize(string, byte[])"/>
public override T Deserialize(string topic, byte[] data)
{
return DeserializeWithHeaders(topic, null, data);
}
/// <inheritdoc cref="KNetSerDes{T}.DeserializeWithHeaders(string, Headers, byte[])"/>
public override T DeserializeWithHeaders(string topic, Headers headers, byte[] data)
{
return MessagePackSerializer.Deserialize<T>(stream);
if (data == null) return default;
using (MemoryStream stream = new MemoryStream(data))
{
return MessagePackSerializer.Deserialize<T>(stream, MessagePackSerializerOptions);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<None Include="..\..\documentation\articles\usageSerDes.md" Pack="true" PackagePath="\" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Google.Protobuf" Version="3.24.3" />
<PackageReference Include="Google.Protobuf" Version="3.24.4" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.1.1">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
Expand Down
Loading

0 comments on commit bfe14a8

Please sign in to comment.