Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Avro type and serializer for complex Primary Key #100

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 89 additions & 59 deletions src/documentation/articles/serialization.md
Original file line number Diff line number Diff line change
Expand Up @@ -175,69 +175,98 @@ The engine comes with two different encoders:

The following schema is the default used from the engine and can be registered in Apache Schema registry so other applications can use it to extract the data stored in the topics:

```json
{
"namespace": "MASES.EntityFrameworkCore.KNet.Serialization.Avro.Storage",
"type": "record",
"name": "AvroValueContainer",
"doc": "Represents the storage container type to be used from KEFCore",
"fields": [
{
"name": "EntityName",
"type": "string"
},
{
"name": "ClrType",
"type": "string"
},
{
"name": "Data",
"type": {
"type": "array",
"items": {
"namespace": "MASES.EntityFrameworkCore.KNet.Serialization.Avro.Storage",
"type": "record",
"name": "PropertyDataRecord",
"doc": "Represents the single container for Entity properties stored in AvroValueContainer and used from KEFCore",
"fields": [
{
"name": "PropertyIndex",
"type": "int"
},
{
"name": "PropertyName",
"type": "string"
},
{
"name": "ClrType",
"type": "string"
},
{
"name": "Value",
"type": [
"null",
"boolean",
"int",
"long",
"float",
"double",
"string"
]
}
]
}
}
}
]
}
```
- Complex Primary Key schema:
```json
{
"namespace": "MASES.EntityFrameworkCore.KNet.Serialization.Avro.Storage",
"type": "record",
"name": "AvroKeyContainer",
"doc": "Represents the storage container type to be used from KEFCore for keys",
"fields": [
{
"name": "PrimaryKey",
"type": {
"type": "array",
"items": [
"null",
"boolean",
"int",
"long",
"float",
"double",
"string"
]
}
}
]
}
```


- ValueContainer schema:
```json
{
"namespace": "MASES.EntityFrameworkCore.KNet.Serialization.Avro.Storage",
"type": "record",
"name": "AvroValueContainer",
"doc": "Represents the storage container type to be used from KEFCore",
"fields": [
{
"name": "EntityName",
"type": "string"
},
{
"name": "ClrType",
"type": "string"
},
{
"name": "Data",
"type": {
"type": "array",
"items": {
"namespace": "MASES.EntityFrameworkCore.KNet.Serialization.Avro.Storage",
"type": "record",
"name": "PropertyDataRecord",
"doc": "Represents the single container for Entity properties stored in AvroValueContainer and used from KEFCore",
"fields": [
{
"name": "PropertyIndex",
"type": "int"
},
{
"name": "PropertyName",
"type": "string"
},
{
"name": "ClrType",
"type": "string"
},
{
"name": "Value",
"type": [
"null",
"boolean",
"int",
"long",
"float",
"double",
"string"
]
}
]
}
}
}
]
}
```
The extension converted this schema into code to speedup the exection of serialization/deserialization operations.

### How to use Avro

`KafkaDbContext` contains three properties can be used to override the default types:
- **KeySerializationType**: Leave this value untouched, till now the engine uses the default serializer
- **ValueSerializationType**: set this value to `KEFCoreSerDesAvroBinary<>` or `KEFCoreSerDesAvroJson<>`
- **KeySerializationType**: set this value to `KEFCoreSerDesKeyAvroBinary<>` or `KEFCoreSerDesKeyAvroJson<>`, the type automatically fallback to default serializer for simple Primary Key
- **ValueSerializationType**: set this value to `KEFCoreSerDesValueContainerAvroBinary<>` or `KEFCoreSerDesValueContainerAvroJson<>`
- **ValueContainerType**: set this value to `AvroValueContainer<>`

