diff --git a/src/documentation/articles/serialization.md b/src/documentation/articles/serialization.md index c72f2bdf..6dc2fab6 100644 --- a/src/documentation/articles/serialization.md +++ b/src/documentation/articles/serialization.md @@ -175,69 +175,98 @@ The engine comes with two different encoders: The following schema is the default used from the engine and can be registered in Apache Schema registry so other applications can use it to extract the data stored in the topics: -```json -{ - "namespace": "MASES.EntityFrameworkCore.KNet.Serialization.Avro.Storage", - "type": "record", - "name": "AvroValueContainer", - "doc": "Represents the storage container type to be used from KEFCore", - "fields": [ - { - "name": "EntityName", - "type": "string" - }, - { - "name": "ClrType", - "type": "string" - }, - { - "name": "Data", - "type": { - "type": "array", - "items": { - "namespace": "MASES.EntityFrameworkCore.KNet.Serialization.Avro.Storage", - "type": "record", - "name": "PropertyDataRecord", - "doc": "Represents the single container for Entity properties stored in AvroValueContainer and used from KEFCore", - "fields": [ - { - "name": "PropertyIndex", - "type": "int" - }, - { - "name": "PropertyName", - "type": "string" - }, - { - "name": "ClrType", - "type": "string" - }, - { - "name": "Value", - "type": [ - "null", - "boolean", - "int", - "long", - "float", - "double", - "string" - ] - } - ] - } - } - } - ] -} -``` +- Complex Primary Key schema: + ```json + { + "namespace": "MASES.EntityFrameworkCore.KNet.Serialization.Avro.Storage", + "type": "record", + "name": "AvroKeyContainer", + "doc": "Represents the storage container type to be used from KEFCore for keys", + "fields": [ + { + "name": "PrimaryKey", + "type": { + "type": "array", + "items": [ + "null", + "boolean", + "int", + "long", + "float", + "double", + "string" + ] + } + } + ] + } + ``` + + +- ValueContainer schema: + ```json + { + "namespace": "MASES.EntityFrameworkCore.KNet.Serialization.Avro.Storage", + "type": "record", + "name": "AvroValueContainer", + "doc": "Represents the storage container type to be used from KEFCore", + "fields": [ + { + "name": "EntityName", + "type": "string" + }, + { + "name": "ClrType", + "type": "string" + }, + { + "name": "Data", + "type": { + "type": "array", + "items": { + "namespace": "MASES.EntityFrameworkCore.KNet.Serialization.Avro.Storage", + "type": "record", + "name": "PropertyDataRecord", + "doc": "Represents the single container for Entity properties stored in AvroValueContainer and used from KEFCore", + "fields": [ + { + "name": "PropertyIndex", + "type": "int" + }, + { + "name": "PropertyName", + "type": "string" + }, + { + "name": "ClrType", + "type": "string" + }, + { + "name": "Value", + "type": [ + "null", + "boolean", + "int", + "long", + "float", + "double", + "string" + ] + } + ] + } + } + } + ] + } + ``` The extension converted this schema into code to speedup the exection of serialization/deserialization operations. ### How to use Avro `KafkaDbContext` contains three properties can be used to override the default types: -- **KeySerializationType**: Leave this value untouched, till now the engine uses the default serializer -- **ValueSerializationType**: set this value to `KEFCoreSerDesAvroBinary<>` or `KEFCoreSerDesAvroJson<>` +- **KeySerializationType**: set this value to `KEFCoreSerDesKeyAvroBinary<>` or `KEFCoreSerDesKeyAvroJson<>`, the type automatically fallback to default serializer for simple Primary Key +- **ValueSerializationType**: set this value to `KEFCoreSerDesValueContainerAvroBinary<>` or `KEFCoreSerDesValueContainerAvroJson<>` - **ValueContainerType**: set this value to `AvroValueContainer<>` An example is: @@ -248,8 +277,9 @@ using (context = new BloggingContext() BootstrapServers = "KAFKA-SERVER:9092", ApplicationId = "MyAppid", DbName = "MyDBName", + KeySerializationType = UseAvroBinary ? typeof(KEFCoreSerDesKeyAvroBinary<>) : typeof(KEFCoreSerDesKeyAvroJson<>), ValueContainerType = typeof(AvroValueContainer<>), - ValueSerializationType = UseAvroBinary ? typeof(KEFCoreSerDesAvroBinary<>) : typeof(KEFCoreSerDesAvroJson<>), + ValueSerializationType = UseAvroBinary ? typeof(KEFCoreSerDesValueContainerAvroBinary<>) : typeof(KEFCoreSerDesValueContainerAvroJson<>), }) { // execute stuff here diff --git a/src/net/KEFCore.SerDes.Avro.Compiler/AvroSerializationHelper.cs b/src/net/KEFCore.SerDes.Avro.Compiler/AvroSerializationHelper.cs index cee7033e..7da94a32 100644 --- a/src/net/KEFCore.SerDes.Avro.Compiler/AvroSerializationHelper.cs +++ b/src/net/KEFCore.SerDes.Avro.Compiler/AvroSerializationHelper.cs @@ -52,6 +52,7 @@ public static void BuildSchemaClassesFromFiles( string outputFolder, params stri public static void BuildDefaultSchema(string outputFolder) { BuildSchemaClassesFromFiles(outputFolder, "AvroValueContainer.avsc"); + BuildSchemaClassesFromFiles(outputFolder, "AvroKeyContainer.avsc"); } } diff --git a/src/net/KEFCore.SerDes.Avro.Compiler/KEFCore.SerDes.Avro.Compiler.csproj b/src/net/KEFCore.SerDes.Avro.Compiler/KEFCore.SerDes.Avro.Compiler.csproj index 5ccbfba8..b22edb3c 100644 --- a/src/net/KEFCore.SerDes.Avro.Compiler/KEFCore.SerDes.Avro.Compiler.csproj +++ b/src/net/KEFCore.SerDes.Avro.Compiler/KEFCore.SerDes.Avro.Compiler.csproj @@ -13,6 +13,14 @@ enable false + + + PreserveNewest + + + PreserveNewest + + diff --git a/src/net/KEFCore.SerDes.Avro/AvroKeyContainer.avsc b/src/net/KEFCore.SerDes.Avro/AvroKeyContainer.avsc new file mode 100644 index 00000000..2567cea1 --- /dev/null +++ b/src/net/KEFCore.SerDes.Avro/AvroKeyContainer.avsc @@ -0,0 +1,23 @@ +{ + "namespace": "MASES.EntityFrameworkCore.KNet.Serialization.Avro.Storage", + "type": "record", + "name": "AvroKeyContainer", + "doc": "Represents the storage container type to be used from KEFCore for keys", + "fields": [ + { + "name": "PrimaryKey", + "type": { + "type": "array", + "items": [ + "null", + "boolean", + "int", + "long", + "float", + "double", + "string" + ] + } + } + ] +} \ No newline at end of file diff --git a/src/net/KEFCore.SerDes.Avro/AvroValueContainer.avsc b/src/net/KEFCore.SerDes.Avro/AvroValueContainer.avsc index c90c9882..ed5b366e 100644 --- a/src/net/KEFCore.SerDes.Avro/AvroValueContainer.avsc +++ b/src/net/KEFCore.SerDes.Avro/AvroValueContainer.avsc @@ -2,7 +2,7 @@ "namespace": "MASES.EntityFrameworkCore.KNet.Serialization.Avro.Storage", "type": "record", "name": "AvroValueContainer", - "doc": "Represents the storage container type to be used from KEFCore", + "doc": "Represents the storage container type to be used from KEFCore values", "fields": [ { "name": "EntityName", diff --git a/src/net/KEFCore.SerDes.Avro/Generated/AvroKeyContainer.cs b/src/net/KEFCore.SerDes.Avro/Generated/AvroKeyContainer.cs new file mode 100644 index 00000000..675ae0bf --- /dev/null +++ b/src/net/KEFCore.SerDes.Avro/Generated/AvroKeyContainer.cs @@ -0,0 +1,59 @@ +// ------------------------------------------------------------------------------ +// +// Generated by MASES.EntityFrameworkCore.KNet.Serialization.Avro.Compiler, version 1.11.3 +// Changes to this file may cause incorrect behavior and will be lost if code +// is regenerated +// +// ------------------------------------------------------------------------------ +namespace MASES.EntityFrameworkCore.KNet.Serialization.Avro.Storage +{ + using System; + using System.Collections.Generic; + using System.Text; + using global::Avro; + using global::Avro.Specific; + + /// + /// Represents the storage container type to be used from KEFCore for keys + /// + [global::System.CodeDom.Compiler.GeneratedCodeAttribute("MASES.EntityFrameworkCore.KNet.Serialization.Avro.Compiler", "1.11.3")] + public partial class AvroKeyContainer : global::Avro.Specific.ISpecificRecord + { + public static global::Avro.Schema _SCHEMA = global::Avro.Schema.Parse(@"{""type"":""record"",""name"":""AvroKeyContainer"",""doc"":""Represents the storage container type to be used from KEFCore for keys"",""namespace"":""MASES.EntityFrameworkCore.KNet.Serialization.Avro.Storage"",""fields"":[{""name"":""PrimaryKey"",""type"":{""type"":""array"",""items"":[""null"",""boolean"",""int"",""long"",""float"",""double"",""string""]}}]}"); + private IList _PrimaryKey; + public virtual global::Avro.Schema Schema + { + get + { + return AvroKeyContainer._SCHEMA; + } + } + public IList PrimaryKey + { + get + { + return this._PrimaryKey; + } + set + { + this._PrimaryKey = value; + } + } + public virtual object Get(int fieldPos) + { + switch (fieldPos) + { + case 0: return this.PrimaryKey; + default: throw new global::Avro.AvroRuntimeException("Bad index " + fieldPos + " in Get()"); + }; + } + public virtual void Put(int fieldPos, object fieldValue) + { + switch (fieldPos) + { + case 0: this.PrimaryKey = (IList)fieldValue; break; + default: throw new global::Avro.AvroRuntimeException("Bad index " + fieldPos + " in Put()"); + }; + } + } +} diff --git a/src/net/KEFCore.SerDes.Avro/KEFCore.SerDes.Avro.csproj b/src/net/KEFCore.SerDes.Avro/KEFCore.SerDes.Avro.csproj index 0c4abe03..4f33e118 100644 --- a/src/net/KEFCore.SerDes.Avro/KEFCore.SerDes.Avro.csproj +++ b/src/net/KEFCore.SerDes.Avro/KEFCore.SerDes.Avro.csproj @@ -50,6 +50,9 @@ + + PreserveNewest + PreserveNewest diff --git a/src/net/KEFCore.SerDes.Avro/KEFCoreSerDes.KeyContainer.AvroBinary.cs b/src/net/KEFCore.SerDes.Avro/KEFCoreSerDes.KeyContainer.AvroBinary.cs new file mode 100644 index 00000000..d7c39df3 --- /dev/null +++ b/src/net/KEFCore.SerDes.Avro/KEFCoreSerDes.KeyContainer.AvroBinary.cs @@ -0,0 +1,84 @@ +/* +* Copyright 2023 MASES s.r.l. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +* Refer to LICENSE for more information. +*/ + +#nullable enable + +using Avro.IO; +using Avro.Specific; +using MASES.EntityFrameworkCore.KNet.Serialization.Avro.Storage; +using MASES.KNet.Serialization; +using Org.Apache.Kafka.Common.Header; + +namespace MASES.EntityFrameworkCore.KNet.Serialization.Avro; + +/// +/// Avro Key Binary encoder extension of , for example +/// +/// +public class KEFCoreSerDesKeyAvroBinary : KNetSerDes +{ + static readonly SpecificDefaultWriter SpecificWriter = new(AvroKeyContainer._SCHEMA); + static readonly SpecificDefaultReader SpecificReader = new(AvroKeyContainer._SCHEMA, AvroKeyContainer._SCHEMA); + readonly IKNetSerDes _defaultSerDes = default; + /// + /// Default initializer + /// + public KEFCoreSerDesKeyAvroBinary() + { + if (KNetSerialization.IsInternalManaged()) + { + _defaultSerDes = new KNetSerDes(); + } + else if (!typeof(T).IsArray) + { + throw new InvalidOperationException($"KEFCoreSerDesKeyAvroBinary cannot manage {typeof(T).Name}, override or build a new serializaer"); + } + } + + /// + public override byte[] Serialize(string topic, T data) + { + return SerializeWithHeaders(topic, null!, data); + } + /// + public override byte[] SerializeWithHeaders(string topic, Headers headers, T data) + { + if (_defaultSerDes != null) return _defaultSerDes.SerializeWithHeaders(topic, headers, data); + + using MemoryStream memStream = new(); + BinaryEncoder encoder = new(memStream); + SpecificWriter.Write(data, encoder); + return memStream.ToArray(); + } + /// + public override T Deserialize(string topic, byte[] data) + { + return DeserializeWithHeaders(topic, null!, data); + } + /// + public override T DeserializeWithHeaders(string topic, Headers headers, byte[] data) + { + if (_defaultSerDes != null) return _defaultSerDes.DeserializeWithHeaders(topic, headers, data); + + using MemoryStream memStream = new(data); + BinaryDecoder decoder = new(memStream); + T t = (T)Activator.CreateInstance(typeof(T))!; + t = SpecificReader.Read(t!, decoder); + return t; + } +} diff --git a/src/net/KEFCore.SerDes.Avro/KEFCoreSerDes.KeyContainer.AvroJson.cs b/src/net/KEFCore.SerDes.Avro/KEFCoreSerDes.KeyContainer.AvroJson.cs new file mode 100644 index 00000000..ec172d05 --- /dev/null +++ b/src/net/KEFCore.SerDes.Avro/KEFCoreSerDes.KeyContainer.AvroJson.cs @@ -0,0 +1,85 @@ +/* +* Copyright 2023 MASES s.r.l. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +* Refer to LICENSE for more information. +*/ + +#nullable enable + +using Avro.IO; +using Avro.Specific; +using MASES.EntityFrameworkCore.KNet.Serialization.Avro.Storage; +using MASES.KNet.Serialization; +using Org.Apache.Kafka.Common.Header; + +namespace MASES.EntityFrameworkCore.KNet.Serialization.Avro; + +/// +/// Avro Key Json encoder extension of , for example +/// +/// +public class KEFCoreSerDesKeyAvroJson : KNetSerDes +{ + static readonly SpecificDefaultWriter SpecificWriter = new(AvroKeyContainer._SCHEMA); + static readonly SpecificDefaultReader SpecificReader = new(AvroKeyContainer._SCHEMA, AvroKeyContainer._SCHEMA); + readonly IKNetSerDes _defaultSerDes = default; + /// + /// Default initializer + /// + public KEFCoreSerDesKeyAvroJson() + { + if (KNetSerialization.IsInternalManaged()) + { + _defaultSerDes = new KNetSerDes(); + } + else if (!typeof(T).IsArray) + { + throw new InvalidOperationException($"KEFCoreSerDesKeyAvroJson cannot manage {typeof(T).Name}, override or build a new serializaer"); + } + } + + /// + public override byte[] Serialize(string topic, T data) + { + return SerializeWithHeaders(topic, null!, data); + } + /// + public override byte[] SerializeWithHeaders(string topic, Headers headers, T data) + { + if (_defaultSerDes != null) return _defaultSerDes.SerializeWithHeaders(topic, headers, data); + + using MemoryStream memStream = new(); + JsonEncoder encoder = new(AvroKeyContainer._SCHEMA, memStream); + SpecificWriter.Write(data, encoder); + encoder.Flush(); + return memStream.ToArray(); + } + /// + public override T Deserialize(string topic, byte[] data) + { + return DeserializeWithHeaders(topic, null!, data); + } + /// + public override T DeserializeWithHeaders(string topic, Headers headers, byte[] data) + { + if (_defaultSerDes != null) return _defaultSerDes.DeserializeWithHeaders(topic, headers, data); + + using MemoryStream memStream = new(data); + JsonDecoder decoder = new(AvroKeyContainer._SCHEMA, memStream); + T t = (T)Activator.CreateInstance(typeof(T))!; + t = SpecificReader.Read(t!, decoder); + return t; + } +} diff --git a/src/net/KEFCore.SerDes.Avro/KEFCoreSerDes.AvroBinary.cs b/src/net/KEFCore.SerDes.Avro/KEFCoreSerDes.ValueContainer.AvroBinary.cs similarity index 82% rename from src/net/KEFCore.SerDes.Avro/KEFCoreSerDes.AvroBinary.cs rename to src/net/KEFCore.SerDes.Avro/KEFCoreSerDes.ValueContainer.AvroBinary.cs index 321241e9..267acc46 100644 --- a/src/net/KEFCore.SerDes.Avro/KEFCoreSerDes.AvroBinary.cs +++ b/src/net/KEFCore.SerDes.Avro/KEFCoreSerDes.ValueContainer.AvroBinary.cs @@ -27,13 +27,13 @@ namespace MASES.EntityFrameworkCore.KNet.Serialization.Avro; /// -/// Avro Binary encoder extension of , for example +/// Avro ValueContainer Binary encoder extension of , for example /// /// -public class KEFCoreSerDesAvroBinary : KNetSerDes +public class KEFCoreSerDesValueContainerAvroBinary : KNetSerDes { - static readonly SpecificDefaultWriter SpecificWriter = new SpecificDefaultWriter(AvroValueContainer._SCHEMA); - static readonly SpecificDefaultReader SpecificReader = new SpecificDefaultReader(AvroValueContainer._SCHEMA, AvroValueContainer._SCHEMA); + static readonly SpecificDefaultWriter SpecificWriter = new(AvroValueContainer._SCHEMA); + static readonly SpecificDefaultReader SpecificReader = new(AvroValueContainer._SCHEMA, AvroValueContainer._SCHEMA); /// public override byte[] Serialize(string topic, T data) diff --git a/src/net/KEFCore.SerDes.Avro/KEFCoreSerDes.AvroJson.cs b/src/net/KEFCore.SerDes.Avro/KEFCoreSerDes.ValueContainer.AvroJson.cs similarity index 82% rename from src/net/KEFCore.SerDes.Avro/KEFCoreSerDes.AvroJson.cs rename to src/net/KEFCore.SerDes.Avro/KEFCoreSerDes.ValueContainer.AvroJson.cs index 4adf7853..e8971f27 100644 --- a/src/net/KEFCore.SerDes.Avro/KEFCoreSerDes.AvroJson.cs +++ b/src/net/KEFCore.SerDes.Avro/KEFCoreSerDes.ValueContainer.AvroJson.cs @@ -27,13 +27,13 @@ namespace MASES.EntityFrameworkCore.KNet.Serialization.Avro; /// -/// Avro Json encoder extension of , for example +/// Avro ValueContainer Json encoder extension of , for example /// /// -public class KEFCoreSerDesAvroJson : KNetSerDes +public class KEFCoreSerDesValueContainerAvroJson : KNetSerDes { - static readonly SpecificDefaultWriter SpecificWriter = new SpecificDefaultWriter(AvroValueContainer._SCHEMA); - static readonly SpecificDefaultReader SpecificReader = new SpecificDefaultReader(AvroValueContainer._SCHEMA, AvroValueContainer._SCHEMA); + static readonly SpecificDefaultWriter SpecificWriter = new(AvroValueContainer._SCHEMA); + static readonly SpecificDefaultReader SpecificReader = new(AvroValueContainer._SCHEMA, AvroValueContainer._SCHEMA); /// public override byte[] Serialize(string topic, T data) diff --git a/test/KEFCore.Benchmark.Avro.Test/Program.cs b/test/KEFCore.Benchmark.Avro.Test/Program.cs index 940ee96c..ad1402c5 100644 --- a/test/KEFCore.Benchmark.Avro.Test/Program.cs +++ b/test/KEFCore.Benchmark.Avro.Test/Program.cs @@ -88,8 +88,9 @@ static void Main(string[] args) ApplicationId = config.ApplicationId, DbName = databaseName, StreamsConfig = streamConfig, + KeySerializationType = config.UseAvroBinary ? typeof(KEFCoreSerDesKeyAvroBinary<>) : typeof(KEFCoreSerDesKeyAvroJson<>), ValueContainerType = typeof(AvroValueContainer<>), - ValueSerializationType = config.UseAvroBinary ? typeof(KEFCoreSerDesAvroBinary<>) : typeof(KEFCoreSerDesAvroJson<>), + ValueSerializationType = config.UseAvroBinary ? typeof(KEFCoreSerDesValueContainerAvroBinary<>) : typeof(KEFCoreSerDesValueContainerAvroJson<>), }) {