Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add JetStream NATS Client Extensions #598

Merged
merged 14 commits into from
Sep 18, 2024
1 change: 1 addition & 0 deletions sandbox/Example.Client/Example.Client.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\NATS.Client.JetStream\NATS.Client.JetStream.csproj" />
<ProjectReference Include="..\..\src\NATS.Client\NATS.Client.csproj" />
</ItemGroup>

Expand Down
8 changes: 7 additions & 1 deletion sandbox/Example.Client/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

using System.Text;
using NATS.Client;
using NATS.Client.JetStream;

CancellationTokenSource cts = new();

Expand Down Expand Up @@ -88,7 +89,12 @@
}

// Use JetStream by referencing NATS.Client.JetStream package
// var js = client.GetJetStream();
var js = client.CreateJetStreamContext();
await foreach (var stream in js.ListStreamsAsync())
{
Console.WriteLine($"JetStream Stream: {stream.Info.Config.Name}");
}

await cts.CancelAsync();

await Task.WhenAll(tasks);
Expand Down
86 changes: 86 additions & 0 deletions src/NATS.Client.Core/INatsConnection.cs
Original file line number Diff line number Diff line change
@@ -1,23 +1,56 @@
using System.Diagnostics.CodeAnalysis;
using System.Threading.Channels;

namespace NATS.Client.Core;

public interface INatsConnection : INatsClient
{
/// <summary>
/// Event that is raised when the connection to the NATS server is disconnected.
/// </summary>
event AsyncEventHandler<NatsEventArgs>? ConnectionDisconnected;

/// <summary>
/// Event that is raised when the connection to the NATS server is opened.
/// </summary>
event AsyncEventHandler<NatsEventArgs>? ConnectionOpened;

/// <summary>
/// Event that is raised when a reconnect attempt is failed.
/// </summary>
event AsyncEventHandler<NatsEventArgs>? ReconnectFailed;

/// <summary>
/// Event that is raised when a message is dropped for a subscription.
/// </summary>
event AsyncEventHandler<NatsMessageDroppedEventArgs>? MessageDropped;

/// <summary>
/// Server information received from the NATS server.
/// </summary>
INatsServerInfo? ServerInfo { get; }

/// <summary>
/// Options used to configure the NATS connection.
/// </summary>
NatsOpts Opts { get; }

/// <summary>
/// Connection state of the NATS connection.
/// </summary>
NatsConnectionState ConnectionState { get; }

/// <summary>
/// Subscription manager used to manage subscriptions for the NATS connection.
/// </summary>
INatsSubscriptionManager SubscriptionManager { get; }

/// <summary>
/// Singleton instance of the NATS header parser used to parse message headers
/// used by the NATS connection.
/// </summary>
NatsHeaderParser HeaderParser { get; }

/// <summary>
/// Publishes a serializable message payload to the given subject name, optionally supplying a reply subject.
/// </summary>
Expand Down Expand Up @@ -87,4 +120,57 @@ IAsyncEnumerable<NatsMsg<TReply>> RequestManyAsync<TRequest, TReply>(
NatsPubOpts? requestOpts = default,
NatsSubOpts? replyOpts = default,
CancellationToken cancellationToken = default);

/// <summary>
/// Adds a subscription to the NATS connection for a given <see cref="NatsSubBase"/> object.
/// Subscriptions are managed by the connection and are automatically removed when the connection is closed.
/// </summary>
/// <param name="sub">The <see cref="NatsSubBase"/> object representing the subscription details.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the operation.</param>
/// <returns>A <see cref="ValueTask"/> that represents the asynchronous subscription operation.</returns>
ValueTask AddSubAsync(NatsSubBase sub, CancellationToken cancellationToken = default);

/// <summary>
/// Creates a subscription with appropriate request and reply subjects publishing the request.
/// It's the caller's responsibility to retrieve the reply messages and complete the subscription.
/// </summary>
/// <typeparam name="TRequest">The type of the request data.</typeparam>
/// <typeparam name="TReply">The type of the expected reply.</typeparam>
/// <param name="subject">The subject to subscribe to.</param>
/// <param name="data">The optional request data.</param>
/// <param name="headers">The optional headers to include with the request.</param>
/// <param name="requestSerializer">The optional serializer for the request data.</param>
/// <param name="replySerializer">The optional deserializer for the reply data.</param>
/// <param name="requestOpts">The optional publishing options for the request.</param>
/// <param name="replyOpts">The optional subscription options for the reply.</param>
/// <param name="cancellationToken">The optional cancellation token.</param>
/// <returns>A <see cref="ValueTask{T}"/> representing the asynchronous operation of creating the request subscription.</returns>
ValueTask<NatsSub<TReply>> CreateRequestSubAsync<TRequest, TReply>(
string subject,
TRequest? data,
NatsHeaders? headers = default,
INatsSerialize<TRequest>? requestSerializer = default,
INatsDeserialize<TReply>? replySerializer = default,
NatsPubOpts? requestOpts = default,
NatsSubOpts? replyOpts = default,
CancellationToken cancellationToken = default);

/// <summary>
/// Retrieves the bounded channel options for creating a channel used by a subscription.
/// Options are built from the connection's configuration and the subscription channel options.
/// Used to aid in custom message handling when building a subscription channel.
/// </summary>
/// <param name="subChannelOpts">The options for configuring the subscription channel.</param>
/// <returns>The bounded channel options used for creating the subscription channel.</returns>
BoundedChannelOptions GetBoundedChannelOpts(NatsSubChannelOpts? subChannelOpts);

/// <summary>
/// Called when a message is dropped for a subscription.
/// Used to aid in custom message handling when a subscription's message channel is full.
/// </summary>
/// <param name="natsSub">The <see cref="NatsSubBase"/> representing the subscription.</param>
/// <param name="pending">The number of pending messages at the time the drop occurred.</param>
/// <param name="msg">The dropped message represented by <see cref="NatsMsg{T}"/>.</param>
/// <typeparam name="T">Specifies the type of data in the dropped message.</typeparam>
void OnMessageDropped<T>(NatsSubBase natsSub, int pending, NatsMsg<T> msg);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I had one 'thought' here...

maybe as <remarks> XmlDoc... explaining to people why this -isn't- a valuetask... e.x.

<remarks>This method is expected to complete quickly to avoid further delays in processing; if complex work is required, it is recommended to offload to a channel or other out-of-band processor</remarks>

but... as usual may be overthinking >_<

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is an important detail to mention

}
6 changes: 6 additions & 0 deletions src/NATS.Client.Core/INatsSubscriptionManager.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace NATS.Client.Core;