An example is:
Expand All @@ -248,8 +277,9 @@ using (context = new BloggingContext()
BootstrapServers = "KAFKA-SERVER:9092",
ApplicationId = "MyAppid",
DbName = "MyDBName",
KeySerializationType = UseAvroBinary ? typeof(KEFCoreSerDesKeyAvroBinary<>) : typeof(KEFCoreSerDesKeyAvroJson<>),
ValueContainerType = typeof(AvroValueContainer<>),
ValueSerializationType = UseAvroBinary ? typeof(KEFCoreSerDesAvroBinary<>) : typeof(KEFCoreSerDesAvroJson<>),
ValueSerializationType = UseAvroBinary ? typeof(KEFCoreSerDesValueContainerAvroBinary<>) : typeof(KEFCoreSerDesValueContainerAvroJson<>),
})
{
// execute stuff here
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public static void BuildSchemaClassesFromFiles( string outputFolder, params stri
public static void BuildDefaultSchema(string outputFolder)
{
BuildSchemaClassesFromFiles(outputFolder, "AvroValueContainer.avsc");
BuildSchemaClassesFromFiles(outputFolder, "AvroKeyContainer.avsc");
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,14 @@
<Nullable>enable</Nullable>
<GeneratePackageOnBuild>false</GeneratePackageOnBuild>
</PropertyGroup>
<ItemGroup>
<None Include="..\KEFCore.SerDes.Avro\AvroKeyContainer.avsc" Link="AvroKeyContainer.avsc">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
<None Include="..\KEFCore.SerDes.Avro\AvroValueContainer.avsc" Link="AvroValueContainer.avsc">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
</ItemGroup>

<ItemGroup>
<PackageReference Include="Apache.Avro" Version="1.11.3" />
Expand Down
23 changes: 23 additions & 0 deletions src/net/KEFCore.SerDes.Avro/AvroKeyContainer.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{
"namespace": "MASES.EntityFrameworkCore.KNet.Serialization.Avro.Storage",
"type": "record",
"name": "AvroKeyContainer",
"doc": "Represents the storage container type to be used from KEFCore for keys",
"fields": [
{
"name": "PrimaryKey",
"type": {
"type": "array",
"items": [
"null",
"boolean",
"int",
"long",
"float",
"double",
"string"
]
}
}
]
}
2 changes: 1 addition & 1 deletion src/net/KEFCore.SerDes.Avro/AvroValueContainer.avsc
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"namespace": "MASES.EntityFrameworkCore.KNet.Serialization.Avro.Storage",
"type": "record",
"name": "AvroValueContainer",
"doc": "Represents the storage container type to be used from KEFCore",
"doc": "Represents the storage container type to be used from KEFCore values",
"fields": [
{
"name": "EntityName",
Expand Down
59 changes: 59 additions & 0 deletions src/net/KEFCore.SerDes.Avro/Generated/AvroKeyContainer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// ------------------------------------------------------------------------------
// <auto-generated>
// Generated by MASES.EntityFrameworkCore.KNet.Serialization.Avro.Compiler, version 1.11.3
// Changes to this file may cause incorrect behavior and will be lost if code
// is regenerated
// </auto-generated>
// ------------------------------------------------------------------------------
namespace MASES.EntityFrameworkCore.KNet.Serialization.Avro.Storage
{
using System;
using System.Collections.Generic;
using System.Text;
using global::Avro;
using global::Avro.Specific;

/// <summary>
/// Represents the storage container type to be used from KEFCore for keys
/// </summary>
[global::System.CodeDom.Compiler.GeneratedCodeAttribute("MASES.EntityFrameworkCore.KNet.Serialization.Avro.Compiler", "1.11.3")]
public partial class AvroKeyContainer : global::Avro.Specific.ISpecificRecord
{
public static global::Avro.Schema _SCHEMA = global::Avro.Schema.Parse(@"{""type"":""record"",""name"":""AvroKeyContainer"",""doc"":""Represents the storage container type to be used from KEFCore for keys"",""namespace"":""MASES.EntityFrameworkCore.KNet.Serialization.Avro.Storage"",""fields"":[{""name"":""PrimaryKey"",""type"":{""type"":""array"",""items"":[""null"",""boolean"",""int"",""long"",""float"",""double"",""string""]}}]}");

Check warning on line 22 in src/net/KEFCore.SerDes.Avro/Generated/AvroKeyContainer.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

Missing XML comment for publicly visible type or member 'AvroKeyContainer._SCHEMA'
private IList<System.Object> _PrimaryKey;
public virtual global::Avro.Schema Schema

Check warning on line 24 in src/net/KEFCore.SerDes.Avro/Generated/AvroKeyContainer.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

Missing XML comment for publicly visible type or member 'AvroKeyContainer.Schema'
{
get
{
return AvroKeyContainer._SCHEMA;
}
}
public IList<System.Object> PrimaryKey

Check warning on line 31 in src/net/KEFCore.SerDes.Avro/Generated/AvroKeyContainer.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

Missing XML comment for publicly visible type or member 'AvroKeyContainer.PrimaryKey'
{
get
{
return this._PrimaryKey;
}
set
{
this._PrimaryKey = value;
}
}
public virtual object Get(int fieldPos)

Check warning on line 42 in src/net/KEFCore.SerDes.Avro/Generated/AvroKeyContainer.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

Missing XML comment for publicly visible type or member 'AvroKeyContainer.Get(int)'
{
switch (fieldPos)
{
case 0: return this.PrimaryKey;
default: throw new global::Avro.AvroRuntimeException("Bad index " + fieldPos + " in Get()");
};
}
public virtual void Put(int fieldPos, object fieldValue)

Check warning on line 50 in src/net/KEFCore.SerDes.Avro/Generated/AvroKeyContainer.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

Missing XML comment for publicly visible type or member 'AvroKeyContainer.Put(int, object)'
{
switch (fieldPos)
{
case 0: this.PrimaryKey = (IList<System.Object>)fieldValue; break;
default: throw new global::Avro.AvroRuntimeException("Bad index " + fieldPos + " in Put()");
};
}
}
}
3 changes: 3 additions & 0 deletions src/net/KEFCore.SerDes.Avro/KEFCore.SerDes.Avro.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@
</ItemGroup>

<ItemGroup>
<None Update="AvroKeyContainer.avsc">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
<None Update="AvroValueContainer.avsc">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright 2023 MASES s.r.l.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Refer to LICENSE for more information.
*/

#nullable enable

using Avro.IO;
using Avro.Specific;
using MASES.EntityFrameworkCore.KNet.Serialization.Avro.Storage;
using MASES.KNet.Serialization;
using Org.Apache.Kafka.Common.Header;

namespace MASES.EntityFrameworkCore.KNet.Serialization.Avro;

/// <summary>
/// Avro Key Binary encoder extension of <see cref="KNetSerDes{T}"/>, for example <see href="https://masesgroup.github.io/KNet/articles/usageSerDes.html"/>
/// </summary>
/// <typeparam name="T"></typeparam>
public class KEFCoreSerDesKeyAvroBinary<T> : KNetSerDes<T>
{
static readonly SpecificDefaultWriter SpecificWriter = new(AvroKeyContainer._SCHEMA);
static readonly SpecificDefaultReader SpecificReader = new(AvroKeyContainer._SCHEMA, AvroKeyContainer._SCHEMA);
readonly IKNetSerDes<T> _defaultSerDes = default;

Check warning on line 37 in src/net/KEFCore.SerDes.Avro/KEFCoreSerDes.KeyContainer.AvroBinary.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

Cannot convert null literal to non-nullable reference type.
/// <summary>
/// Default initializer
/// </summary>
public KEFCoreSerDesKeyAvroBinary()
{
if (KNetSerialization.IsInternalManaged<T>())
{
_defaultSerDes = new KNetSerDes<T>();
}
else if (!typeof(T).IsArray)
{
throw new InvalidOperationException($"KEFCoreSerDesKeyAvroBinary cannot manage {typeof(T).Name}, override or build a new serializaer");
}
}

/// <inheritdoc cref="KNetSerDes{T}.Serialize(string, T)"/>
public override byte[] Serialize(string topic, T data)
{
return SerializeWithHeaders(topic, null!, data);
}
/// <inheritdoc cref="KNetSerDes{T}.SerializeWithHeaders(string, Headers, T)"/>
public override byte[] SerializeWithHeaders(string topic, Headers headers, T data)
{
if (_defaultSerDes != null) return _defaultSerDes.SerializeWithHeaders(topic, headers, data);

using MemoryStream memStream = new();
BinaryEncoder encoder = new(memStream);
SpecificWriter.Write(data, encoder);
return memStream.ToArray();
}
/// <inheritdoc cref="KNetSerDes{T}.Deserialize(string, byte[])"/>
public override T Deserialize(string topic, byte[] data)
{
return DeserializeWithHeaders(topic, null!, data);
}
/// <inheritdoc cref="KNetSerDes{T}.DeserializeWithHeaders(string, Headers, byte[])"/>
public override T DeserializeWithHeaders(string topic, Headers headers, byte[] data)
{
if (_defaultSerDes != null) return _defaultSerDes.DeserializeWithHeaders(topic, headers, data);

using MemoryStream memStream = new(data);
BinaryDecoder decoder = new(memStream);
T t = (T)Activator.CreateInstance(typeof(T))!;
t = SpecificReader.Read(t!, decoder);
return t;
}
}
Loading
Loading