Skip to content

Commit

Permalink
Merge branch '327-improve-kafka-streams-with-some-specific-knet-class…
Browse files Browse the repository at this point in the history
…es' of https://github.com/masesdevelopers/KafkaBridge into 327-improve-kafka-streams-with-some-specific-knet-classes
  • Loading branch information
masesdevelopers committed Jan 5, 2024
2 parents a02165e + ead05b1 commit cadb313
Show file tree
Hide file tree
Showing 5 changed files with 237 additions and 26 deletions.
10 changes: 10 additions & 0 deletions src/net/KNet/Specific/Consumer/KNetConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,16 @@ public KNetConsumer(Properties props, bool useJVMCallback = false)
/// <summary>
/// Initialize a new instance of <see cref="KNetConsumer{K, V}"/>
/// </summary>
/// <param name="configBuilder">An instance of <see cref="ConsumerConfigBuilder"/> </param>
/// <param name="useJVMCallback"><see langword="true"/> to active callback based mode</param>
public KNetConsumer(ConsumerConfigBuilder configBuilder, bool useJVMCallback = false)
: this(configBuilder, configBuilder.BuildKeySerDes<K>(), configBuilder.BuildValueSerDes<V>())
{
_autoCreateSerDes = true;
}
/// <summary>
/// Initialize a new instance of <see cref="KNetConsumer{K, V}"/>
/// </summary>
/// <param name="props">The properties to use, see <see cref="ConsumerConfig"/> and <see cref="ConsumerConfigBuilder"/></param>
/// <param name="keyDeserializer">Key serializer base on <see cref="KNetSerDes{K}"/></param>
/// <param name="valueDeserializer">Value serializer base on <see cref="KNetSerDes{K}"/></param>
Expand Down
103 changes: 101 additions & 2 deletions src/net/KNet/Specific/GenericConfigBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
using Java.Util;
using System.Globalization;
using System;
using MASES.KNet.Serialization;
using System.Linq;

namespace MASES.KNet
{
Expand All @@ -44,7 +46,9 @@ public static T CreateFrom(T origin)
if (origin == null) return Create();
var newT = new T
{
_options = new System.Collections.Generic.Dictionary<string, object>(origin._options)
_options = new System.Collections.Generic.Dictionary<string, object>(origin._options),
_KNetKeySerDes = origin._KNetKeySerDes,
_KNetValueSerDes = origin._KNetValueSerDes,
};
return newT;
}
Expand Down Expand Up @@ -106,7 +110,8 @@ protected virtual T Clone()
var clone = new T
{
_options = new System.Collections.Generic.Dictionary<string, object>(_options)
}; return clone;
};
return clone;
}
/// <summary>
/// Returns the <see cref="Properties"/> from the <typeparamref name="T"/> instance
Expand Down Expand Up @@ -137,5 +142,99 @@ public Map<string, string> ToMap()

return props;
}
Type _KNetKeySerDes = null;
/// <summary>
/// The <see cref="Type"/> used to create an instance of <see cref="IKNetSerDes{T}"/> for keys with <see cref="BuildKeySerDes{TKey}"/>
/// </summary>
public Type KNetKeySerDes
{
get { return _KNetKeySerDes; }
set
{
if (value.GetConstructors().Single(ci => ci.GetParameters().Length == 0) == null)
{
throw new ArgumentException($"{value.Name} does not contains a default constructor and cannot be used because it is not a valid Serializer type");
}

if (value.IsGenericType)
{
var keyT = value.GetGenericArguments();
if (keyT.Length != 1) { throw new ArgumentException($"{value.Name} does not contains a single generic argument and cannot be used because it is not a valid Serializer type"); }
var t = value.GetGenericTypeDefinition();
if (t.GetInterface(typeof(IKNetSerDes<>).Name) == null)
{
throw new ArgumentException($"{value.Name} does not implement IKNetSerDes<> and cannot be used because it is not a valid Serializer type");
}
_KNetKeySerDes = value;
}
else throw new ArgumentException($"{value.Name} is not a generic type and cannot be used as a valid ValueContainer type");
}
}

