Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PROPOSAL] NATS Client POC #530

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions ExampleApp/ExampleApp.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\src\NATS.Client\NATS.Client.csproj" />
<ProjectReference Include="..\src\NATS.Client.JetStream\NATS.Client.JetStream.csproj" />
</ItemGroup>

</Project>
100 changes: 100 additions & 0 deletions ExampleApp/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// ReSharper disable AccessToDisposedClosure

using System.Text;
using NATS.Client;
using NATS.Client.JetStream;

await using var client = new NatsClient();

CancellationTokenSource cts = new();

// Subscribe for int, string, bytes, json
List<Task> tasks =
[
Task.Run(async () =>
{
await foreach (var msg in client.SubscribeAsync<int>("x.int", cancellationToken: cts.Token))
{
Console.WriteLine($"Received int: {msg.Data}");
}
}),

Task.Run(async () =>
{
await foreach (var msg in client.SubscribeAsync<string>("x.string", cancellationToken: cts.Token))
{
Console.WriteLine($"Received string: {msg.Data}");
}
}),

Task.Run(async () =>
{
await foreach (var msg in client.SubscribeAsync<byte[]>("x.bytes", cancellationToken: cts.Token))
{
if (msg.Data != null)
{
Console.WriteLine($"Received bytes: {Encoding.UTF8.GetString(msg.Data)}");
}
}
}),

Task.Run(async () =>
{
await foreach (var msg in client.SubscribeAsync<MyData>("x.json", cancellationToken: cts.Token))
{
Console.WriteLine($"Received data: {msg.Data}");
}
}),

Task.Run(async () =>
{
await foreach (var msg in client.SubscribeAsync<MyData>("x.service", cancellationToken: cts.Token))
{
if (msg.Data != null)
{
Console.WriteLine($"Replying to data: {msg.Data}");
await msg.ReplyAsync($"Thank you {msg.Data.Name} your Id is {msg.Data.Id}!");
}
}
}),

Task.Run(async () =>
{
var id = 0;
await foreach (var msg in client.SubscribeAsync<object>("x.service2", cancellationToken: cts.Token))
{
await msg.ReplyAsync(new MyData(id++, $"foo{id}"));
}
})
];

await Task.Delay(1000);

await client.PublishAsync("x.int", 100);
await client.PublishAsync("x.string", "Hello, World!");
await client.PublishAsync("x.bytes", new byte[] { 65, 66, 67 });
await client.PublishAsync("x.json", new MyData(30, "bar"));

// Request/Reply
{
var response = await client.RequestAsync<MyData, string>("x.service", new MyData(100, "foo"));
Console.WriteLine($"Response: {response.Data}");
}

// Request/Reply without request data
for (var i = 0; i < 3; i++)
{
var response = await client.RequestAsync<MyData>("x.service2");
Console.WriteLine($"Response[{i}]: {response.Data}");
}

// Use JetStream by referencing NATS.Client.JetStream pacakge
var js = client.GetJetStream();

await cts.CancelAsync();

await Task.WhenAll(tasks);

Console.WriteLine("Bye!");

