Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support all binary payload APIs with only generics #143

Merged
merged 9 commits into from
Oct 6, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sandbox/ConsoleApp/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public Runner(INatsConnection connection)
[RootCommand]
public async Task Run()
{
var subscription = await _connection.SubscribeAsync("foo");
var subscription = await _connection.SubscribeAsync<string>("foo");

_ = Task.Run(async () =>
{
Expand Down
4 changes: 2 additions & 2 deletions sandbox/Example.Core.SubscribeHeaders/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@

Print($"[SUB] Subscribing to subject '{subject}'...\n");

var sub = await connection.SubscribeAsync(subject);
var sub = await connection.SubscribeAsync<byte[]>(subject);

await foreach (var msg in sub.Msgs.ReadAllAsync())
{
Print($"[RCV] {msg.Subject}: {Encoding.UTF8.GetString(msg.Data.Span)}\n");
Print($"[RCV] {msg.Subject}: {Encoding.UTF8.GetString(msg.Data!)}\n");
if (msg.Headers != null)
{
foreach (var (key, values) in msg.Headers)
Expand Down
11 changes: 4 additions & 7 deletions sandbox/Example.Core.SubscribeQueueGroup/Program.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
// > nats pub foo.xyz --count=10 "my_message_{{ Count }}"
using System.Text;
using Microsoft.Extensions.Logging;
using NATS.Client.Core;

Expand All @@ -12,13 +11,12 @@
await using var connection1 = new NatsConnection(options);

Print($"[1][SUB] Subscribing to subject '{subject}'...\n");
var sub1 = await connection1.SubscribeAsync(subject, queueGroup: "My-Workers");
var sub1 = await connection1.SubscribeAsync<string>(subject, queueGroup: "My-Workers");
var task1 = Task.Run(async () =>
{
await foreach (var msg in sub1.Msgs.ReadAllAsync())
{
var data = Encoding.UTF8.GetString(msg.Data.ToArray());
Print($"[1][RCV] {msg.Subject}: {data}\n");
Print($"[1][RCV] {msg.Subject}: {msg.Data}\n");
}
});

Expand All @@ -28,13 +26,12 @@
await using var connection2 = new NatsConnection(options);

Print($"[2][SUB] Subscribing to subject '{subject}'...\n");
var sub2 = await connection2.SubscribeAsync(subject, queueGroup: "My-Workers");
var sub2 = await connection2.SubscribeAsync<string>(subject, queueGroup: "My-Workers");
var task2 = Task.Run(async () =>
{
await foreach (var msg in sub2.Msgs.ReadAllAsync())
{
var data = Encoding.UTF8.GetString(msg.Data.ToArray());
Print($"[2][RCV] {msg.Subject}: {data}\n");
Print($"[2][RCV] {msg.Subject}: {msg.Data}\n");
}
});

Expand Down
4 changes: 2 additions & 2 deletions sandbox/Example.Core.SubscribeRaw/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@

Print($"[SUB] Subscribing to subject '{subject}'...\n");

var sub = await connection.SubscribeAsync(subject);
var sub = await connection.SubscribeAsync<byte[]>(subject);

await foreach (var msg in sub.Msgs.ReadAllAsync())
{
var data = Encoding.UTF8.GetString(msg.Data.ToArray());
var data = Encoding.UTF8.GetString(msg.Data!);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is the null-forgiving operator needed here? Is SubscribeAsync<T> eventually returning T? instead? Would be nice if we could avoid that

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Default values in general yield empty/sentinel payloads that's why there are T?s in a lot of places. Do you think we shouldn't allow nulls? How would we handle sending and receiving empty payloads?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, I guess there is no way to avoid it for reference types when default may be retruned. Seeing as System.Text.JSON even does it

Print($"[RCV] {msg.Subject}: {data}\n");
}

Expand Down
10 changes: 5 additions & 5 deletions sandbox/Example.JetStream.PullConsumer/RawDataSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ namespace Example.JetStream.PullConsumer;

public class RawDataSerializer : INatsSerializer
{
public INatsSerializer? Next => default;

public int Serialize<T>(ICountableBufferWriter bufferWriter, T? value)
{
if (value is RawData data)
Expand All @@ -16,13 +18,11 @@ public int Serialize<T>(ICountableBufferWriter bufferWriter, T? value)
throw new Exception($"Can only work with '{typeof(RawData)}'");
}

public T? Deserialize<T>(in ReadOnlySequence<byte> buffer) => (T?)Deserialize(buffer, typeof(T));

public object? Deserialize(in ReadOnlySequence<byte> buffer, Type type)
public T? Deserialize<T>(in ReadOnlySequence<byte> buffer)
{
if (type != typeof(RawData))
if (typeof(T) != typeof(RawData))
throw new Exception($"Can only work with '{typeof(RawData)}'");

return new RawData(buffer.ToArray());
return (T)(object)new RawData(buffer.ToArray());
}
}
169 changes: 169 additions & 0 deletions sandbox/MicroBenchmark/DefaultBench.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
#pragma warning disable IDE0044
using System.Text.Json;
using BenchmarkDotNet.Attributes;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using NATS.Client.Core;
using StackExchange.Redis;
using ZLogger;

namespace MicroBenchmark;

public struct MyVector3
{
public float X { get; set; }

public float Y { get; set; }

public float Z { get; set; }
}

// var run = new DefaultRun();
// await run.SetupAsync();
// await run.RunBenchmark();
// await run.RunStackExchangeRedis();

// await run.CleanupAsync();
#pragma warning disable CS8618

[MemoryDiagnoser]
[ShortRunJob]
[PlainExporter]
public class DefaultBench
{
#pragma warning disable CS8618 // Non-nullable field must contain a non-null value when exiting constructor. Consider declaring as nullable.
private NatsConnection _connection;
private string _subject;
private ConnectionMultiplexer _redis;
private object _gate;
private Handler _handler;
private IDisposable _subscription = default!;

[GlobalSetup]
public async Task SetupAsync()
{
var provider = new ServiceCollection()
.AddLogging(x =>
{
x.ClearProviders();
x.SetMinimumLevel(LogLevel.Information);
x.AddZLoggerConsole();
})
.BuildServiceProvider();

var loggerFactory = provider.GetRequiredService<ILoggerFactory>();
var logger = loggerFactory.CreateLogger<ILogger<DefaultBench>>();
var options = NatsOpts.Default with
{
LoggerFactory = loggerFactory,
Echo = true,
Verbose = false,
};

_connection = new NATS.Client.Core.NatsConnection(options);
_subject = "foobar";
await _connection.ConnectAsync();
_gate = new object();
_redis = StackExchange.Redis.ConnectionMultiplexer.Connect("localhost");

_handler = new Handler();

// subscription = connection.Subscribe<MyVector3>(key, handler.Handle);
}

// [Benchmark]
public async Task Nop()
{
await Task.Yield();
}

[Benchmark]
public async Task PublishAsync()
{
for (var i = 0; i < 1; i++)
{
await _connection.PublishAsync(_subject, default(MyVector3));
}
}

// [Benchmark]
public async Task PublishAsyncRedis()
{
for (var i = 0; i < 1; i++)
{
await _redis.GetDatabase().PublishAsync(_subject, JsonSerializer.Serialize(default(MyVector3)));
}
}

// [Benchmark]
public void RunBenchmark()
{
const int count = 10000;
_handler.Gate = _gate;
_handler.Called = 0;
_handler.Max = count;

for (var i = 0; i < count; i++)
{
_connection.PublishAsync(_subject, default(MyVector3));
}

lock (_gate)
{
// Monitor.Wait(gate);
Thread.Sleep(1000);
}
}

// [Benchmark]
// public async Task RunStackExchangeRedis()
// {
// var tcs = new TaskCompletionSource();
// var called = 0;
// redis.GetSubscriber().Subscribe(key.Key, (channel, v) =>
// {
// if (Interlocked.Increment(ref called) == 1000)
// {
// tcs.TrySetResult();
// }
// });

// for (int i = 0; i < 1000; i++)
// {
// _ = redis.GetDatabase().PublishAsync(key.Key, JsonSerializer.Serialize(new MyVector3()), StackExchange.Redis.CommandFlags.FireAndForget);
// }

// await tcs.Task;
// }
[GlobalCleanup]
public async Task CleanupAsync()
{
_subscription?.Dispose();
if (_connection != null)
{
await _connection.DisposeAsync();
}

_redis?.Dispose();
}

private class Handler
{
#pragma warning disable SA1401
public int Called;
public int Max;
public object Gate;
#pragma warning restore SA1401

public void Handle(MyVector3 vec)
{
if (Interlocked.Increment(ref Called) == Max)
{
lock (Gate)
{
Monitor.PulseAll(Gate);
}
}
}
}
}
2 changes: 1 addition & 1 deletion sandbox/MicroBenchmark/MicroBenchmark.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="BenchmarkDotNet" Version="0.13.1" />
<PackageReference Include="BenchmarkDotNet" Version="0.13.8" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="6.0.0" />
<PackageReference Include="StackExchange.Redis" Version="2.5.43" />
<PackageReference Include="ZLogger" Version="1.6.1" />
Expand Down
Loading