Type _KNetValueSerDes = null;
/// <summary>
/// The <see cref="Type"/> used to create an instance of <see cref="IKNetSerDes{T}"/> for values with <see cref="BuildValueSerDes{TValue}"/>
/// </summary>
public Type KNetValueSerDes
{
get { return _KNetValueSerDes; }
set
{
if (value.GetConstructors().Single(ci => ci.GetParameters().Length == 0) == null)
{
throw new ArgumentException($"{value.Name} does not contains a default constructor and cannot be used because it is not a valid Serializer type");
}

if (value.IsGenericType)
{
var keyT = value.GetGenericArguments();
if (keyT.Length != 1) { throw new ArgumentException($"{value.Name} does not contains a single generic argument and cannot be used because it is not a valid Serializer type"); }
var t = value.GetGenericTypeDefinition();
if (t.GetInterface(typeof(IKNetSerDes<>).Name) == null)
{
throw new ArgumentException($"{value.Name} does not implement IKNetSerDes<> and cannot be used because it is not a valid Serializer type");
}
_KNetValueSerDes = value;
}
else throw new ArgumentException($"{value.Name} is not a generic type and cannot be used as a valid Serializer type");
}
}

/// <summary>
/// Builds an instance of <see cref="IKNetSerDes{TKey}"/> using the <see cref="Type"/> defined in <see cref="KNetKeySerDes"/>
/// </summary>
/// <typeparam name="TKey">The type of the key</typeparam>
/// <returns>An instance of <see cref="IKNetSerDes{TKey}"/></returns>
/// <exception cref="InvalidOperationException">If <see cref="KNetKeySerDes"/> is <see langword="null"/></exception>
public IKNetSerDes<TKey> BuildKeySerDes<TKey>()
{
if (KNetSerialization.IsInternalManaged<TKey>())
{
return new KNetSerDes<TKey>();
}

if (KNetKeySerDes == null) throw new InvalidOperationException($"No default serializer available for {typeof(TKey)}, property {nameof(KNetKeySerDes)} shall be set.");
var tmp = KNetKeySerDes.MakeGenericType(typeof(TKey));
var o = Activator.CreateInstance(tmp);
return o as IKNetSerDes<TKey>;
}
/// <summary>
/// Builds an instance of <see cref="IKNetSerDes{TValue}"/> using the <see cref="Type"/> defined in <see cref="KNetValueSerDes"/>
/// </summary>
/// <typeparam name="TValue">The type of the key</typeparam>
/// <returns>An instance of <see cref="IKNetSerDes{TValue}"/></returns>
/// <exception cref="InvalidOperationException">If <see cref="KNetValueSerDes"/> is <see langword="null"/></exception>
public IKNetSerDes<TValue> BuildValueSerDes<TValue>()
{
if (KNetSerialization.IsInternalManaged<TValue>())
{
return new KNetSerDes<TValue>();
}

if (KNetValueSerDes == null) throw new InvalidOperationException($"No default serializer available for {typeof(TValue)}, property {nameof(KNetValueSerDes)} shall be set.");
var tmp = KNetValueSerDes.MakeGenericType(typeof(TValue));
var o = Activator.CreateInstance(tmp);
return o as IKNetSerDes<TValue>;
}
}
}
15 changes: 12 additions & 3 deletions src/net/KNet/Specific/Producer/KNetProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ public class KNetProducer<K, V> : KafkaProducer<byte[], byte[]>, IKNetProducer<K
/// </summary>
public override string BridgeClassName => "org.mases.knet.clients.producer.KNetProducer";

readonly bool autoCreateSerDes = false;
readonly bool _autoCreateSerDes = false;
readonly IKNetSerializer<K> _keySerializer;
readonly IKNetSerializer<V> _valueSerializer;
/// <summary>
Expand All @@ -175,7 +175,16 @@ public class KNetProducer<K, V> : KafkaProducer<byte[], byte[]>, IKNetProducer<K
public KNetProducer(Properties props)
: this(props, new KNetSerDes<K>(), new KNetSerDes<V>())
{
autoCreateSerDes = true;
_autoCreateSerDes = true;
}
/// <summary>
/// Initialize a new instance of <see cref="KNetProducer{K, V}"/>
/// </summary>
/// <param name="configBuilder">An instance of <see cref="ProducerConfigBuilder"/> </param>
public KNetProducer(ProducerConfigBuilder configBuilder)
: this(configBuilder, configBuilder.BuildKeySerDes<K>(), configBuilder.BuildValueSerDes<V>())
{
_autoCreateSerDes = true;
}
/// <summary>
/// Initialize a new instance of <see cref="KNetProducer{K, V}"/>
Expand Down Expand Up @@ -211,7 +220,7 @@ static Properties CheckProperties(Properties props)
/// </summary>
~KNetProducer()
{
if (autoCreateSerDes)
if (_autoCreateSerDes)
{
_keySerializer?.Dispose();
_valueSerializer?.Dispose();
Expand Down
Loading

0 comments on commit cadb313

Please sign in to comment.