From a5d3ffb8921e8191879180882c6144ae769d3272 Mon Sep 17 00:00:00 2001 From: masesdevelopers <94312179+masesdevelopers@users.noreply.github.com> Date: Sat, 14 Oct 2023 16:11:08 +0200 Subject: [PATCH] Added first serialization based on Avro Binary codecs --- .../AvroSerializationHelper.cs | 57 ++++ .../AvroValueContainer.avsc | 54 ++++ .../KEFCore.SerDes.Avro/AvroValueContainer.cs | 99 +++++++ .../Generated/AvroValueContainer.cs | 87 ++++++ .../Generated/PropertyDataRecord.cs | 101 +++++++ .../KEFCore.SerDes.Avro.Compiler.csproj | 34 +++ .../KEFCore.SerDes.Avro.csproj | 61 ++++ .../KEFCore.SerDes.Avro/KEFCoreSerDes.Avro.cs | 65 +++++ src/net/KEFCore.SerDes.Avro/Program.cs | 54 ++++ .../KEFCore.SerDes/DefaultValueContainer.cs | 2 +- src/net/KEFCore.SerDes/KEFCore.SerDes.csproj | 2 +- src/net/KEFCore.SerDes/KEFCoreSerDes.cs | 8 +- src/net/KEFCore.sln | 12 + .../Internal/KafkaOptionsExtension.cs | 2 +- .../KEFCore/Infrastructure/KafkaDbContext.cs | 2 +- src/net/KEFCore/KEFCore.csproj | 2 +- .../Benchmark.Avro.Test.InMemory.json | 4 + .../Benchmark.Avro.Test.KNetReplicator.json | 5 + .../Benchmark.Avro.Test.KafkaStreams.json | 4 + .../KEFCore.Benchmark.Avro.Test.csproj | 33 +++ test/KEFCore.Benchmark.Avro.Test/Program.cs | 272 ++++++++++++++++++ test/KEFCore.Test.sln | 21 ++ 22 files changed, 972 insertions(+), 9 deletions(-) create mode 100644 src/net/KEFCore.SerDes.Avro/AvroSerializationHelper.cs create mode 100644 src/net/KEFCore.SerDes.Avro/AvroValueContainer.avsc create mode 100644 src/net/KEFCore.SerDes.Avro/AvroValueContainer.cs create mode 100644 src/net/KEFCore.SerDes.Avro/Generated/AvroValueContainer.cs create mode 100644 src/net/KEFCore.SerDes.Avro/Generated/PropertyDataRecord.cs create mode 100644 src/net/KEFCore.SerDes.Avro/KEFCore.SerDes.Avro.Compiler.csproj create mode 100644 src/net/KEFCore.SerDes.Avro/KEFCore.SerDes.Avro.csproj create mode 100644 src/net/KEFCore.SerDes.Avro/KEFCoreSerDes.Avro.cs create mode 100644 src/net/KEFCore.SerDes.Avro/Program.cs create mode 100644 test/KEFCore.Benchmark.Avro.Test/Benchmark.Avro.Test.InMemory.json create mode 100644 test/KEFCore.Benchmark.Avro.Test/Benchmark.Avro.Test.KNetReplicator.json create mode 100644 test/KEFCore.Benchmark.Avro.Test/Benchmark.Avro.Test.KafkaStreams.json create mode 100644 test/KEFCore.Benchmark.Avro.Test/KEFCore.Benchmark.Avro.Test.csproj create mode 100644 test/KEFCore.Benchmark.Avro.Test/Program.cs diff --git a/src/net/KEFCore.SerDes.Avro/AvroSerializationHelper.cs b/src/net/KEFCore.SerDes.Avro/AvroSerializationHelper.cs new file mode 100644 index 00000000..cee7033e --- /dev/null +++ b/src/net/KEFCore.SerDes.Avro/AvroSerializationHelper.cs @@ -0,0 +1,57 @@ +/* +* Copyright 2023 MASES s.r.l. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +* Refer to LICENSE for more information. +*/ + +// #define DEBUG_PERFORMANCE + +#nullable enable + +using Avro; + +namespace MASES.EntityFrameworkCore.KNet.Serialization.Avro.Compiler; + +public static class AvroSerializationHelper +{ + public static void BuildSchemaClasses( string outputFolder, params string[] schemas) + { + var codegen = new CodeGen(); + foreach (var schema in schemas) + { + codegen.AddSchema(schema); + } + codegen.GenerateCode(); + codegen.WriteTypes(outputFolder, true); + } + + public static void BuildSchemaClassesFromFiles( string outputFolder, params string[] schemaFiles) + { + var codegen = new CodeGen(); + foreach (var schemaFile in schemaFiles) + { + var schema = File.ReadAllText(schemaFile); + codegen.AddSchema(schema); + } + codegen.GenerateCode(); + codegen.WriteTypes(outputFolder, true); + } + + public static void BuildDefaultSchema(string outputFolder) + { + BuildSchemaClassesFromFiles(outputFolder, "AvroValueContainer.avsc"); + } +} + diff --git a/src/net/KEFCore.SerDes.Avro/AvroValueContainer.avsc b/src/net/KEFCore.SerDes.Avro/AvroValueContainer.avsc new file mode 100644 index 00000000..c90c9882 --- /dev/null +++ b/src/net/KEFCore.SerDes.Avro/AvroValueContainer.avsc @@ -0,0 +1,54 @@ +{ + "namespace": "MASES.EntityFrameworkCore.KNet.Serialization.Avro.Storage", + "type": "record", + "name": "AvroValueContainer", + "doc": "Represents the storage container type to be used from KEFCore", + "fields": [ + { + "name": "EntityName", + "type": "string" + }, + { + "name": "ClrType", + "type": "string" + }, + { + "name": "Data", + "type": { + "type": "array", + "items": { + "namespace": "MASES.EntityFrameworkCore.KNet.Serialization.Avro.Storage", + "type": "record", + "name": "PropertyDataRecord", + "doc": "Represents the single container for Entity properties stored in AvroValueContainer and used from KEFCore", + "fields": [ + { + "name": "PropertyIndex", + "type": "int" + }, + { + "name": "PropertyName", + "type": "string" + }, + { + "name": "ClrType", + "type": "string" + }, + { + "name": "Value", + "type": [ + "null", + "boolean", + "int", + "long", + "float", + "double", + "string" + ] + } + ] + } + } + } + ] +} \ No newline at end of file diff --git a/src/net/KEFCore.SerDes.Avro/AvroValueContainer.cs b/src/net/KEFCore.SerDes.Avro/AvroValueContainer.cs new file mode 100644 index 00000000..3462460e --- /dev/null +++ b/src/net/KEFCore.SerDes.Avro/AvroValueContainer.cs @@ -0,0 +1,99 @@ +/* +* Copyright 2023 MASES s.r.l. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +* Refer to LICENSE for more information. +*/ + +// #define DEBUG_PERFORMANCE + +#nullable enable + +using Avro; + +namespace MASES.EntityFrameworkCore.KNet.Serialization.Avro.Storage; + +/// +/// The default ValueContainer used from KEFCore +/// +/// It is the key passed from Entity Framework associated to the Entity data will be stored in the +public partial class AvroValueContainer : AvroValueContainer, IValueContainer where TKey : notnull +{ + /// + /// Initialize a new instance of + /// + /// It is mainly used from the JSON serializer + public AvroValueContainer() { } + /// + /// Initialize a new instance of + /// + /// The requesting the for + /// The data, built from EFCore, to be stored in the + /// This constructor is mandatory and it is used from KEFCore to request a + public AvroValueContainer(IEntityType tName, object[] rData) + { + EntityName = tName.Name; + ClrType = tName.ClrType.FullName!; + Data = new List(); + foreach (var item in tName.GetProperties()) + { + int index = item.GetIndex(); + var pRecord = new PropertyDataRecord + { + PropertyIndex = index, + PropertyName = item.Name, + ClrType = item.ClrType?.FullName, + Value = rData[index] + }; + Data.Add(pRecord); + } + } + /// + public void GetData(IEntityType tName, ref object[] array) + { +#if DEBUG_PERFORMANCE + Stopwatch fullSw = new Stopwatch(); + Stopwatch newSw = new Stopwatch(); + Stopwatch iterationSw = new Stopwatch(); + try + { + fullSw.Start(); +#endif + if (Data == null) { return; } +#if DEBUG_PERFORMANCE + newSw.Start(); +#endif + array = new object[Data.Count]; +#if DEBUG_PERFORMANCE + newSw.Stop(); + iterationSw.Start(); +#endif + for (int i = 0; i < Data.Count; i++) + { + array[i] = Data[i].Value!; + } +#if DEBUG_PERFORMANCE + iterationSw.Stop(); + fullSw.Stop(); + } + finally + { + if (Infrastructure.KafkaDbContext.TraceEntityTypeDataStorageGetData) + { + Infrastructure.KafkaDbContext.ReportString($"Time to GetData with length {Data.Count}: {fullSw.Elapsed} - new array took: {newSw.Elapsed} - Iteration took: {iterationSw.Elapsed}"); + } + } +#endif + } +} diff --git a/src/net/KEFCore.SerDes.Avro/Generated/AvroValueContainer.cs b/src/net/KEFCore.SerDes.Avro/Generated/AvroValueContainer.cs new file mode 100644 index 00000000..46ddbe80 --- /dev/null +++ b/src/net/KEFCore.SerDes.Avro/Generated/AvroValueContainer.cs @@ -0,0 +1,87 @@ +// ------------------------------------------------------------------------------ +// +// Generated by MASES.EntityFrameworkCore.KNet.Serialization.Avro.Compiler, version 1.11.3 +// Changes to this file may cause incorrect behavior and will be lost if code +// is regenerated +// +// ------------------------------------------------------------------------------ +namespace MASES.EntityFrameworkCore.KNet.Serialization.Avro.Storage +{ + using System; + using System.Collections.Generic; + using System.Text; + using global::Avro; + using global::Avro.Specific; + + /// + /// Represents the storage container type to be used from KEFCore + /// + [global::System.CodeDom.Compiler.GeneratedCodeAttribute("MASES.EntityFrameworkCore.KNet.Serialization.Avro.Compiler", "1.11.3")] + public partial class AvroValueContainer : global::Avro.Specific.ISpecificRecord + { + public static global::Avro.Schema _SCHEMA = global::Avro.Schema.Parse(@"{""type"":""record"",""name"":""AvroValueContainer"",""doc"":""Represents the storage container type to be used from KEFCore"",""namespace"":""MASES.EntityFrameworkCore.KNet.Serialization.Avro.Storage"",""fields"":[{""name"":""EntityName"",""type"":""string""},{""name"":""ClrType"",""type"":""string""},{""name"":""Data"",""type"":{""type"":""array"",""items"":{""type"":""record"",""name"":""PropertyDataRecord"",""doc"":""Represents the single container for Entity properties stored in AvroValueContainer and used from KEFCore"",""namespace"":""MASES.EntityFrameworkCore.KNet.Serialization.Avro.Storage"",""fields"":[{""name"":""PropertyIndex"",""type"":""int""},{""name"":""PropertyName"",""type"":""string""},{""name"":""ClrType"",""type"":""string""},{""name"":""Value"",""type"":[""null"",""boolean"",""int"",""long"",""float"",""double"",""string""]}]}}}]}"); + private string _EntityName; + private string _ClrType; + private IList _Data; + public virtual global::Avro.Schema Schema + { + get + { + return AvroValueContainer._SCHEMA; + } + } + public string EntityName + { + get + { + return this._EntityName; + } + set + { + this._EntityName = value; + } + } + public string ClrType + { + get + { + return this._ClrType; + } + set + { + this._ClrType = value; + } + } + public IList Data + { + get + { + return this._Data; + } + set + { + this._Data = value; + } + } + public virtual object Get(int fieldPos) + { + switch (fieldPos) + { + case 0: return this.EntityName; + case 1: return this.ClrType; + case 2: return this.Data; + default: throw new global::Avro.AvroRuntimeException("Bad index " + fieldPos + " in Get()"); + }; + } + public virtual void Put(int fieldPos, object fieldValue) + { + switch (fieldPos) + { + case 0: this.EntityName = (System.String)fieldValue; break; + case 1: this.ClrType = (System.String)fieldValue; break; + case 2: this.Data = (IList)fieldValue; break; + default: throw new global::Avro.AvroRuntimeException("Bad index " + fieldPos + " in Put()"); + }; + } + } +} diff --git a/src/net/KEFCore.SerDes.Avro/Generated/PropertyDataRecord.cs b/src/net/KEFCore.SerDes.Avro/Generated/PropertyDataRecord.cs new file mode 100644 index 00000000..965f8a02 --- /dev/null +++ b/src/net/KEFCore.SerDes.Avro/Generated/PropertyDataRecord.cs @@ -0,0 +1,101 @@ +// ------------------------------------------------------------------------------ +// +// Generated by MASES.EntityFrameworkCore.KNet.Serialization.Avro.Compiler, version 1.11.3 +// Changes to this file may cause incorrect behavior and will be lost if code +// is regenerated +// +// ------------------------------------------------------------------------------ +namespace MASES.EntityFrameworkCore.KNet.Serialization.Avro.Storage +{ + using System; + using System.Collections.Generic; + using System.Text; + using global::Avro; + using global::Avro.Specific; + + /// + /// Represents the single container for Entity properties stored in AvroValueContainer and used from KEFCore + /// + [global::System.CodeDom.Compiler.GeneratedCodeAttribute("MASES.EntityFrameworkCore.KNet.Serialization.Avro.Compiler", "1.11.3")] + public partial class PropertyDataRecord : global::Avro.Specific.ISpecificRecord + { + public static global::Avro.Schema _SCHEMA = global::Avro.Schema.Parse(@"{""type"":""record"",""name"":""PropertyDataRecord"",""doc"":""Represents the single container for Entity properties stored in AvroValueContainer and used from KEFCore"",""namespace"":""MASES.EntityFrameworkCore.KNet.Serialization.Avro.Storage"",""fields"":[{""name"":""PropertyIndex"",""type"":""int""},{""name"":""PropertyName"",""type"":""string""},{""name"":""ClrType"",""type"":""string""},{""name"":""Value"",""type"":[""null"",""boolean"",""int"",""long"",""float"",""double"",""string""]}]}"); + private int _PropertyIndex; + private string _PropertyName; + private string _ClrType; + private object _Value; + public virtual global::Avro.Schema Schema + { + get + { + return PropertyDataRecord._SCHEMA; + } + } + public int PropertyIndex + { + get + { + return this._PropertyIndex; + } + set + { + this._PropertyIndex = value; + } + } + public string PropertyName + { + get + { + return this._PropertyName; + } + set + { + this._PropertyName = value; + } + } + public string ClrType + { + get + { + return this._ClrType; + } + set + { + this._ClrType = value; + } + } + public object Value + { + get + { + return this._Value; + } + set + { + this._Value = value; + } + } + public virtual object Get(int fieldPos) + { + switch (fieldPos) + { + case 0: return this.PropertyIndex; + case 1: return this.PropertyName; + case 2: return this.ClrType; + case 3: return this.Value; + default: throw new global::Avro.AvroRuntimeException("Bad index " + fieldPos + " in Get()"); + }; + } + public virtual void Put(int fieldPos, object fieldValue) + { + switch (fieldPos) + { + case 0: this.PropertyIndex = (System.Int32)fieldValue; break; + case 1: this.PropertyName = (System.String)fieldValue; break; + case 2: this.ClrType = (System.String)fieldValue; break; + case 3: this.Value = (System.Object)fieldValue; break; + default: throw new global::Avro.AvroRuntimeException("Bad index " + fieldPos + " in Put()"); + }; + } + } +} diff --git a/src/net/KEFCore.SerDes.Avro/KEFCore.SerDes.Avro.Compiler.csproj b/src/net/KEFCore.SerDes.Avro/KEFCore.SerDes.Avro.Compiler.csproj new file mode 100644 index 00000000..a25c1610 --- /dev/null +++ b/src/net/KEFCore.SerDes.Avro/KEFCore.SerDes.Avro.Compiler.csproj @@ -0,0 +1,34 @@ + + + + Exe + true + MASES.EntityFrameworkCore.KNet.Serialization.Avro.Compiler + MASES.EntityFrameworkCore.KNet.Serialization.Avro + EntityFrameworkCore KNet - Avro Serialization support for EntityFrameworkCore provider for Apache Kafka + EntityFrameworkCore KNet - Avro Serialization support for EntityFrameworkCore provider for Apache Kafka + MASES.EntityFrameworkCore.KNet.Serialization.Avro + ..\..\..\bin\ + Entity Framework Core;entity-framework-core;ef;efcore;orm;avro;sql kafka apache-kafka dotnet clr netcore net5 net6 kafka connect streams producer consumer providers streamprovider confluent + enable + false + + + + + + + + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + + + + + PreserveNewest + + + diff --git a/src/net/KEFCore.SerDes.Avro/KEFCore.SerDes.Avro.csproj b/src/net/KEFCore.SerDes.Avro/KEFCore.SerDes.Avro.csproj new file mode 100644 index 00000000..47ddca73 --- /dev/null +++ b/src/net/KEFCore.SerDes.Avro/KEFCore.SerDes.Avro.csproj @@ -0,0 +1,61 @@ + + + + true + MASES.EntityFrameworkCore.KNet.Serialization.Avro + MASES.EntityFrameworkCore.KNet.Serialization.Avro + EntityFrameworkCore KNet - Avro Serialization support for EntityFrameworkCore provider for Apache Kafka + EntityFrameworkCore KNet - Avro Serialization support for EntityFrameworkCore provider for Apache Kafka + MASES.EntityFrameworkCore.KNet.Serialization.Avro + ..\..\..\bin\ + Entity Framework Core;entity-framework-core;ef;efcore;orm;avro;sql kafka apache-kafka dotnet clr netcore net5 net6 kafka connect streams producer consumer providers streamprovider confluent + MASES.EntityFrameworkCore.KNet.Serialization.Avro + serialization.md + enable + true + True + False + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + + + + + PreserveNewest + + + diff --git a/src/net/KEFCore.SerDes.Avro/KEFCoreSerDes.Avro.cs b/src/net/KEFCore.SerDes.Avro/KEFCoreSerDes.Avro.cs new file mode 100644 index 00000000..3beb840a --- /dev/null +++ b/src/net/KEFCore.SerDes.Avro/KEFCoreSerDes.Avro.cs @@ -0,0 +1,65 @@ +/* +* Copyright 2023 MASES s.r.l. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +* Refer to LICENSE for more information. +*/ + +#nullable enable + +using Avro.IO; +using Avro.Specific; +using MASES.EntityFrameworkCore.KNet.Serialization.Avro.Storage; +using MASES.KNet.Serialization; +using Org.Apache.Kafka.Common.Header; + +namespace MASES.EntityFrameworkCore.KNet.Serialization.Avro; + +/// +/// Avro extension of , for example +/// +/// +public class KEFCoreSerDesAvro : KNetSerDes +{ + static SpecificDefaultWriter SpecificWriter = new SpecificDefaultWriter(AvroValueContainer._SCHEMA); + static SpecificDefaultReader SpecificReader = new SpecificDefaultReader(AvroValueContainer._SCHEMA, AvroValueContainer._SCHEMA); + + /// + public override byte[] Serialize(string topic, T data) + { + return SerializeWithHeaders(topic, null!, data); + } + /// + public override byte[] SerializeWithHeaders(string topic, Headers headers, T data) + { + using MemoryStream memStream = new(); + BinaryEncoder encoder = new(memStream); + SpecificWriter.Write(data, encoder); + return memStream.ToArray(); + } + /// + public override T Deserialize(string topic, byte[] data) + { + return DeserializeWithHeaders(topic, null!, data); + } + /// + public override T DeserializeWithHeaders(string topic, Headers headers, byte[] data) + { + using MemoryStream memStream = new(data); + BinaryDecoder decoder = new(memStream); + T t = (T)Activator.CreateInstance(typeof(T))!; + t = SpecificReader.Read(t!, decoder); + return t; + } +} diff --git a/src/net/KEFCore.SerDes.Avro/Program.cs b/src/net/KEFCore.SerDes.Avro/Program.cs new file mode 100644 index 00000000..c71afd7b --- /dev/null +++ b/src/net/KEFCore.SerDes.Avro/Program.cs @@ -0,0 +1,54 @@ +/* + * MIT License + * + * Copyright (c) 2022 MASES s.r.l. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +using System.Diagnostics; + +namespace MASES.EntityFrameworkCore.KNet.Serialization.Avro.Compiler; + +partial class Program +{ + static void ReportString(string message) + { + if (Debugger.IsAttached) + { + Trace.WriteLine($"{DateTime.Now:HH::mm::ss:ffff} - {message}"); + } + else + { + Console.WriteLine($"{DateTime.Now:HH::mm::ss:ffff} - {message}"); + } + } + + static void Main(string[] args) + { + try + { + AvroSerializationHelper.BuildDefaultSchema("Generated"); + } + catch (Exception ex) + { + ReportString(ex.ToString()); + } + } +} diff --git a/src/net/KEFCore.SerDes/DefaultValueContainer.cs b/src/net/KEFCore.SerDes/DefaultValueContainer.cs index 8ef253bd..c8b3b00a 100644 --- a/src/net/KEFCore.SerDes/DefaultValueContainer.cs +++ b/src/net/KEFCore.SerDes/DefaultValueContainer.cs @@ -23,7 +23,7 @@ using System.Text.Json; using System.Text.Json.Serialization; -namespace MASES.EntityFrameworkCore.KNet.Serialization.Storage; +namespace MASES.EntityFrameworkCore.KNet.Serialization.Json.Storage; /// /// This is a supporting class used from /// diff --git a/src/net/KEFCore.SerDes/KEFCore.SerDes.csproj b/src/net/KEFCore.SerDes/KEFCore.SerDes.csproj index b2dbe71d..bd045c26 100644 --- a/src/net/KEFCore.SerDes/KEFCore.SerDes.csproj +++ b/src/net/KEFCore.SerDes/KEFCore.SerDes.csproj @@ -8,7 +8,7 @@ EntityFrameworkCore KNet - Serialization support for EntityFrameworkCore provider for Apache Kafka MASES.EntityFrameworkCore.KNet.Serialization ..\..\..\bin\ - Entity Framework Core;entity-framework-core;ef;efcore;orm;sql kafka apache-kafka dotnet clr netcore net5 net6 kafka connect streams producer consumer providers streamprovider confluent + Entity Framework Core;entity-framework-core;ef;efcore;orm;json;sql kafka apache-kafka dotnet clr netcore net6 kafka connect streams producer consumer providers streamprovider confluent MASES.EntityFrameworkCore.KNet.Serialization serialization.md enable diff --git a/src/net/KEFCore.SerDes/KEFCoreSerDes.cs b/src/net/KEFCore.SerDes/KEFCoreSerDes.cs index 8a2f1553..c16906cc 100644 --- a/src/net/KEFCore.SerDes/KEFCoreSerDes.cs +++ b/src/net/KEFCore.SerDes/KEFCoreSerDes.cs @@ -18,7 +18,7 @@ #nullable enable -using MASES.EntityFrameworkCore.KNet.Serialization.Storage; +using MASES.EntityFrameworkCore.KNet.Serialization.Json.Storage; using MASES.KNet.Serialization; using Org.Apache.Kafka.Common.Header; using System.Text; @@ -34,7 +34,7 @@ public class KEFCoreSerDes : KNetSerDes /// public override byte[] Serialize(string topic, T data) { - return SerializeWithHeaders(topic, null, data); + return SerializeWithHeaders(topic, null!, data); } /// public override byte[] SerializeWithHeaders(string topic, Headers headers, T data) @@ -45,12 +45,12 @@ public override byte[] SerializeWithHeaders(string topic, Headers headers, T dat /// public override T Deserialize(string topic, byte[] data) { - return DeserializeWithHeaders(topic, null, data); + return DeserializeWithHeaders(topic, null!, data); } /// public override T DeserializeWithHeaders(string topic, Headers headers, byte[] data) { - if (data == null) return default; + if (data == null) return default!; return System.Text.Json.JsonSerializer.Deserialize(data)!; } } diff --git a/src/net/KEFCore.sln b/src/net/KEFCore.sln index adec0c99..5398ebde 100644 --- a/src/net/KEFCore.sln +++ b/src/net/KEFCore.sln @@ -7,6 +7,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KEFCore", "KEFCore\KEFCore. EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KEFCore.SerDes", "KEFCore.SerDes\KEFCore.SerDes.csproj", "{FFA73185-E980-48D9-8016-BC5434A4E1B4}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KEFCore.SerDes.Avro", "KEFCore.SerDes.Avro\KEFCore.SerDes.Avro.csproj", "{87EB9CF6-D275-460F-B3F5-A5DED0E40196}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KEFCore.SerDes.Avro.Compiler", "KEFCore.SerDes.Avro\KEFCore.SerDes.Avro.Compiler.csproj", "{19F6B000-2C24-4431-8F39-DBF79BFD98F5}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -21,6 +25,14 @@ Global {FFA73185-E980-48D9-8016-BC5434A4E1B4}.Debug|Any CPU.Build.0 = Debug|Any CPU {FFA73185-E980-48D9-8016-BC5434A4E1B4}.Release|Any CPU.ActiveCfg = Release|Any CPU {FFA73185-E980-48D9-8016-BC5434A4E1B4}.Release|Any CPU.Build.0 = Release|Any CPU + {87EB9CF6-D275-460F-B3F5-A5DED0E40196}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {87EB9CF6-D275-460F-B3F5-A5DED0E40196}.Debug|Any CPU.Build.0 = Debug|Any CPU + {87EB9CF6-D275-460F-B3F5-A5DED0E40196}.Release|Any CPU.ActiveCfg = Release|Any CPU + {87EB9CF6-D275-460F-B3F5-A5DED0E40196}.Release|Any CPU.Build.0 = Release|Any CPU + {19F6B000-2C24-4431-8F39-DBF79BFD98F5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {19F6B000-2C24-4431-8F39-DBF79BFD98F5}.Debug|Any CPU.Build.0 = Debug|Any CPU + {19F6B000-2C24-4431-8F39-DBF79BFD98F5}.Release|Any CPU.ActiveCfg = Release|Any CPU + {19F6B000-2C24-4431-8F39-DBF79BFD98F5}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/src/net/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs b/src/net/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs index 6229de6e..e0eda6f2 100644 --- a/src/net/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs +++ b/src/net/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs @@ -21,7 +21,7 @@ using Java.Lang; using Java.Util; using MASES.EntityFrameworkCore.KNet.Serialization.Json; -using MASES.EntityFrameworkCore.KNet.Serialization.Storage; +using MASES.EntityFrameworkCore.KNet.Serialization.Json.Storage; using MASES.KNet.Common; using MASES.KNet.Consumer; using MASES.KNet.Producer; diff --git a/src/net/KEFCore/Infrastructure/KafkaDbContext.cs b/src/net/KEFCore/Infrastructure/KafkaDbContext.cs index a821c77d..2f32c74b 100644 --- a/src/net/KEFCore/Infrastructure/KafkaDbContext.cs +++ b/src/net/KEFCore/Infrastructure/KafkaDbContext.cs @@ -18,7 +18,7 @@ using MASES.EntityFrameworkCore.KNet.Serialization; using MASES.EntityFrameworkCore.KNet.Serialization.Json; -using MASES.EntityFrameworkCore.KNet.Serialization.Storage; +using MASES.EntityFrameworkCore.KNet.Serialization.Json.Storage; using MASES.KNet.Common; using MASES.KNet.Consumer; using MASES.KNet.Producer; diff --git a/src/net/KEFCore/KEFCore.csproj b/src/net/KEFCore/KEFCore.csproj index 82642c62..4f69d723 100644 --- a/src/net/KEFCore/KEFCore.csproj +++ b/src/net/KEFCore/KEFCore.csproj @@ -8,7 +8,7 @@ EntityFrameworkCore KNet - EntityFrameworkCore provider for Apache Kafka MASES.EntityFrameworkCore.KNet ..\..\..\bin\ - Entity Framework Core;entity-framework-core;ef;efcore;orm;sql kafka apache-kafka dotnet clr netcore net5 net6 kafka connect streams producer consumer providers streamprovider confluent + Entity Framework Core;entity-framework-core;ef;efcore;orm;sql kafka apache-kafka dotnet clr netcore net6 kafka connect streams producer consumer providers streamprovider confluent MASES.EntityFrameworkCore.KNet gettingstarted.md enable diff --git a/test/KEFCore.Benchmark.Avro.Test/Benchmark.Avro.Test.InMemory.json b/test/KEFCore.Benchmark.Avro.Test/Benchmark.Avro.Test.InMemory.json new file mode 100644 index 00000000..b09c7f10 --- /dev/null +++ b/test/KEFCore.Benchmark.Avro.Test/Benchmark.Avro.Test.InMemory.json @@ -0,0 +1,4 @@ +{ + "UseInMemoryProvider": true, + "NumberOfExecutions": 10 +} diff --git a/test/KEFCore.Benchmark.Avro.Test/Benchmark.Avro.Test.KNetReplicator.json b/test/KEFCore.Benchmark.Avro.Test/Benchmark.Avro.Test.KNetReplicator.json new file mode 100644 index 00000000..4ae48342 --- /dev/null +++ b/test/KEFCore.Benchmark.Avro.Test/Benchmark.Avro.Test.KNetReplicator.json @@ -0,0 +1,5 @@ +{ + "UseCompactedReplicator": true, + "BootstrapServers": "192.168.1.103:9092", + "NumberOfExecutions": 10 +} diff --git a/test/KEFCore.Benchmark.Avro.Test/Benchmark.Avro.Test.KafkaStreams.json b/test/KEFCore.Benchmark.Avro.Test/Benchmark.Avro.Test.KafkaStreams.json new file mode 100644 index 00000000..65cc5311 --- /dev/null +++ b/test/KEFCore.Benchmark.Avro.Test/Benchmark.Avro.Test.KafkaStreams.json @@ -0,0 +1,4 @@ +{ + "BootstrapServers": "192.168.1.103:9092", + "NumberOfExecutions": 10 +} diff --git a/test/KEFCore.Benchmark.Avro.Test/KEFCore.Benchmark.Avro.Test.csproj b/test/KEFCore.Benchmark.Avro.Test/KEFCore.Benchmark.Avro.Test.csproj new file mode 100644 index 00000000..e0e675aa --- /dev/null +++ b/test/KEFCore.Benchmark.Avro.Test/KEFCore.Benchmark.Avro.Test.csproj @@ -0,0 +1,33 @@ + + + + Exe + MASES.EntityFrameworkCore.KNet.Benchmark.Avro.Test + MASES.EntityFrameworkCore.KNet.Benchmark.Avro.Test + EntityFrameworkCore KNet Benchmark Avro Test + EntityFrameworkCore KNet Benchmark.Avro.Test + MASES.EntityFrameworkCore.KNet.Benchmark.Avro.Test + ..\..\bin\ + + + + + + + + + + + + + + PreserveNewest + + + PreserveNewest + + + PreserveNewest + + + diff --git a/test/KEFCore.Benchmark.Avro.Test/Program.cs b/test/KEFCore.Benchmark.Avro.Test/Program.cs new file mode 100644 index 00000000..c071ea61 --- /dev/null +++ b/test/KEFCore.Benchmark.Avro.Test/Program.cs @@ -0,0 +1,272 @@ +/* + * MIT License + * + * Copyright (c) 2022 MASES s.r.l. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +using MASES.EntityFrameworkCore.KNet.Infrastructure; +using MASES.EntityFrameworkCore.KNet.Serialization.Avro; +using MASES.EntityFrameworkCore.KNet.Serialization.Avro.Storage; +using MASES.KNet.Streams; +using Microsoft.EntityFrameworkCore; +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.IO; +using System.Linq; +using System.Text.Json; + +namespace MASES.EntityFrameworkCore.KNet.Test +{ + partial class Program + { + internal static ProgramConfig config = new(); + + static void ReportString(string message) + { + if (Debugger.IsAttached) + { + Trace.WriteLine($"{DateTime.Now:HH::mm::ss:ffff} - {message}"); + } + else + { + Console.WriteLine($"{DateTime.Now:HH::mm::ss:ffff} - {message}"); + } + } + + static void Main(string[] args) + { + //AvroSerialization.TestAvroSerialization(); + + BloggingContext context = null; + var testWatcher = new Stopwatch(); + var globalWatcher = new Stopwatch(); + + if (args.Length > 0) + { + config = JsonSerializer.Deserialize(File.ReadAllText(args[0])); + } + + KafkaDbContext.EnableKEFCoreTracing = config.EnableKEFCoreTracing; + + if (!config.UseInMemoryProvider) + { + KEFCore.CreateGlobalInstance(); + } + + var databaseName = config.UseModelBuilder ? config.DatabaseNameWithModel : config.DatabaseName; + + try + { + globalWatcher.Start(); + StreamsConfigBuilder streamConfig = null; + if (!config.UseInMemoryProvider) + { + streamConfig = StreamsConfigBuilder.Create(); + streamConfig = streamConfig.WithAcceptableRecoveryLag(100); + } + + using (context = new BloggingContext() + { + BootstrapServers = config.BootstrapServers, + ApplicationId = config.ApplicationId, + DbName = databaseName, + StreamsConfig = streamConfig, + ValueContainerType = typeof(AvroValueContainer<>), + ValueSerializationType = typeof(KEFCoreSerDesAvro<>), + }) + { + + if (config.DeleteApplicationData) + { + context.Database.EnsureDeleted(); + if (context.Database.EnsureCreated()) + { + ReportString("EnsureCreated created database"); + } + else + { + ReportString("EnsureCreated does not created database"); + } + } + + testWatcher.Start(); + Stopwatch watch = new Stopwatch(); + if (config.LoadApplicationData) + { + watch.Start(); + for (int i = 0; i < config.NumberOfElements; i++) + { + context.Add(new Blog + { + Url = "http://blogs.msdn.com/adonet" + i.ToString(), + Posts = new List() + { + new Post() + { + Title = "title", + Content = i.ToString() + } + }, + Rating = i, + }); + } + watch.Stop(); + ReportString($"Elapsed data load: {watch.Elapsed}"); + watch.Restart(); + context.SaveChanges(); + watch.Stop(); + ReportString($"Elapsed SaveChanges: {watch.Elapsed}"); + } + } + + for (int execution = 0; execution < config.NumberOfExecutions; execution++) + { + ReportString($"Starting cycle number {execution}"); + using (context = new BloggingContext() + { + BootstrapServers = config.BootstrapServers, + ApplicationId = config.ApplicationId, + DbName = databaseName, + StreamsConfig = streamConfig, + }) + { + Stopwatch watch = new Stopwatch(); + watch.Restart(); + var post = context.Posts.Single(b => b.BlogId == 2); + watch.Stop(); + ReportString($"First execution of context.Posts.Single(b => b.BlogId == 2) takes {watch.Elapsed}. Result is {post}"); + + watch.Restart(); + post = context.Posts.Single(b => b.BlogId == 2); + watch.Stop(); + ReportString($"Second execution of context.Posts.Single(b => b.BlogId == 2) takes {watch.Elapsed}. Result is {post}"); + + watch.Restart(); + post = context.Posts.Single(b => b.BlogId == config.NumberOfElements - 1); + watch.Stop(); + ReportString($"Execution of context.Posts.Single(b => b.BlogId == {config.NumberOfElements - 1}) takes {watch.Elapsed}. Result is {post}"); + + watch.Restart(); + var all = context.Posts.All((o) => true); + watch.Stop(); + ReportString($"Execution of context.Posts.All((o) => true) takes {watch.Elapsed}. Result is {all}"); + + Blog blog = null; + watch.Restart(); + blog = context.Blogs!.Single(b => b.BlogId == 1); + watch.Stop(); + ReportString($"First execution of context.Blogs!.Single(b => b.BlogId == 1) takes {watch.Elapsed}. Result is {blog}"); + watch.Restart(); + blog = context.Blogs!.Single(b => b.BlogId == 1); + watch.Stop(); + ReportString($"Second execution of context.Blogs!.Single(b => b.BlogId == 1) takes {watch.Elapsed}. Result is {blog}"); + + watch.Restart(); + var selector = (from op in context.Blogs + join pg in context.Posts on op.BlogId equals pg.BlogId + where pg.BlogId == op.BlogId + select new { pg, op }); + watch.Stop(); + var result = selector.ToList(); + ReportString($"Execution of first complex query takes {watch.Elapsed}. Result is {result.Count} element{(result.Count == 1 ? string.Empty : "s")}"); + + watch.Restart(); + var selector2 = (from op in context.Blogs + join pg in context.Posts on op.BlogId equals pg.BlogId + where op.Rating >= 100 + select new { pg, op }); + watch.Stop(); + var result2 = selector.ToList(); + ReportString($"Execution of second complex query takes {watch.Elapsed}. Result is {result2.Count} element{(result2.Count == 1 ? string.Empty : "s")}"); + } + } + } + catch (Exception ex) + { + ReportString(ex.ToString()); + } + finally + { + context?.Dispose(); + testWatcher.Stop(); + globalWatcher.Stop(); + ReportString($"Full test completed in {globalWatcher.Elapsed}, only tests completed in {testWatcher.Elapsed}"); + } + } + } + + public class BloggingContext : KafkaDbContext + { + public override bool UsePersistentStorage { get; set; } = Program.config.UsePersistentStorage; + public override bool UseCompactedReplicator { get; set; } = Program.config.UseCompactedReplicator; + + public DbSet Blogs { get; set; } + public DbSet Posts { get; set; } + + protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder) + { + if (Program.config.UseInMemoryProvider) + { + optionsBuilder.UseInMemoryDatabase(Program.config.DatabaseName); + } + else + { + base.OnConfiguring(optionsBuilder); + } + } + + protected override void OnModelCreating(ModelBuilder modelBuilder) + { + if (!Program.config.UseModelBuilder) return; + + modelBuilder.Entity().HasKey(c => new { c.BlogId, c.Rating }); + } + } + + public class Blog + { + public int BlogId { get; set; } + public string Url { get; set; } + public int Rating { get; set; } + public List Posts { get; set; } + + public override string ToString() + { + return $"BlogId: {BlogId} Url: {Url} Rating: {Rating}"; + } + } + + public class Post + { + public int PostId { get; set; } + public string Title { get; set; } + public string Content { get; set; } + + public int BlogId { get; set; } + public Blog Blog { get; set; } + + public override string ToString() + { + return $"PostId: {PostId} Title: {Title} Content: {Content} BlogId: {BlogId}"; + } + } +} diff --git a/test/KEFCore.Test.sln b/test/KEFCore.Test.sln index 624d32dd..7a596621 100644 --- a/test/KEFCore.Test.sln +++ b/test/KEFCore.Test.sln @@ -17,6 +17,12 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KEFCore.Benchmark.Test", "K EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KEFCore.SerDes", "..\src\net\KEFCore.SerDes\KEFCore.SerDes.csproj", "{C8FAF4B6-796E-40C5-990E-A1308306C1F2}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KEFCore.SerDes.Avro", "..\src\net\KEFCore.SerDes.Avro\KEFCore.SerDes.Avro.csproj", "{3F5C48E6-02E8-4921-AA1D-24F8F644F67E}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KEFCore.Benchmark.Avro.Test", "KEFCore.Benchmark.Avro.Test\KEFCore.Benchmark.Avro.Test.csproj", "{40D1001C-EC50-4994-BE40-F410DB20AF4F}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KEFCore.SerDes.Avro.Compiler", "..\src\net\KEFCore.SerDes.Avro\KEFCore.SerDes.Avro.Compiler.csproj", "{687F7AED-70E0-433C-AF89-89F2878EE106}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -43,6 +49,18 @@ Global {C8FAF4B6-796E-40C5-990E-A1308306C1F2}.Debug|Any CPU.Build.0 = Debug|Any CPU {C8FAF4B6-796E-40C5-990E-A1308306C1F2}.Release|Any CPU.ActiveCfg = Release|Any CPU {C8FAF4B6-796E-40C5-990E-A1308306C1F2}.Release|Any CPU.Build.0 = Release|Any CPU + {3F5C48E6-02E8-4921-AA1D-24F8F644F67E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {3F5C48E6-02E8-4921-AA1D-24F8F644F67E}.Debug|Any CPU.Build.0 = Debug|Any CPU + {3F5C48E6-02E8-4921-AA1D-24F8F644F67E}.Release|Any CPU.ActiveCfg = Release|Any CPU + {3F5C48E6-02E8-4921-AA1D-24F8F644F67E}.Release|Any CPU.Build.0 = Release|Any CPU + {40D1001C-EC50-4994-BE40-F410DB20AF4F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {40D1001C-EC50-4994-BE40-F410DB20AF4F}.Debug|Any CPU.Build.0 = Debug|Any CPU + {40D1001C-EC50-4994-BE40-F410DB20AF4F}.Release|Any CPU.ActiveCfg = Release|Any CPU + {40D1001C-EC50-4994-BE40-F410DB20AF4F}.Release|Any CPU.Build.0 = Release|Any CPU + {687F7AED-70E0-433C-AF89-89F2878EE106}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {687F7AED-70E0-433C-AF89-89F2878EE106}.Debug|Any CPU.Build.0 = Debug|Any CPU + {687F7AED-70E0-433C-AF89-89F2878EE106}.Release|Any CPU.ActiveCfg = Release|Any CPU + {687F7AED-70E0-433C-AF89-89F2878EE106}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -53,6 +71,9 @@ Global {CC3396D4-7365-41C1-B82E-200FE87A4F33} = {4A0AD520-9BC4-4F92-893B-6F92BBC35BFA} {D8B28630-F80E-42F2-AD9C-5955D5AC2E69} = {4A0AD520-9BC4-4F92-893B-6F92BBC35BFA} {C8FAF4B6-796E-40C5-990E-A1308306C1F2} = {B35B16BB-890F-4385-AB20-7AA4DD6E9C01} + {3F5C48E6-02E8-4921-AA1D-24F8F644F67E} = {B35B16BB-890F-4385-AB20-7AA4DD6E9C01} + {40D1001C-EC50-4994-BE40-F410DB20AF4F} = {4A0AD520-9BC4-4F92-893B-6F92BBC35BFA} + {687F7AED-70E0-433C-AF89-89F2878EE106} = {B35B16BB-890F-4385-AB20-7AA4DD6E9C01} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {36C294ED-9ECE-42AA-8273-31E008749AF3}