Skip to content

Commit

Permalink
Added Json encoder for Avro
Browse files Browse the repository at this point in the history
  • Loading branch information
masesdevelopers committed Oct 14, 2023
1 parent a5d3ffb commit 335d19f
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
</PropertyGroup>
<ItemGroup>
<Compile Remove="AvroValueContainer.cs" />
<Compile Remove="KEFCoreSerDes.Avro.cs" />
<Compile Remove="KEFCoreSerDes.AvroBinary.cs" />
<Compile Remove="KEFCoreSerDes.AvroJson.cs" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@
namespace MASES.EntityFrameworkCore.KNet.Serialization.Avro;

/// <summary>
/// Avro extension of <see cref="KNetSerDes{T}"/>, for example <see href="https://masesgroup.github.io/KNet/articles/usageSerDes.html"/>
/// Avro Binary encoder extension of <see cref="KNetSerDes{T}"/>, for example <see href="https://masesgroup.github.io/KNet/articles/usageSerDes.html"/>
/// </summary>
/// <typeparam name="T"></typeparam>
public class KEFCoreSerDesAvro<T> : KNetSerDes<T>
public class KEFCoreSerDesAvroBinary<T> : KNetSerDes<T>
{
static SpecificDefaultWriter SpecificWriter = new SpecificDefaultWriter(AvroValueContainer._SCHEMA);
static SpecificDefaultReader SpecificReader = new SpecificDefaultReader(AvroValueContainer._SCHEMA, AvroValueContainer._SCHEMA);
static readonly SpecificDefaultWriter SpecificWriter = new SpecificDefaultWriter(AvroValueContainer._SCHEMA);
static readonly SpecificDefaultReader SpecificReader = new SpecificDefaultReader(AvroValueContainer._SCHEMA, AvroValueContainer._SCHEMA);

/// <inheritdoc cref="KNetSerDes{T}.Serialize(string, T)"/>
public override byte[] Serialize(string topic, T data)
Expand Down
66 changes: 66 additions & 0 deletions src/net/KEFCore.SerDes.Avro/KEFCoreSerDes.AvroJson.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright 2023 MASES s.r.l.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Refer to LICENSE for more information.
*/

#nullable enable

using Avro.IO;
using Avro.Specific;
using MASES.EntityFrameworkCore.KNet.Serialization.Avro.Storage;
using MASES.KNet.Serialization;
using Org.Apache.Kafka.Common.Header;

namespace MASES.EntityFrameworkCore.KNet.Serialization.Avro;

/// <summary>
/// Avro Json encoder extension of <see cref="KNetSerDes{T}"/>, for example <see href="https://masesgroup.github.io/KNet/articles/usageSerDes.html"/>
/// </summary>
/// <typeparam name="T"></typeparam>
public class KEFCoreSerDesAvroJson<T> : KNetSerDes<T>
{
static readonly SpecificDefaultWriter SpecificWriter = new SpecificDefaultWriter(AvroValueContainer._SCHEMA);
static readonly SpecificDefaultReader SpecificReader = new SpecificDefaultReader(AvroValueContainer._SCHEMA, AvroValueContainer._SCHEMA);

/// <inheritdoc cref="KNetSerDes{T}.Serialize(string, T)"/>
public override byte[] Serialize(string topic, T data)
{
return SerializeWithHeaders(topic, null!, data);
}
/// <inheritdoc cref="KNetSerDes{T}.SerializeWithHeaders(string, Headers, T)"/>
public override byte[] SerializeWithHeaders(string topic, Headers headers, T data)
{
using MemoryStream memStream = new();
JsonEncoder encoder = new(AvroValueContainer._SCHEMA, memStream);
SpecificWriter.Write(data, encoder);
encoder.Flush();
return memStream.ToArray();
}
/// <inheritdoc cref="KNetSerDes{T}.Deserialize(string, byte[])"/>
public override T Deserialize(string topic, byte[] data)
{
return DeserializeWithHeaders(topic, null!, data);
}
/// <inheritdoc cref="KNetSerDes{T}.DeserializeWithHeaders(string, Headers, byte[])"/>
public override T DeserializeWithHeaders(string topic, Headers headers, byte[] data)
{
using MemoryStream memStream = new(data);
JsonDecoder decoder = new(AvroValueContainer._SCHEMA, memStream);
T t = (T)Activator.CreateInstance(typeof(T))!;
t = SpecificReader.Read(t!, decoder);
return t;
}
}
1 change: 1 addition & 0 deletions test/Common/ProgramConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ namespace MASES.EntityFrameworkCore.KNet.Test
{
public class ProgramConfig
{
public bool UseAvroBinary { get; set; } = false;
public bool EnableKEFCoreTracing { get; set; } = false;
public bool UseInMemoryProvider { get; set; } = false;
public bool UseModelBuilder { get; set; } = false;
Expand Down
4 changes: 1 addition & 3 deletions test/KEFCore.Benchmark.Avro.Test/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@ static void ReportString(string message)

static void Main(string[] args)
{
//AvroSerialization.TestAvroSerialization();

BloggingContext context = null;
var testWatcher = new Stopwatch();
var globalWatcher = new Stopwatch();
Expand Down Expand Up @@ -91,7 +89,7 @@ static void Main(string[] args)
DbName = databaseName,
StreamsConfig = streamConfig,
ValueContainerType = typeof(AvroValueContainer<>),
ValueSerializationType = typeof(KEFCoreSerDesAvro<>),
ValueSerializationType = config.UseAvroBinary ? typeof(KEFCoreSerDesAvroBinary<>) : typeof(KEFCoreSerDesAvroJson<>),
})
{

Expand Down

0 comments on commit 335d19f

Please sign in to comment.