Skip to content

Commit

Permalink
Upgrade to KNet 2.2.0, version and refinements
Browse files Browse the repository at this point in the history
  • Loading branch information
masesdevelopers committed Oct 19, 2023
1 parent 1582447 commit 7fb11b5
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 84 deletions.
2 changes: 1 addition & 1 deletion src/net/Common/Common.props
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<Owners>MASES s.r.l.</Owners>
<Authors>MASES s.r.l.</Authors>
<Company>MASES s.r.l.</Company>
<Version>0.9.3.0</Version>
<Version>0.10.0.0</Version>
<TargetFrameworks>net6.0</TargetFrameworks>
<LangVersion>latest</LangVersion>
<GenerateAssemblyInfo>true</GenerateAssemblyInfo>
Expand Down
29 changes: 3 additions & 26 deletions src/net/KEFCore.SerDes.Avro.Compiler/AvroSerializationHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,39 +20,16 @@

#nullable enable

using Avro;
using MASES.KNet.Serialization.Avro;

namespace MASES.EntityFrameworkCore.KNet.Serialization.Avro.Compiler;

public static class AvroSerializationHelper
{
public static void BuildSchemaClasses( string outputFolder, params string[] schemas)
{
var codegen = new CodeGen();
foreach (var schema in schemas)
{
codegen.AddSchema(schema);
}
codegen.GenerateCode();
codegen.WriteTypes(outputFolder, true);
}

public static void BuildSchemaClassesFromFiles( string outputFolder, params string[] schemaFiles)
{
var codegen = new CodeGen();
foreach (var schemaFile in schemaFiles)
{
var schema = File.ReadAllText(schemaFile);
codegen.AddSchema(schema);
}
codegen.GenerateCode();
codegen.WriteTypes(outputFolder, true);
}

public static void BuildDefaultSchema(string outputFolder)
{
BuildSchemaClassesFromFiles(outputFolder, "AvroValueContainer.avsc");
BuildSchemaClassesFromFiles(outputFolder, "AvroKeyContainer.avsc");
AvroSerDes.CompilerSupport.BuildSchemaClassesFromFiles(outputFolder, "AvroValueContainer.avsc");
AvroSerDes.CompilerSupport.BuildSchemaClassesFromFiles(outputFolder, "AvroKeyContainer.avsc");
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Apache.Avro" Version="1.11.3" />
<PackageReference Include="MASES.KNet.Serialization.Avro" Version="2.2.0" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.1.1">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
Expand Down
16 changes: 8 additions & 8 deletions src/net/KEFCore.SerDes.Avro/AvroKEFCoreSerDes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ public override byte[] Serialize(string topic, T data)
/// <inheritdoc cref="KNetSerDes{T}.SerializeWithHeaders(string, Headers, T)"/>
public override byte[] SerializeWithHeaders(string topic, Headers headers, T data)
{
headers?.Add(KEFCoreSerDesNames.KeyTypeIdentifier, keyTypeName);
headers?.Add(KEFCoreSerDesNames.KeySerializerIdentifier, keySerDesName);
headers?.Add(KNetSerialization.KeyTypeIdentifier, keyTypeName);
headers?.Add(KNetSerialization.KeySerializerIdentifier, keySerDesName);

if (_defaultSerDes != null) return _defaultSerDes.SerializeWithHeaders(topic, headers, data);

Expand Down Expand Up @@ -139,8 +139,8 @@ public override byte[] Serialize(string topic, T data)
/// <inheritdoc cref="KNetSerDes{T}.SerializeWithHeaders(string, Headers, T)"/>
public override byte[] SerializeWithHeaders(string topic, Headers headers, T data)
{
headers?.Add(KEFCoreSerDesNames.KeyTypeIdentifier, keyTypeName);
headers?.Add(KEFCoreSerDesNames.KeySerializerIdentifier, keySerDesName);
headers?.Add(KNetSerialization.KeyTypeIdentifier, keyTypeName);
headers?.Add(KNetSerialization.KeySerializerIdentifier, keySerDesName);

if (_defaultSerDes != null) return _defaultSerDes.SerializeWithHeaders(topic, headers, data);

Expand Down Expand Up @@ -215,8 +215,8 @@ public override byte[] Serialize(string topic, T data)
/// <inheritdoc cref="KNetSerDes{T}.SerializeWithHeaders(string, Headers, T)"/>
public override byte[] SerializeWithHeaders(string topic, Headers headers, T data)
{
headers?.Add(KEFCoreSerDesNames.ValueContainerSerializerIdentifier, valueContainerSerDesName);
headers?.Add(KEFCoreSerDesNames.ValueContainerIdentifier, valueContainerName);
headers?.Add(KNetSerialization.ValueSerializerIdentifier, valueContainerSerDesName);
headers?.Add(KNetSerialization.ValueTypeIdentifier, valueContainerName);

using MemoryStream memStream = new();
BinaryEncoder encoder = new(memStream);
Expand Down Expand Up @@ -280,8 +280,8 @@ public override byte[] Serialize(string topic, T data)
/// <inheritdoc cref="KNetSerDes{T}.SerializeWithHeaders(string, Headers, T)"/>
public override byte[] SerializeWithHeaders(string topic, Headers headers, T data)
{
headers?.Add(KEFCoreSerDesNames.ValueContainerSerializerIdentifier, valueContainerSerDesName);
headers?.Add(KEFCoreSerDesNames.ValueContainerIdentifier, valueContainerName);
headers?.Add(KNetSerialization.ValueSerializerIdentifier, valueContainerSerDesName);
headers?.Add(KNetSerialization.ValueTypeIdentifier, valueContainerName);

using MemoryStream memStream = new();
JsonEncoder encoder = new(AvroValueContainer._SCHEMA, memStream);
Expand Down
8 changes: 4 additions & 4 deletions src/net/KEFCore.SerDes/DefaultKEFCoreSerDes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ public override byte[] Serialize(string topic, T data)
/// <inheritdoc cref="KNetSerDes{T}.SerializeWithHeaders(string, Headers, T)"/>
public override byte[] SerializeWithHeaders(string topic, Headers headers, T data)
{
headers?.Add(KEFCoreSerDesNames.KeyTypeIdentifier, keyTypeName);
headers?.Add(KEFCoreSerDesNames.KeySerializerIdentifier, keySerDesName);
headers?.Add(KNetSerialization.KeyTypeIdentifier, keyTypeName);
headers?.Add(KNetSerialization.KeySerializerIdentifier, keySerDesName);

if (_defaultSerDes != null) return _defaultSerDes.SerializeWithHeaders(topic, headers, data);

Expand Down Expand Up @@ -136,8 +136,8 @@ public override byte[] Serialize(string topic, T data)
/// <inheritdoc cref="KNetSerDes{T}.SerializeWithHeaders(string, Headers, T)"/>
public override byte[] SerializeWithHeaders(string topic, Headers headers, T data)
{
headers?.Add(KEFCoreSerDesNames.ValueContainerSerializerIdentifier, valueContainerSerDesName);
headers?.Add(KEFCoreSerDesNames.ValueContainerIdentifier, valueContainerName);
headers?.Add(KNetSerialization.ValueSerializerIdentifier, valueContainerSerDesName);
headers?.Add(KNetSerialization.ValueTypeIdentifier, valueContainerName);

var jsonStr = System.Text.Json.JsonSerializer.Serialize<T>(data);
return Encoding.UTF8.GetBytes(jsonStr);
Expand Down
2 changes: 1 addition & 1 deletion src/net/KEFCore.SerDes/KEFCore.SerDes.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="MASES.KNet" Version="2.1.3">
<PackageReference Include="MASES.KNet" Version="2.2.0">
<IncludeAssets>All</IncludeAssets>
<PrivateAssets>None</PrivateAssets>
</PackageReference>
Expand Down
88 changes: 45 additions & 43 deletions src/net/KEFCore.SerDes/KEFCoreSerDes.Helpers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,48 +21,30 @@
using Java.Util;
using MASES.JCOBridge.C2JBridge;
using MASES.KNet.Consumer;
using MASES.KNet.Serialization;
using Org.Apache.Kafka.Clients.Consumer;
using Org.Apache.Kafka.Common.Serialization;
using System.Text;

namespace MASES.EntityFrameworkCore.KNet.Serialization;
/// <summary>
/// Stores some fixed names expected from serialization system
/// This is an helper class to extract data information from Kafka Records stored in topics
/// </summary>
public static class KEFCoreSerDesNames
public class EntityExtractor
{
/// <summary>
/// Identity the serializer for the key
/// </summary>
public const string KeySerializerIdentifier = "key-serializer-type";
/// <summary>
/// Identity the serializer for the ValueContainer
/// </summary>
public const string ValueContainerSerializerIdentifier = "value-container-serializer-type";
/// <summary>
/// Identity the type of the key used
/// </summary>
public const string KeyTypeIdentifier = "key-type";
/// <summary>
/// Identity the ValueContainer type used
/// </summary>
public const string ValueContainerIdentifier = "value-container-type";
/// <summary>
/// Returns the typename with the assembly qualification to help reload better the types
/// Extract information for Entity using <paramref name="consumerConfig"/> configurtion within a <paramref name="topicName"/> and send them to <paramref name="cb"/>
/// </summary>
/// <param name="type">The <see cref="Type"/> to be converted</param>
/// <returns>A string with <see cref="Type.FullName"/> along with <see cref="Assembly.FullName"/></returns>
public static string ToAssemblyQualified(this Type type)
/// <param name="consumerConfig">The <see cref="ConsumerConfigBuilder"/> with configuration</param>
/// <param name="topicName">The topic containing the data</param>
/// <param name="cb">The <see cref="Action{T1, T2}"/> where data will be available</param>
/// <param name="token">The <see cref="CancellationToken"/> to use to stop execution</param>
/// <param name="onlyLatest">Start execution only for newest messages and does not execute for oldest, default is from beginning</param>
public static void FromTopic(ConsumerConfigBuilder consumerConfig, string topicName, CancellationToken token, Action<object?, Exception?> cb, bool onlyLatest = false)
{
return $"{type.FullName}, {type.Assembly.GetName().Name}";
FromTopic<object>(consumerConfig, topicName, token, cb, onlyLatest);
}
}

/// <summary>
/// This is an helper class to extract data information from Kafka Records stored in topics
/// </summary>
public class EntityExtractor
{
/// <summary>
/// Extract information for Entity from <paramref name="bootstrapServer"/> within a <paramref name="topicName"/> and send them to <paramref name="cb"/>
/// </summary>
Expand All @@ -73,7 +55,7 @@ public class EntityExtractor
/// <param name="onlyLatest">Start execution only for newest messages and does not execute for oldest, default is from beginning</param>
public static void FromTopic(string bootstrapServer, string topicName, CancellationToken token, Action<object?, Exception?> cb, bool onlyLatest = false)
{
FromTopic<object>(bootstrapServer, topicName, token, cb);
FromTopic<object>(bootstrapServer, topicName, token, cb, onlyLatest);
}
/// <summary>
/// Extract information for Entity from <paramref name="bootstrapServer"/> within a <paramref name="topicName"/> and send them to <paramref name="cb"/>
Expand All @@ -86,11 +68,26 @@ public static void FromTopic(string bootstrapServer, string topicName, Cancellat
/// <param name="onlyLatest">Start execution only for newest messages and does not execute for oldest, default is from beginning</param>
public static void FromTopic<TEntity>(string bootstrapServer, string topicName, CancellationToken token, Action<TEntity?, Exception?> cb, bool onlyLatest = false)
where TEntity : class
{
ConsumerConfigBuilder consumerBuilder = ConsumerConfigBuilder.Create().WithBootstrapServers(bootstrapServer);
FromTopic<TEntity>(consumerBuilder, topicName, token, cb, onlyLatest);
}

/// <summary>
/// Extract information for Entity using <paramref name="consumerConfig"/> configurtion within a <paramref name="topicName"/> and send them to <paramref name="cb"/>
/// </summary>
/// <typeparam name="TEntity">The Entity type if it is known</typeparam>
/// <param name="consumerConfig">The <see cref="ConsumerConfigBuilder"/> with configuration</param>
/// <param name="topicName">The topic containing the data</param>
/// <param name="cb">The <see cref="Action{T1, T2}"/> where data will be available</param>
/// <param name="token">The <see cref="CancellationToken"/> to use to stop execution</param>
/// <param name="onlyLatest">Start execution only for newest messages and does not execute for oldest, default is from beginning</param>
public static void FromTopic<TEntity>(ConsumerConfigBuilder consumerConfig, string topicName, CancellationToken token, Action<TEntity?, Exception?> cb, bool onlyLatest = false)
where TEntity : class
{
try
{
ConsumerConfigBuilder consumerBuilder = ConsumerConfigBuilder.Create()
.WithBootstrapServers(bootstrapServer)
ConsumerConfigBuilder consumerBuilder = ConsumerConfigBuilder.CreateFrom(consumerConfig)
.WithGroupId(Guid.NewGuid().ToString())
.WithAutoOffsetReset(onlyLatest ? ConsumerConfigBuilder.AutoOffsetResetTypes.LATEST : ConsumerConfigBuilder.AutoOffsetResetTypes.EARLIEST)
.WithKeyDeserializerClass(JVMBridgeBase.ClassNameOf<ByteArrayDeserializer>())
Expand Down Expand Up @@ -144,40 +141,45 @@ public static TEntity FromRecord<TEntity>(ConsumerRecord<byte[], byte[]> record,
public static object FromRecord(ConsumerRecord<byte[], byte[]> record, bool throwUnmatch = false)
{
Type keySerializerType = null;

Check warning on line 143 in src/net/KEFCore.SerDes/KEFCoreSerDes.Helpers.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

Converting null literal or possible null value to non-nullable type.
Type valueContainerSerializerType = null;
Type valueSerializerType = null;

Check warning on line 144 in src/net/KEFCore.SerDes/KEFCoreSerDes.Helpers.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

Converting null literal or possible null value to non-nullable type.
Type keyType = null;

Check warning on line 145 in src/net/KEFCore.SerDes/KEFCoreSerDes.Helpers.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

Converting null literal or possible null value to non-nullable type.
Type valueContainerType = null;
Type valueType = null;

Check warning on line 146 in src/net/KEFCore.SerDes/KEFCoreSerDes.Helpers.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

Converting null literal or possible null value to non-nullable type.

var headers = record.Headers();
if (headers != null)
{
foreach (var header in headers.ToArray())
{
var key = header.Key();
if (key == KEFCoreSerDesNames.KeySerializerIdentifier)
if (key == KNetSerialization.KeyTypeIdentifier)
{
var strType = Encoding.UTF8.GetString(header.Value());
keySerializerType = Type.GetType(strType, true)!;
keyType = Type.GetType(strType, true)!;
}
if (key == KEFCoreSerDesNames.ValueContainerSerializerIdentifier)
if (key == KNetSerialization.KeySerializerIdentifier)
{
var strType = Encoding.UTF8.GetString(header.Value());
valueContainerSerializerType = Type.GetType(strType, true)!;
keySerializerType = Type.GetType(strType, true)!;
}
if (key == KEFCoreSerDesNames.KeyTypeIdentifier)
if (key == KNetSerialization.ValueTypeIdentifier)
{
var strType = Encoding.UTF8.GetString(header.Value());
keyType = Type.GetType(strType, true)!;
valueType = Type.GetType(strType, true)!;
}
if (key == KEFCoreSerDesNames.ValueContainerIdentifier)
if (key == KNetSerialization.ValueSerializerIdentifier)
{
var strType = Encoding.UTF8.GetString(header.Value());
valueContainerType = Type.GetType(strType, true)!;
valueSerializerType = Type.GetType(strType, true)!;
}
}
}

return FromRawValueData(keyType!, valueContainerType!, keySerializerType!, valueContainerSerializerType!, record.Topic(), record.Value(), record.Key(), throwUnmatch);
if (keyType == null || keySerializerType == null || valueType == null || valueSerializerType == null)
{
throw new InvalidOperationException($"Missing one, or more, mandatory information in record: keyType: {keyType} - keySerializerType: {keySerializerType} - valueType: {valueType} - valueSerializerType: {valueSerializerType}");
}

return FromRawValueData(keyType!, valueType!, keySerializerType!, valueSerializerType!, record.Topic(), record.Value(), record.Key(), throwUnmatch);
}

/// <summary>
Expand Down

0 comments on commit 7fb11b5

Please sign in to comment.