From f66b95013c06172418e5ffd2dc235ca34cfd4070 Mon Sep 17 00:00:00 2001 From: jasper-d Date: Fri, 29 Sep 2023 15:31:15 +0200 Subject: [PATCH] Serialization tests --- .../NatsConnectionTest.cs | 204 ++++++++++++++++++ 1 file changed, 204 insertions(+) diff --git a/tests/NATS.Client.Core.Tests/NatsConnectionTest.cs b/tests/NATS.Client.Core.Tests/NatsConnectionTest.cs index 98c1407d7..c052a830f 100644 --- a/tests/NATS.Client.Core.Tests/NatsConnectionTest.cs +++ b/tests/NATS.Client.Core.Tests/NatsConnectionTest.cs @@ -1,4 +1,7 @@ +using System.Buffers; using System.Diagnostics; +using System.Runtime.CompilerServices; +using System.Security.Cryptography.X509Certificates; using System.Text; using System.Threading.Channels; using Xunit.Sdk; @@ -323,6 +326,207 @@ await Retry.Until( list.ShouldEqual(100, 200, 300, 400, 500); } + + public static IEnumerable TestConfig() + { + Type[] types = { typeof(byte[]), typeof(ReadOnlyMemory), typeof(Memory) }; + byte[] payload = "hello world"u8.ToArray(); + bool[] waitUntilSend = { true, false }; + INatsSerializer[] natsSerializer = { NatsOpts.Default.Serializer, new ByteSerializer() }; + + foreach (var pubType in types) + { + foreach (var subType in types) + { + foreach (var wait in waitUntilSend) + { + foreach (var serializer in natsSerializer) + { + yield return new object[] { pubType, subType, wait, serializer, payload }; + } + } + } + } + } + + [Theory] + [MemberData(nameof(TestConfig))] + public async Task TestBinaryPayload(Type pubType, Type subType, bool waitUntilSend, INatsSerializer serializer, byte[] payload) + { + byte[]? received = Array.Empty(); + + if (pubType == typeof(byte[])) + { + if (subType == typeof(byte[])) + { + received = await TestBinaryPayloadCore(payload, waitUntilSend, serializer); + } + else if (subType == typeof(ReadOnlyMemory)) + { + var rec = await TestBinaryPayloadCore>(payload, waitUntilSend, serializer); + received = rec.ToArray(); + } + else if (subType == typeof(Memory)) + { + var rec = await TestBinaryPayloadCore>(payload, waitUntilSend, serializer); + received = rec.ToArray(); + } + } + else if (pubType == typeof(Memory)) + { + if (subType == typeof(byte[])) + { + received = await TestBinaryPayloadCore, byte[]>(payload, waitUntilSend, serializer); + } + else if (subType == typeof(ReadOnlyMemory)) + { + var rec = await TestBinaryPayloadCore, ReadOnlyMemory>(payload, waitUntilSend, serializer); + received = rec.ToArray(); + } + else if (subType == typeof(Memory)) + { + var rec = await TestBinaryPayloadCore, Memory>(payload, waitUntilSend, serializer); + received = rec.ToArray(); + } + } + else if (pubType == typeof(ReadOnlyMemory)) + { + if (subType == typeof(byte[])) + { + received = await TestBinaryPayloadCore, byte[]>(payload, waitUntilSend, serializer); + } + else if (subType == typeof(ReadOnlyMemory)) + { + var rec = await TestBinaryPayloadCore, ReadOnlyMemory>(payload, waitUntilSend, serializer); + received = rec.ToArray(); + } + else if (subType == typeof(Memory)) + { + var rec = await TestBinaryPayloadCore, Memory>(payload, waitUntilSend, serializer); + received = rec.ToArray(); + } + } + + Assert.True(payload.SequenceEqual(received ?? throw new NullReferenceException())); + } + + public async Task TestBinaryPayloadCore(TPub payload, bool waitUntilSend, INatsSerializer serializer) + { + await using var server = NatsServer.Start(_output, _transportType); + + await using var subConnection = server.CreateClientConnection(); + await using var pubConnection = server.CreateClientConnection(); + + using var timeout = new CancellationTokenSource(5_000); + var subject = Guid.NewGuid().ToString("N"); + + var receivedData = new List(); + var pubOpts = new NatsPubOpts {WaitUntilSent = waitUntilSend, Serializer = serializer }; + var subOpts = new NatsSubOpts { Serializer = serializer }; + await using var sub = await subConnection.SubscribeAsync(subject, opts: subOpts, cancellationToken: timeout.Token); + + var receivedTask = Task.Run( + async () => + { + await sub.Msgs.WaitToReadAsync(timeout.Token); + sub.Msgs.TryRead(out var msg); + receivedData.Add(msg.Data); + }, + timeout.Token); + + + await pubConnection.PublishAsync(subject, payload, opts: pubOpts, cancellationToken: timeout.Token); + + await receivedTask; + + var receivedPayload = Assert.Single(receivedData); + return receivedPayload; + } + + private sealed class ByteSerializer : INatsSerializer + { + public int Serialize(ICountableBufferWriter bufferWriter, T? value) + { + if (value == null) + { + return 0; + } + + if (typeof(T) == typeof(ReadOnlyMemory)) + { + var rom = (ReadOnlyMemory)(object)value; + var buffer = bufferWriter.GetMemory(rom.Length); + rom.CopyTo(buffer); + bufferWriter.Advance(rom.Length); + return rom.Length; + } + + if (typeof(T) == typeof(Memory)) + { + var mem = (Memory)(object)value; + var buffer = bufferWriter.GetMemory(mem.Length); + mem.CopyTo(buffer); + bufferWriter.Advance(mem.Length); + return mem.Length; + } + + if (typeof(T) == typeof(byte[])) + { + var arr = (byte[])(object)value; + var buffer = bufferWriter.GetMemory(arr.Length); + arr.CopyTo(buffer); + bufferWriter.Advance(arr.Length); + return arr.Length; + } + + throw new NotSupportedException(); + } + + public T? Deserialize(in ReadOnlySequence buffer) + { + var arr = new byte[buffer.Length]; + buffer.CopyTo(arr.AsSpan()); + + if (typeof(T) == typeof(ReadOnlyMemory)) + { + return (T)(object)new ReadOnlyMemory(arr); + } + + if (typeof(T) == typeof(Memory)) + { + return (T)(object)new Memory(arr); + } + + if (typeof(T) == typeof(byte[])) + { + return (T)(object)arr; + } + + throw new NotSupportedException(); + } + + public object? Deserialize(in ReadOnlySequence buffer, Type type) + { + if (type == typeof(ReadOnlyMemory)) + { + return Deserialize>(buffer); + } + if (type == typeof(Memory)) + { + return Deserialize>(buffer); + } + + if (type == typeof(byte[])) + { + return Deserialize(buffer); + } + + throw new NotSupportedException(); + } + } + + + } public class SampleClass : IEquatable