Skip to content

Commit

Permalink
Added Apache Kafka Streams based on KNet Streams SDK (#195)
Browse files Browse the repository at this point in the history
* Added ability to use KNet Streams SDK upgrading to KNet 2.4.1

* Update test program

* Update JSON serializer to improve the performances

* Added IAsyncEnumerable over KNetStreamsRetriever
  • Loading branch information
masesdevelopers authored Jan 21, 2024
1 parent 4c4c3d7 commit d40edfe
Show file tree
Hide file tree
Showing 28 changed files with 1,037 additions and 120 deletions.
2 changes: 2 additions & 0 deletions src/documentation/articles/kafkadbcontext.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ _description: Describe what is and how use KafkaDbContext class from Entity Fram
- **UsePersistentStorage**: set to **true** to use a persistent storage between multiple application startup
- **UseDeletePolicyForTopic**: set to **true** to enable [delete cleanup policy](https://kafka.apache.org/documentation/#topicconfigs_cleanup.policy)
- **UseCompactedReplicator**: Use `KNetCompactedReplicator` instead of Apache Kafka Streams to manage data to or from topics
- **UseKNetStreams**: Use KNet version of Apache Kafka Streams instead of standard Apache Kafka Streams, used if **UseCompactedReplicator** is **false**
- **UseEnumeratorWithPrefetch**: Setting this property to **true** the engine prefers to use enumerator instances able to do a prefetch on data speeding up execution, used if **UseKNetStreams** is **true** and **UseCompactedReplicator** is **false**
- **ConsumerConfig**: parameters to use for Producer
- **ProducerConfig**: parameters to use for Producer
- **StreamsConfig**: parameters to use for Apche Kafka Streams application
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="MASES.KNet.Serialization.Avro" Version="2.3.0" />
<PackageReference Include="MASES.KNet.Serialization.Avro" Version="2.4.1" />
</ItemGroup>

<ItemGroup>
Expand Down
21 changes: 17 additions & 4 deletions src/net/KEFCore.SerDes/DefaultKEFCoreSerDes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
using MASES.KNet.Serialization;
using Org.Apache.Kafka.Common.Header;
using System.Text;
using System.Text.Json;

namespace MASES.EntityFrameworkCore.KNet.Serialization.Json;
/// <summary>
Expand Down Expand Up @@ -55,6 +56,7 @@ public class Json<T> : KNetSerDes<T>
readonly byte[] keySerDesName = Encoding.UTF8.GetBytes(typeof(Json<>).ToAssemblyQualified());
readonly byte[] keyTypeName = Encoding.UTF8.GetBytes(typeof(T).FullName!);
readonly IKNetSerDes<T> _defaultSerDes = default!;
readonly JsonSerializerOptions? _options = null;
/// <inheritdoc/>
public override bool UseHeaders => true;
/// <summary>
Expand All @@ -70,6 +72,13 @@ public Json()
{
throw new InvalidOperationException($"{typeof(Json<>).ToAssemblyQualified()} cannot manage {typeof(T).Name}, override or build a new serializaer");
}
else
{
_options = new JsonSerializerOptions()
{
WriteIndented = false,
};
}
}

/// <inheritdoc cref="KNetSerDes{T}.Serialize(string, T)"/>
Expand All @@ -84,7 +93,6 @@ public override byte[] SerializeWithHeaders(string topic, Headers headers, T dat
headers?.Add(KNetSerialization.KeySerializerIdentifier, keySerDesName);

if (_defaultSerDes != null) return _defaultSerDes.SerializeWithHeaders(topic, headers, data);

var jsonStr = System.Text.Json.JsonSerializer.Serialize<T>(data);
return Encoding.UTF8.GetBytes(jsonStr);
}
Expand All @@ -99,7 +107,7 @@ public override T DeserializeWithHeaders(string topic, Headers headers, byte[] d
if (_defaultSerDes != null) return _defaultSerDes.DeserializeWithHeaders(topic, headers, data);

if (data == null) return default!;
return System.Text.Json.JsonSerializer.Deserialize<T>(data)!;
return System.Text.Json.JsonSerializer.Deserialize<T>(data, _options)!;
}
}
}
Expand All @@ -117,6 +125,7 @@ public class Json<T> : KNetSerDes<T>
{
readonly byte[] valueContainerSerDesName = Encoding.UTF8.GetBytes(typeof(Json<>).ToAssemblyQualified());
readonly byte[] valueContainerName = null!;
readonly System.Text.Json.JsonSerializerOptions _options;
/// <inheritdoc/>
public override bool UseHeaders => true;
/// <summary>
Expand All @@ -133,6 +142,10 @@ public Json()
if (t.GetInterface(typeof(IValueContainer<>).Name) != null)
{
valueContainerName = Encoding.UTF8.GetBytes(t.ToAssemblyQualified());
_options = new System.Text.Json.JsonSerializerOptions(System.Text.Json.JsonSerializerDefaults.General)
{
WriteIndented = false,
};
return;
}
else throw new ArgumentException($"{typeof(T).Name} does not implement IValueContainer<> and cannot be used because it is not a valid ValueContainer type");
Expand All @@ -151,7 +164,7 @@ public override byte[] SerializeWithHeaders(string topic, Headers headers, T dat
headers?.Add(KNetSerialization.ValueSerializerIdentifier, valueContainerSerDesName);
headers?.Add(KNetSerialization.ValueTypeIdentifier, valueContainerName);

var jsonStr = System.Text.Json.JsonSerializer.Serialize<T>(data);
var jsonStr = System.Text.Json.JsonSerializer.Serialize<T>(data, _options);
return Encoding.UTF8.GetBytes(jsonStr);
}
/// <inheritdoc cref="KNetSerDes{T}.Deserialize(string, byte[])"/>
Expand All @@ -163,7 +176,7 @@ public override T Deserialize(string topic, byte[] data)
public override T DeserializeWithHeaders(string topic, Headers headers, byte[] data)
{
if (data == null) return default!;
return System.Text.Json.JsonSerializer.Deserialize<T>(data)!;
return System.Text.Json.JsonSerializer.Deserialize<T>(data, _options)!;
}
}
}
Expand Down
223 changes: 182 additions & 41 deletions src/net/KEFCore.SerDes/DefaultValueContainer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,94 @@