public interface INatsSubscriptionManager
{
public ValueTask RemoveAsync(NatsSubBase sub);
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is public now should we XMLDoc it?

(If 'public but really internal' perhaps an XMLDoc stating it is an internal API subject to change?)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

6 changes: 3 additions & 3 deletions src/NATS.Client.Core/Internal/InboxSub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public InboxSub(
string subject,
NatsSubOpts? opts,
NatsConnection connection,
ISubscriptionManager manager)
INatsSubscriptionManager manager)
: base(connection, manager, subject, queueGroup: default, opts)
{
_inbox = inbox;
Expand All @@ -35,7 +35,7 @@ protected override void TryComplete()
}
}

internal class InboxSubBuilder : ISubscriptionManager
internal class InboxSubBuilder : INatsSubscriptionManager
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Almost left a comment here but see that ISubscriptionManager was internal so it shouldn't break things for sane consumers.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

true. I guess it will break binary compatibility but we're coming out with a few breaking changes here applications will need to rebuild anyway.

{
private readonly ILogger<InboxSubBuilder> _logger;
#if NETSTANDARD2_0
Expand All @@ -46,7 +46,7 @@ internal class InboxSubBuilder : ISubscriptionManager

public InboxSubBuilder(ILogger<InboxSubBuilder> logger) => _logger = logger;

public InboxSub Build(string subject, NatsSubOpts? opts, NatsConnection connection, ISubscriptionManager manager)
public InboxSub Build(string subject, NatsSubOpts? opts, NatsConnection connection, INatsSubscriptionManager manager)
{
return new InboxSub(this, subject, opts, connection, manager);
}
Expand Down
11 changes: 3 additions & 8 deletions src/NATS.Client.Core/Internal/SubscriptionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,11 @@

namespace NATS.Client.Core.Internal;

internal interface ISubscriptionManager
{
public ValueTask RemoveAsync(NatsSubBase sub);
}

internal record struct SidMetadata(string Subject, WeakReference<NatsSubBase> WeakReference);

internal sealed record SubscriptionMetadata(int Sid);

internal sealed class SubscriptionManager : ISubscriptionManager, IAsyncDisposable
internal sealed class SubscriptionManager : INatsSubscriptionManager, IAsyncDisposable
{
private readonly ILogger<SubscriptionManager> _logger;
private readonly bool _trace;
Expand Down Expand Up @@ -192,7 +187,7 @@ public ValueTask RemoveAsync(NatsSubBase sub)
/// Commands returned form all the subscriptions will be run as a priority right after reconnection is established.
/// </remarks>
/// <returns>Enumerable list of commands</returns>
public async ValueTask WriteReconnectCommandsAsync(CommandWriter commandWriter)
internal async ValueTask WriteReconnectCommandsAsync(CommandWriter commandWriter)
{
if (_debug)
{
Expand Down Expand Up @@ -226,7 +221,7 @@ public async ValueTask WriteReconnectCommandsAsync(CommandWriter commandWriter)
}
}

public ISubscriptionManager GetManagerFor(string subject)
internal INatsSubscriptionManager GetManagerFor(string subject)
{
if (IsInboxSubject(subject))
return InboxSubBuilder;
Expand Down
7 changes: 4 additions & 3 deletions src/NATS.Client.Core/NatsConnection.LowLevelApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ namespace NATS.Client.Core;

public partial class NatsConnection
{
internal ValueTask SubAsync(NatsSubBase sub, CancellationToken cancellationToken = default) =>
/// <inheritdoc />
public ValueTask AddSubAsync(NatsSubBase sub, CancellationToken cancellationToken = default) =>
ConnectionState != NatsConnectionState.Open
? ConnectAndSubAsync(sub, cancellationToken)
: SubscriptionManager.SubscribeAsync(sub, cancellationToken);
: _subscriptionManager.SubscribeAsync(sub, cancellationToken);

private async ValueTask ConnectAndSubAsync(NatsSubBase sub, CancellationToken cancellationToken = default)
{
await ConnectAsync().AsTask().WaitAsync(cancellationToken).ConfigureAwait(false);
await SubscriptionManager.SubscribeAsync(sub, cancellationToken).ConfigureAwait(false);
await _subscriptionManager.SubscribeAsync(sub, cancellationToken).ConfigureAwait(false);
}
}
6 changes: 3 additions & 3 deletions src/NATS.Client.Core/NatsConnection.RequestReply.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public async ValueTask<NatsMsg<TReply>> RequestAsync<TRequest, TReply>(
try
{
replyOpts = SetReplyOptsDefaults(replyOpts);
await using var sub1 = await RequestSubAsync<TRequest, TReply>(subject, data, headers, requestSerializer, replySerializer, requestOpts, replyOpts, cancellationToken)
await using var sub1 = await CreateRequestSubAsync<TRequest, TReply>(subject, data, headers, requestSerializer, replySerializer, requestOpts, replyOpts, cancellationToken)
.ConfigureAwait(false);

await foreach (var msg in sub1.Msgs.ReadAllAsync(cancellationToken).ConfigureAwait(false))
Expand All @@ -56,7 +56,7 @@ public async ValueTask<NatsMsg<TReply>> RequestAsync<TRequest, TReply>(
}

replyOpts = SetReplyOptsDefaults(replyOpts);
await using var sub = await RequestSubAsync<TRequest, TReply>(subject, data, headers, requestSerializer, replySerializer, requestOpts, replyOpts, cancellationToken)
await using var sub = await CreateRequestSubAsync<TRequest, TReply>(subject, data, headers, requestSerializer, replySerializer, requestOpts, replyOpts, cancellationToken)
.ConfigureAwait(false);

await foreach (var msg in sub.Msgs.ReadAllAsync(cancellationToken).ConfigureAwait(false))
Expand Down Expand Up @@ -95,7 +95,7 @@ public async IAsyncEnumerable<NatsMsg<TReply>> RequestManyAsync<TRequest, TReply
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
replyOpts = SetReplyManyOptsDefaults(replyOpts);
await using var sub = await RequestSubAsync<TRequest, TReply>(subject, data, headers, requestSerializer, replySerializer, requestOpts, replyOpts, cancellationToken)
await using var sub = await CreateRequestSubAsync<TRequest, TReply>(subject, data, headers, requestSerializer, replySerializer, requestOpts, replyOpts, cancellationToken)
.ConfigureAwait(false);

await foreach (var msg in sub.Msgs.ReadAllAsync(cancellationToken).ConfigureAwait(false))
Expand Down
7 changes: 4 additions & 3 deletions src/NATS.Client.Core/NatsConnection.RequestSub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ namespace NATS.Client.Core;

public partial class NatsConnection
{
internal async ValueTask<NatsSub<TReply>> RequestSubAsync<TRequest, TReply>(
/// <inheritdoc />
public async ValueTask<NatsSub<TReply>> CreateRequestSubAsync<TRequest, TReply>(
string subject,
TRequest? data,
NatsHeaders? headers = default,
Expand All @@ -15,8 +16,8 @@ internal async ValueTask<NatsSub<TReply>> RequestSubAsync<TRequest, TReply>(
var replyTo = NewInbox();

replySerializer ??= Opts.SerializerRegistry.GetDeserializer<TReply>();
var sub = new NatsSub<TReply>(this, SubscriptionManager.InboxSubBuilder, replyTo, queueGroup: default, replyOpts, replySerializer);
await SubAsync(sub, cancellationToken).ConfigureAwait(false);
var sub = new NatsSub<TReply>(this, _subscriptionManager.InboxSubBuilder, replyTo, queueGroup: default, replyOpts, replySerializer);
await AddSubAsync(sub, cancellationToken).ConfigureAwait(false);

requestSerializer ??= Opts.SerializerRegistry.GetSerializer<TRequest>();
await PublishAsync(subject, data, headers, replyTo, requestSerializer, requestOpts, cancellationToken).ConfigureAwait(false);
Expand Down
8 changes: 4 additions & 4 deletions src/NATS.Client.Core/NatsConnection.Subscribe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ public async IAsyncEnumerable<NatsMsg<T>> SubscribeAsync<T>(string subject, stri
{
serializer ??= Opts.SerializerRegistry.GetDeserializer<T>();

await using var sub = new NatsSub<T>(this, SubscriptionManager.GetManagerFor(subject), subject, queueGroup, opts, serializer, cancellationToken);
await SubAsync(sub, cancellationToken: cancellationToken).ConfigureAwait(false);
await using var sub = new NatsSub<T>(this, _subscriptionManager.GetManagerFor(subject), subject, queueGroup, opts, serializer, cancellationToken);
await AddSubAsync(sub, cancellationToken: cancellationToken).ConfigureAwait(false);

// We don't cancel the channel reader here because we want to keep reading until the subscription
// channel writer completes so that messages left in the channel can be consumed before exit the loop.
Expand All @@ -25,8 +25,8 @@ public async IAsyncEnumerable<NatsMsg<T>> SubscribeAsync<T>(string subject, stri
public async ValueTask<INatsSub<T>> SubscribeCoreAsync<T>(string subject, string? queueGroup = default, INatsDeserialize<T>? serializer = default, NatsSubOpts? opts = default, CancellationToken cancellationToken = default)
{
serializer ??= Opts.SerializerRegistry.GetDeserializer<T>();
var sub = new NatsSub<T>(this, SubscriptionManager.GetManagerFor(subject), subject, queueGroup, opts, serializer, cancellationToken);
await SubAsync(sub, cancellationToken).ConfigureAwait(false);
var sub = new NatsSub<T>(this, _subscriptionManager.GetManagerFor(subject), subject, queueGroup, opts, serializer, cancellationToken);
await AddSubAsync(sub, cancellationToken).ConfigureAwait(false);
return sub;
}
}
Loading
Loading