diff --git a/src/net/KNet/Developed/Org/Apache/Kafka/Common/Serialization/Serde.cs b/src/net/KNet/Developed/Org/Apache/Kafka/Common/Serialization/Serde.cs index 50d260a0be..9ccd15e5a3 100644 --- a/src/net/KNet/Developed/Org/Apache/Kafka/Common/Serialization/Serde.cs +++ b/src/net/KNet/Developed/Org/Apache/Kafka/Common/Serialization/Serde.cs @@ -17,14 +17,15 @@ */ using MASES.JCOBridge.C2JBridge; +using MASES.JCOBridge.C2JBridge.JVMInterop; namespace Org.Apache.Kafka.Common.Serialization { - /// + /// /// Listener for Kafka Serializer. Extends . Implements /// /// Dispose the object to avoid a resource leak, the object contains a reference to the corresponding JVM object - public partial class Serde : ISerde + public partial class SerdeDirect : ISerde { /// /// @@ -32,7 +33,8 @@ public partial class Serde : ISerde /// public Org.Apache.Kafka.Common.Serialization.Deserializer DeserializerDirect() { - return new DeserializerDirect(); + var res = this.IExecute("deserializer") as IJavaObject; + return WrapsDirect>(res); } /// /// @@ -40,7 +42,8 @@ public Org.Apache.Kafka.Common.Serialization.Deserializer DeserializerDirect( /// public Org.Apache.Kafka.Common.Serialization.Serializer SerializerDirect() { - return new SerializerDirect(); + var res = this.IExecute("serializer") as IJavaObject; + return WrapsDirect>(res); } } } diff --git a/src/net/KNet/Specific/Serialization/SerDes.cs b/src/net/KNet/Specific/Serialization/SerDes.cs index 129df1889a..0f6752d987 100644 --- a/src/net/KNet/Specific/Serialization/SerDes.cs +++ b/src/net/KNet/Specific/Serialization/SerDes.cs @@ -114,81 +114,61 @@ public SerDes() throw new InvalidOperationException($"{_SerializationType} is not valid."); } + SerdeDirect kSerde = null; + if (IsDirectBuffered) { - _KafkaSerializer = new KNetByteBufferSerializer().CastDirect>(); - _KafkaDeserializer = new KNetByteBufferDeserializer().CastDirect>(); - _KafkaSerde = new KNetSerdes.ByteBufferSerde().CastDirect>(); + kSerde = new KNetSerdes.ByteBufferSerde().CastTo>(); } else { switch (_JVMSerializationType) { case KNetSerialization.SerializationType.Boolean: - _KafkaSerializer = new BooleanSerializer().CastDirect>(); - _KafkaDeserializer = new BooleanDeserializer().CastDirect>(); - _KafkaSerde = new Serdes.BooleanSerde().CastDirect>(); + kSerde = new Serdes.BooleanSerde().CastTo>(); break; case KNetSerialization.SerializationType.ByteArray: - _KafkaSerializer = new ByteArraySerializer().CastDirect>(); - _KafkaDeserializer = new ByteArrayDeserializer().CastDirect>(); - _KafkaSerde = new Serdes.ByteArraySerde().CastDirect>(); + kSerde = new Serdes.ByteArraySerde().CastTo>(); break; case KNetSerialization.SerializationType.ByteBuffer: - _KafkaSerializer = new ByteBufferSerializer().CastDirect>(); - _KafkaDeserializer = new ByteBufferDeserializer().CastDirect>(); - _KafkaSerde = new Serdes.ByteBufferSerde().CastDirect>(); + kSerde = new Serdes.ByteBufferSerde().CastTo>(); break; case KNetSerialization.SerializationType.Bytes: - _KafkaSerializer = new BytesSerializer().CastDirect>(); - _KafkaDeserializer = new BytesDeserializer().CastDirect>(); - _KafkaSerde = new Serdes.BytesSerde().CastDirect>(); + kSerde = new Serdes.BytesSerde().CastTo>(); break; case KNetSerialization.SerializationType.Double: - _KafkaSerializer = new DoubleSerializer().CastDirect>(); - _KafkaDeserializer = new DoubleDeserializer().CastDirect>(); - _KafkaSerde = new Serdes.DoubleSerde().CastDirect>(); + kSerde = new Serdes.DoubleSerde().CastTo>(); break; case KNetSerialization.SerializationType.Float: - _KafkaSerializer = new FloatSerializer().CastDirect>(); - _KafkaDeserializer = new FloatDeserializer().CastDirect>(); - _KafkaSerde = new Serdes.FloatSerde().CastDirect>(); + kSerde = new Serdes.FloatSerde().CastTo>(); break; case KNetSerialization.SerializationType.Integer: - _KafkaSerializer = new IntegerSerializer().CastDirect>(); - _KafkaDeserializer = new IntegerDeserializer().CastDirect>(); - _KafkaSerde = new Serdes.IntegerSerde().CastDirect>(); + kSerde = new Serdes.IntegerSerde().CastTo>(); break; case KNetSerialization.SerializationType.Long: - _KafkaSerializer = new LongSerializer().CastDirect>(); - _KafkaDeserializer = new LongDeserializer().CastDirect>(); - _KafkaSerde = new Serdes.LongSerde().CastDirect>(); + kSerde = new Serdes.LongSerde().CastTo>(); break; case KNetSerialization.SerializationType.Short: - _KafkaSerializer = new ShortSerializer().CastDirect>(); - _KafkaDeserializer = new ShortDeserializer().CastDirect>(); - _KafkaSerde = new Serdes.ShortSerde().CastDirect>(); + kSerde = new Serdes.ShortSerde().CastTo>(); break; case KNetSerialization.SerializationType.String: - _KafkaSerializer = new StringSerializer().CastDirect>(); - _KafkaDeserializer = new StringDeserializer().CastDirect>(); - _KafkaSerde = new Serdes.StringSerde().CastDirect>(); + kSerde = new Serdes.StringSerde().CastTo>(); break; case KNetSerialization.SerializationType.Guid: - _KafkaSerializer = new UUIDSerializer().CastDirect>(); - _KafkaDeserializer = new UUIDDeserializer().CastDirect>(); - _KafkaSerde = new Serdes.UUIDSerde().CastDirect>(); + kSerde = new Serdes.UUIDSerde().CastTo>(); break; case KNetSerialization.SerializationType.Void: - _KafkaSerializer = new VoidSerializer().CastDirect>(); - _KafkaDeserializer = new VoidDeserializer().CastDirect>(); - _KafkaSerde = new Serdes.VoidSerde().CastDirect>(); + kSerde = new Serdes.VoidSerde().CastTo>(); break; case KNetSerialization.SerializationType.External: default: throw new InvalidOperationException($"{typeof(T)} needs an external serializer: set {nameof(OnSerialize)} or {nameof(OnSerializeWithHeaders)}."); } } + + _KafkaSerde = kSerde; + _KafkaSerializer = kSerde.SerializerDirect(); + _KafkaDeserializer = kSerde.DeserializerDirect(); } /// /// Finalizer