#nullable enable

using System.Collections.Concurrent;
using System.Text.Json;
using System.Text.Json.Serialization;

namespace MASES.EntityFrameworkCore.KNet.Serialization.Json.Storage;
/// <summary>
/// This is a supporting class used from <see cref="DefaultValueContainer{TKey}"/>
/// </summary>
[JsonSerializable(typeof(PropertyData))]
public class PropertyData : IJsonOnDeserialized
{
static readonly ConcurrentDictionary<Type, ManagedTypes> dict = new();
static readonly ConcurrentDictionary<ManagedTypes, Type> reverseDict = new();
readonly static Type StringType = typeof(string);
readonly static Type GuidType = typeof(Guid);
readonly static Type DateTimeType = typeof(DateTime);
readonly static Type DateTimeOffsetType = typeof(DateTimeOffset);
readonly static Type ByteType = typeof(byte);
readonly static Type ShortType = typeof(short);
readonly static Type IntType = typeof(int);
readonly static Type LongType = typeof(long);
readonly static Type DoubleType = typeof(double);
readonly static Type FloatType = typeof(float);

/// <summary>
/// List of <see cref="Type"/> managed from <see cref="PropertyData"/>
/// </summary>
public enum ManagedTypes
{
/// <summary>
/// Not defined or not found
/// </summary>
Undefined,
/// <summary>
/// <see cref="string"/>
/// </summary>
String,
/// <summary>
/// <see cref="Guid"/>
/// </summary>
Guid,
/// <summary>
/// <see cref="DateTime"/>
/// </summary>
DateTime,
/// <summary>
/// <see cref="DateTimeOffset"/>
/// </summary>
DateTimeOffset,
/// <summary>
/// <see cref="byte"/>
/// </summary>
Byte,
/// <summary>
/// <see cref="short"/>
/// </summary>
Short,
/// <summary>
/// <see cref="int"/>
/// </summary>
Int,
/// <summary>
/// <see cref="long"/>
/// </summary>
Long,
/// <summary>
/// <see cref="double"/>
/// </summary>
Double,
/// <summary>
/// <see cref="float"/>
/// </summary>
Float
}

static PropertyData()
{
dict.TryAdd(StringType, ManagedTypes.String);reverseDict.TryAdd(ManagedTypes.String, StringType);
dict.TryAdd(GuidType, ManagedTypes.Guid); reverseDict.TryAdd(ManagedTypes.Guid, GuidType);
dict.TryAdd(DateTimeType, ManagedTypes.DateTime); reverseDict.TryAdd(ManagedTypes.DateTime, DateTimeType);
dict.TryAdd(DateTimeOffsetType, ManagedTypes.DateTimeOffset); reverseDict.TryAdd(ManagedTypes.DateTimeOffset, DateTimeOffsetType);
dict.TryAdd(ByteType, ManagedTypes.Byte); reverseDict.TryAdd(ManagedTypes.Byte, ByteType);
dict.TryAdd(ShortType, ManagedTypes.Short); reverseDict.TryAdd(ManagedTypes.Short, ShortType);
dict.TryAdd(IntType, ManagedTypes.Int); reverseDict.TryAdd(ManagedTypes.Int, IntType);
dict.TryAdd(LongType, ManagedTypes.Long); reverseDict.TryAdd(ManagedTypes.Long, LongType);
dict.TryAdd(DoubleType, ManagedTypes.Double); reverseDict.TryAdd(ManagedTypes.Double, DoubleType);
dict.TryAdd(FloatType, ManagedTypes.Float); reverseDict.TryAdd(ManagedTypes.Float, FloatType);
}

/// <summary>
/// Initialize a new instance of <see cref="PropertyData"/>
/// </summary>
Expand All @@ -46,6 +124,8 @@ public PropertyData()
/// <remarks>This constructor is mandatory and it is used from <see cref="DefaultValueContainer{TKey}"/></remarks>
public PropertyData(IProperty property, object value)
{
if (!dict.TryGetValue(property.ClrType, out ManagedTypes _type)) _type = ManagedTypes.Undefined;
ManagedType = _type;
ClrType = property.ClrType?.FullName;
PropertyName = property.Name;
Value = value;
Expand All @@ -55,56 +135,114 @@ public void OnDeserialized()
{
if (Value is JsonElement elem)
{
switch (elem.ValueKind)
if (ManagedType == null || ManagedType == ManagedTypes.Undefined)
{
case JsonValueKind.String:
Value = elem.GetString()!;
if (ClrType != typeof(string).FullName)
{
try
switch (elem.ValueKind)
{
case JsonValueKind.String:
Value = elem.GetString()!;
if (ClrType != typeof(string).FullName)
{
Value = Convert.ChangeType(Value, Type.GetType(ClrType!)!);
}
catch (InvalidCastException)
{
// failed conversion, try with other methods for known types
if (ClrType == typeof(Guid).FullName)
try
{
Value = elem.GetGuid();
Value = Convert.ChangeType(Value, Type.GetType(ClrType!)!);
}
else if (ClrType == typeof(DateTime).FullName)
catch (InvalidCastException)
{
Value = elem.GetDateTime();
// failed conversion, try with other methods for known types
if (ClrType == typeof(Guid).FullName)
{
Value = elem.GetGuid();
}
else if (ClrType == typeof(DateTime).FullName)
{
Value = elem.GetDateTime();
}
else if (ClrType == typeof(DateTimeOffset).FullName)
{
Value = elem.GetDateTimeOffset();
}
else
{
Value = elem.GetString()!;
}
}
else if (ClrType == typeof(DateTimeOffset).FullName)
}
break;
case JsonValueKind.Number:
var tmp = elem.GetInt64();
Value = Convert.ChangeType(tmp, Type.GetType(ClrType!)!);
break;
case JsonValueKind.True:
Value = true;
break;
case JsonValueKind.False:
Value = false;
break;
case JsonValueKind.Null:
Value = null;
break;
case JsonValueKind.Object:
case JsonValueKind.Array:
case JsonValueKind.Undefined:
default:
throw new InvalidOperationException($"Failed to deserialize {PropertyName}, ValueKind is {elem.ValueKind}");
}
}
else
{
switch (elem.ValueKind)
{
case JsonValueKind.String:
Value = elem.GetString()!;
if (ManagedType != ManagedTypes.String)
{
try
{
Value = elem.GetDateTimeOffset();

Value = Convert.ChangeType(Value, reverseDict[ManagedType.Value]);
}
else
catch (InvalidCastException)
{
Value = elem.GetString()!;
// failed conversion, try with other methods for known types
if (ManagedType == ManagedTypes.Guid)
{
Value = elem.GetGuid();
}
else if (ManagedType == ManagedTypes.DateTime)
{
Value = elem.GetDateTime();
}
else if (ManagedType == ManagedTypes.DateTimeOffset)
{
Value = elem.GetDateTimeOffset();
}
else
{
Value = elem.GetString()!;
}
}
}
}
break;
case JsonValueKind.Number:
var tmp = elem.GetInt64();
Value = Convert.ChangeType(tmp, Type.GetType(ClrType!)!);
break;
case JsonValueKind.True:
Value = true;
break;
case JsonValueKind.False:
Value = false;
break;
case JsonValueKind.Null:
Value = null;
break;
case JsonValueKind.Object:
case JsonValueKind.Array:
case JsonValueKind.Undefined:
default:
throw new InvalidOperationException($"Failed to deserialize {PropertyName}, ValueKind is {elem.ValueKind}");
break;
case JsonValueKind.Number:
var tmp = elem.GetInt64();
Value = Convert.ChangeType(tmp, reverseDict[ManagedType.Value]);
break;
case JsonValueKind.True:
Value = true;
break;
case JsonValueKind.False:
Value = false;
break;
case JsonValueKind.Null:
Value = null;
break;
case JsonValueKind.Object:
case JsonValueKind.Array:
case JsonValueKind.Undefined:
default:
throw new InvalidOperationException($"Failed to deserialize {PropertyName}, ValueKind is {elem.ValueKind}");
}
}
}
else
Expand All @@ -117,6 +255,10 @@ public void OnDeserialized()
/// </summary>
public string? PropertyName { get; set; }
/// <summary>
/// The <see cref="ManagedTypes"/> value of the <see cref="ClrType"/>
/// </summary>
public ManagedTypes? ManagedType { get; set; }
/// <summary>
/// The full name of the CLR <see cref="Type"/> of the <see cref="IProperty"/>
/// </summary>
public string? ClrType { get; set; }
Expand All @@ -129,7 +271,6 @@ public void OnDeserialized()
/// The default ValueContainer used from KEFCore
/// </summary>
/// <typeparam name="TKey">It is the key <see cref="Type"/> passed from Entity Framework associated to the Entity data will be stored in the <see cref="DefaultValueContainer{TKey}"/></typeparam>
[JsonSerializable(typeof(DefaultValueContainer<>))]
public class DefaultValueContainer<TKey> : IValueContainer<TKey> where TKey : notnull
{
/// <summary>
Expand Down
2 changes: 1 addition & 1 deletion src/net/KEFCore.SerDes/KEFCore.SerDes.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="6.0.26" PrivateAssets="none" Condition="'$(TargetFramework)' == 'net6.0'" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="7.0.15" PrivateAssets="none" Condition="'$(TargetFramework)' == 'net7.0'" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="8.0.1" PrivateAssets="none" Condition="'$(TargetFramework)' == 'net8.0'" />
<PackageReference Include="MASES.KNet" Version="2.3.0">
<PackageReference Include="MASES.KNet" Version="2.4.1">
<IncludeAssets>All</IncludeAssets>
<PrivateAssets>None</PrivateAssets>
</PackageReference>
Expand Down
Loading

0 comments on commit d40edfe

Please sign in to comment.