public record MyData(int Id, string Name);
13 changes: 13 additions & 0 deletions NATS.Client.sln
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Net.OpenTelemetry.Test
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Client.Platform.Windows.Tests", "tests\NATS.Client.Platform.Windows.Tests\NATS.Client.Platform.Windows.Tests.csproj", "{A37994CC-A23A-415E-8B61-9468C7178A55}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Client", "src\NATS.Client\NATS.Client.csproj", "{76FDCE08-BDE5-4C78-AD24-2A68C7AEDC4D}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ExampleApp", "ExampleApp\ExampleApp.csproj", "{D2B45676-57FB-4088-8A2D-4B188BF1319C}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -291,6 +295,14 @@ Global
{A37994CC-A23A-415E-8B61-9468C7178A55}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A37994CC-A23A-415E-8B61-9468C7178A55}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A37994CC-A23A-415E-8B61-9468C7178A55}.Release|Any CPU.Build.0 = Release|Any CPU
{76FDCE08-BDE5-4C78-AD24-2A68C7AEDC4D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{76FDCE08-BDE5-4C78-AD24-2A68C7AEDC4D}.Debug|Any CPU.Build.0 = Debug|Any CPU
{76FDCE08-BDE5-4C78-AD24-2A68C7AEDC4D}.Release|Any CPU.ActiveCfg = Release|Any CPU
{76FDCE08-BDE5-4C78-AD24-2A68C7AEDC4D}.Release|Any CPU.Build.0 = Release|Any CPU
{D2B45676-57FB-4088-8A2D-4B188BF1319C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{D2B45676-57FB-4088-8A2D-4B188BF1319C}.Debug|Any CPU.Build.0 = Debug|Any CPU
{D2B45676-57FB-4088-8A2D-4B188BF1319C}.Release|Any CPU.ActiveCfg = Release|Any CPU
{D2B45676-57FB-4088-8A2D-4B188BF1319C}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -341,6 +353,7 @@ Global
{474BA453-9CFF-41C2-B2E7-ADD92CC93E86} = {95A69671-16CA-4133-981C-CC381B7AAA30}
{B8554582-DE19-41A2-9784-9B27C9F22429} = {C526E8AB-739A-48D7-8FC4-048978C9B650}
{A37994CC-A23A-415E-8B61-9468C7178A55} = {C526E8AB-739A-48D7-8FC4-048978C9B650}
{76FDCE08-BDE5-4C78-AD24-2A68C7AEDC4D} = {4827B3EC-73D8-436D-AE2A-5E29AC95FD0C}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {8CBB7278-D093-448E-B3DE-B5991209A1AA}
Expand Down
2 changes: 2 additions & 0 deletions sandbox/Example.NativeAot/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,8 @@ public void Serialize(IBufferWriter<byte> bufferWriter, T value)

throw new NatsException($"Can't deserialize {typeof(T)}");
}

public INatsSerializer<T> CombineWith(INatsSerializer<T> next) => throw new NotImplementedException();
}

public record MyData
Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client.Core/Commands/CommandWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ namespace NATS.Client.Core.Commands;
/// These methods are in the hot path, and have all been
/// optimized to eliminate allocations and minimize copying
/// </remarks>
internal sealed class CommandWriter : IAsyncDisposable
public sealed class CommandWriter : IAsyncDisposable
{
// memory segment used to consolidate multiple small memory chunks
// 8520 should fit into 6 packets on 1500 MTU TLS connection or 1 packet on 9000 MTU TLS connection
Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client.Core/Commands/PingCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

namespace NATS.Client.Core.Commands;

internal class PingCommand : IValueTaskSource<TimeSpan>, IObjectPoolNode<PingCommand>
public class PingCommand : IValueTaskSource<TimeSpan>, IObjectPoolNode<PingCommand>
{
private readonly ObjectPool? _pool;
private DateTimeOffset _start;
Expand Down
183 changes: 105 additions & 78 deletions src/NATS.Client.Core/INatsConnection.cs
Original file line number Diff line number Diff line change
@@ -1,29 +1,12 @@
using System.Diagnostics.CodeAnalysis;
using System.Threading.Channels;
using NATS.Client.Core.Internal;

namespace NATS.Client.Core;

public interface INatsConnection : IAsyncDisposable
public interface INatsClient : IAsyncDisposable
{
event AsyncEventHandler<NatsEventArgs>? ConnectionDisconnected;

event AsyncEventHandler<NatsEventArgs>? ConnectionOpened;

event AsyncEventHandler<NatsEventArgs>? ReconnectFailed;

event AsyncEventHandler<NatsMessageDroppedEventArgs>? MessageDropped;

INatsServerInfo? ServerInfo { get; }

NatsOpts Opts { get; }

NatsConnectionState ConnectionState { get; }

/// <summary>
/// Send PING command and await PONG. Return value is similar as Round Trip Time (RTT).
/// </summary>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the command.</param>
/// <returns>A <see cref="ValueTask{TResult}"/> that represents the asynchronous round trip operation.</returns>
ValueTask<TimeSpan> PingAsync(CancellationToken cancellationToken = default);
INatsConnection Connection { get; }

/// <summary>
/// Publishes a serializable message payload to the given subject name, optionally supplying a reply subject.
Expand All @@ -39,32 +22,6 @@ public interface INatsConnection : IAsyncDisposable
/// <returns>A <see cref="ValueTask"/> that represents the asynchronous send operation.</returns>
ValueTask PublishAsync<T>(string subject, T data, NatsHeaders? headers = default, string? replyTo = default, INatsSerialize<T>? serializer = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default);

/// <summary>
/// Publishes an empty message payload to the given subject name, optionally supplying a reply subject.
/// </summary>
/// <param name="subject">The destination subject to publish to.</param>
/// <param name="headers">Optional message headers.</param>
/// <param name="replyTo">Optional reply-to subject.</param>
/// <param name="opts">A <see cref="NatsPubOpts"/> for publishing options.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the command.</param>
/// <returns>A <see cref="ValueTask"/> that represents the asynchronous send operation.</returns>
/// <remarks>
/// Publishing a sentinel usually means a signal to the given subject which could be used to trigger an action
/// or indicate an event for example and of messages.
/// </remarks>
ValueTask PublishAsync(string subject, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default);

/// <summary>
/// Publishes a serializable message payload to the given subject name, optionally supplying a reply subject.
/// </summary>
/// <param name="msg">A <see cref="NatsMsg{T}"/> representing message details.</param>
/// <param name="serializer">Serializer to use for the message type.</param>
/// <param name="opts">A <see cref="NatsPubOpts"/> for publishing options.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the command.</param>
/// <typeparam name="T">Specifies the type of data that may be sent to the NATS Server.</typeparam>
/// <returns>A <see cref="ValueTask"/> that represents the asynchronous send operation.</returns>
ValueTask PublishAsync<T>(in NatsMsg<T> msg, INatsSerialize<T>? serializer = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default);

/// <summary>
/// Initiates a subscription to a subject, optionally joining a distributed queue group.
/// </summary>
Expand All @@ -82,37 +39,6 @@ public interface INatsConnection : IAsyncDisposable
/// </remarks>
IAsyncEnumerable<NatsMsg<T>> SubscribeAsync<T>(string subject, string? queueGroup = default, INatsDeserialize<T>? serializer = default, NatsSubOpts? opts = default, CancellationToken cancellationToken = default);

/// <summary>
/// Initiates a subscription to a subject, optionally joining a distributed queue group
/// and returns a <see cref="INatsSub{T}"/> object which provides more control over the subscription.
/// </summary>
/// <param name="subject">The subject name to subscribe to.</param>
/// <param name="queueGroup">If specified, the subscriber will join this queue group.</param>
/// <param name="serializer">Serializer to use for the message type.</param>
/// <param name="opts">A <see cref="NatsSubOpts"/> for subscription options.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the command.</param>
/// <typeparam name="T">Specifies the type of data that may be received from the NATS Server.</typeparam>
/// <returns>An asynchronous task that completes with the NATS subscription.</returns>
/// <remarks>
/// <para>
/// Subscribers with the same queue group name, become a queue group,
/// and only one randomly chosen subscriber of the queue group will
/// consume a message each time a message is received by the queue group.
/// </para>
/// <para>
/// This method returns a <see cref="INatsSub{T}"/> object which provides slightly lower level
/// control over the subscription. You can use this object to create your own core messaging
/// patterns or to create your own higher level abstractions.
/// </para>
/// </remarks>
ValueTask<INatsSub<T>> SubscribeCoreAsync<T>(string subject, string? queueGroup = default, INatsDeserialize<T>? serializer = default, NatsSubOpts? opts = default, CancellationToken cancellationToken = default);

/// <summary>
/// Create a new inbox subject with the form {Inbox Prefix}.{Unique Connection ID}.{Unique Inbox ID}
/// </summary>
/// <returns>A <see cref="string"/> containing a unique inbox subject.</returns>
string NewInbox();

/// <summary>
/// Request and receive a single reply from a responder.
/// </summary>
Expand Down Expand Up @@ -163,6 +89,91 @@ ValueTask<NatsMsg<TReply>> RequestAsync<TReply>(
INatsDeserialize<TReply>? replySerializer = default,
NatsSubOpts? replyOpts = default,
CancellationToken cancellationToken = default);
}

public interface INatsConnection : INatsClient
{
event AsyncEventHandler<NatsEventArgs>? ConnectionDisconnected;

event AsyncEventHandler<NatsEventArgs>? ConnectionOpened;

event AsyncEventHandler<NatsEventArgs>? ReconnectFailed;

event AsyncEventHandler<NatsMessageDroppedEventArgs>? MessageDropped;

INatsServerInfo? ServerInfo { get; }

NatsOpts Opts { get; }

NatsConnectionState ConnectionState { get; }

NatsHeaderParser HeaderParser { get; }

SubscriptionManager SubscriptionManager { get; }

/// <summary>
/// Send PING command and await PONG. Return value is similar as Round Trip Time (RTT).
/// </summary>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the command.</param>
/// <returns>A <see cref="ValueTask{TResult}"/> that represents the asynchronous round trip operation.</returns>
ValueTask<TimeSpan> PingAsync(CancellationToken cancellationToken = default);

/// <summary>
/// Publishes an empty message payload to the given subject name, optionally supplying a reply subject.
/// </summary>
/// <param name="subject">The destination subject to publish to.</param>
/// <param name="headers">Optional message headers.</param>
/// <param name="replyTo">Optional reply-to subject.</param>
/// <param name="opts">A <see cref="NatsPubOpts"/> for publishing options.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the command.</param>
/// <returns>A <see cref="ValueTask"/> that represents the asynchronous send operation.</returns>
/// <remarks>
/// Publishing a sentinel usually means a signal to the given subject which could be used to trigger an action
/// or indicate an event for example and of messages.
/// </remarks>
ValueTask PublishAsync(string subject, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default);

/// <summary>
/// Publishes a serializable message payload to the given subject name, optionally supplying a reply subject.
/// </summary>
/// <param name="msg">A <see cref="NatsMsg{T}"/> representing message details.</param>
/// <param name="serializer">Serializer to use for the message type.</param>
/// <param name="opts">A <see cref="NatsPubOpts"/> for publishing options.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the command.</param>
/// <typeparam name="T">Specifies the type of data that may be sent to the NATS Server.</typeparam>
/// <returns>A <see cref="ValueTask"/> that represents the asynchronous send operation.</returns>
ValueTask PublishAsync<T>(in NatsMsg<T> msg, INatsSerialize<T>? serializer = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default);

/// <summary>
/// Initiates a subscription to a subject, optionally joining a distributed queue group
/// and returns a <see cref="INatsSub{T}"/> object which provides more control over the subscription.
/// </summary>
/// <param name="subject">The subject name to subscribe to.</param>
/// <param name="queueGroup">If specified, the subscriber will join this queue group.</param>
/// <param name="serializer">Serializer to use for the message type.</param>
/// <param name="opts">A <see cref="NatsSubOpts"/> for subscription options.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the command.</param>
/// <typeparam name="T">Specifies the type of data that may be received from the NATS Server.</typeparam>
/// <returns>An asynchronous task that completes with the NATS subscription.</returns>
/// <remarks>
/// <para>
/// Subscribers with the same queue group name, become a queue group,
/// and only one randomly chosen subscriber of the queue group will
/// consume a message each time a message is received by the queue group.
/// </para>
/// <para>
/// This method returns a <see cref="INatsSub{T}"/> object which provides slightly lower level
/// control over the subscription. You can use this object to create your own core messaging
/// patterns or to create your own higher level abstractions.
/// </para>
/// </remarks>
ValueTask<INatsSub<T>> SubscribeCoreAsync<T>(string subject, string? queueGroup = default, INatsDeserialize<T>? serializer = default, NatsSubOpts? opts = default, CancellationToken cancellationToken = default);

/// <summary>
/// Create a new inbox subject with the form {Inbox Prefix}.{Unique Connection ID}.{Unique Inbox ID}
/// </summary>
/// <returns>A <see cref="string"/> containing a unique inbox subject.</returns>
string NewInbox();

/// <summary>
/// Request and receive zero or more replies from a responder.
Expand Down Expand Up @@ -196,4 +207,20 @@ IAsyncEnumerable<NatsMsg<TReply>> RequestManyAsync<TRequest, TReply>(
/// Connect socket and write CONNECT command to nats server.
/// </summary>
ValueTask ConnectAsync();

ValueTask<NatsSub<TReply>> RequestSubAsync<TRequest, TReply>(
string subject,
TRequest? data,
NatsHeaders? headers = default,
INatsSerialize<TRequest>? requestSerializer = default,
INatsDeserialize<TReply>? replySerializer = default,
NatsPubOpts? requestOpts = default,
NatsSubOpts? replyOpts = default,
CancellationToken cancellationToken = default);

ValueTask SubAsync(NatsSubBase sub, CancellationToken cancellationToken = default);

void OnMessageDropped<T>(NatsSubBase natsSub, int pending, NatsMsg<T> msg);

BoundedChannelOptions GetChannelOpts(NatsOpts connectionOpts, NatsSubChannelOpts? subChannelOpts);
}
Loading
Loading