diff --git a/ExampleApp/ExampleApp.csproj b/ExampleApp/ExampleApp.csproj new file mode 100644 index 000000000..df4d8156d --- /dev/null +++ b/ExampleApp/ExampleApp.csproj @@ -0,0 +1,15 @@ + + + + Exe + net8.0 + enable + enable + + + + + + + + diff --git a/ExampleApp/Program.cs b/ExampleApp/Program.cs new file mode 100644 index 000000000..da3e45d50 --- /dev/null +++ b/ExampleApp/Program.cs @@ -0,0 +1,100 @@ +// ReSharper disable AccessToDisposedClosure + +using System.Text; +using NATS.Client; +using NATS.Client.JetStream; + +await using var client = new NatsClient(); + +CancellationTokenSource cts = new(); + +// Subscribe for int, string, bytes, json +List tasks = +[ + Task.Run(async () => + { + await foreach (var msg in client.SubscribeAsync("x.int", cancellationToken: cts.Token)) + { + Console.WriteLine($"Received int: {msg.Data}"); + } + }), + + Task.Run(async () => + { + await foreach (var msg in client.SubscribeAsync("x.string", cancellationToken: cts.Token)) + { + Console.WriteLine($"Received string: {msg.Data}"); + } + }), + + Task.Run(async () => + { + await foreach (var msg in client.SubscribeAsync("x.bytes", cancellationToken: cts.Token)) + { + if (msg.Data != null) + { + Console.WriteLine($"Received bytes: {Encoding.UTF8.GetString(msg.Data)}"); + } + } + }), + + Task.Run(async () => + { + await foreach (var msg in client.SubscribeAsync("x.json", cancellationToken: cts.Token)) + { + Console.WriteLine($"Received data: {msg.Data}"); + } + }), + + Task.Run(async () => + { + await foreach (var msg in client.SubscribeAsync("x.service", cancellationToken: cts.Token)) + { + if (msg.Data != null) + { + Console.WriteLine($"Replying to data: {msg.Data}"); + await msg.ReplyAsync($"Thank you {msg.Data.Name} your Id is {msg.Data.Id}!"); + } + } + }), + + Task.Run(async () => + { + var id = 0; + await foreach (var msg in client.SubscribeAsync("x.service2", cancellationToken: cts.Token)) + { + await msg.ReplyAsync(new MyData(id++, $"foo{id}")); + } + }) +]; + +await Task.Delay(1000); + +await client.PublishAsync("x.int", 100); +await client.PublishAsync("x.string", "Hello, World!"); +await client.PublishAsync("x.bytes", new byte[] { 65, 66, 67 }); +await client.PublishAsync("x.json", new MyData(30, "bar")); + +// Request/Reply +{ + var response = await client.RequestAsync("x.service", new MyData(100, "foo")); + Console.WriteLine($"Response: {response.Data}"); +} + +// Request/Reply without request data +for (var i = 0; i < 3; i++) +{ + var response = await client.RequestAsync("x.service2"); + Console.WriteLine($"Response[{i}]: {response.Data}"); +} + +// Use JetStream by referencing NATS.Client.JetStream pacakge +var js = client.GetJetStream(); + +await cts.CancelAsync(); + +await Task.WhenAll(tasks); + +Console.WriteLine("Bye!"); + +public record MyData(int Id, string Name); diff --git a/NATS.Client.sln b/NATS.Client.sln index 5779d3931..11e6babc5 100644 --- a/NATS.Client.sln +++ b/NATS.Client.sln @@ -109,6 +109,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Net.OpenTelemetry.Test EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Client.Platform.Windows.Tests", "tests\NATS.Client.Platform.Windows.Tests\NATS.Client.Platform.Windows.Tests.csproj", "{A37994CC-A23A-415E-8B61-9468C7178A55}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Client", "src\NATS.Client\NATS.Client.csproj", "{76FDCE08-BDE5-4C78-AD24-2A68C7AEDC4D}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ExampleApp", "ExampleApp\ExampleApp.csproj", "{D2B45676-57FB-4088-8A2D-4B188BF1319C}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -291,6 +295,14 @@ Global {A37994CC-A23A-415E-8B61-9468C7178A55}.Debug|Any CPU.Build.0 = Debug|Any CPU {A37994CC-A23A-415E-8B61-9468C7178A55}.Release|Any CPU.ActiveCfg = Release|Any CPU {A37994CC-A23A-415E-8B61-9468C7178A55}.Release|Any CPU.Build.0 = Release|Any CPU + {76FDCE08-BDE5-4C78-AD24-2A68C7AEDC4D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {76FDCE08-BDE5-4C78-AD24-2A68C7AEDC4D}.Debug|Any CPU.Build.0 = Debug|Any CPU + {76FDCE08-BDE5-4C78-AD24-2A68C7AEDC4D}.Release|Any CPU.ActiveCfg = Release|Any CPU + {76FDCE08-BDE5-4C78-AD24-2A68C7AEDC4D}.Release|Any CPU.Build.0 = Release|Any CPU + {D2B45676-57FB-4088-8A2D-4B188BF1319C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {D2B45676-57FB-4088-8A2D-4B188BF1319C}.Debug|Any CPU.Build.0 = Debug|Any CPU + {D2B45676-57FB-4088-8A2D-4B188BF1319C}.Release|Any CPU.ActiveCfg = Release|Any CPU + {D2B45676-57FB-4088-8A2D-4B188BF1319C}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -341,6 +353,7 @@ Global {474BA453-9CFF-41C2-B2E7-ADD92CC93E86} = {95A69671-16CA-4133-981C-CC381B7AAA30} {B8554582-DE19-41A2-9784-9B27C9F22429} = {C526E8AB-739A-48D7-8FC4-048978C9B650} {A37994CC-A23A-415E-8B61-9468C7178A55} = {C526E8AB-739A-48D7-8FC4-048978C9B650} + {76FDCE08-BDE5-4C78-AD24-2A68C7AEDC4D} = {4827B3EC-73D8-436D-AE2A-5E29AC95FD0C} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {8CBB7278-D093-448E-B3DE-B5991209A1AA} diff --git a/sandbox/Example.NativeAot/Program.cs b/sandbox/Example.NativeAot/Program.cs index 6eb00b803..0bb978429 100644 --- a/sandbox/Example.NativeAot/Program.cs +++ b/sandbox/Example.NativeAot/Program.cs @@ -222,6 +222,8 @@ public void Serialize(IBufferWriter bufferWriter, T value) throw new NatsException($"Can't deserialize {typeof(T)}"); } + + public INatsSerializer CombineWith(INatsSerializer next) => throw new NotImplementedException(); } public record MyData diff --git a/src/NATS.Client.Core/Commands/CommandWriter.cs b/src/NATS.Client.Core/Commands/CommandWriter.cs index dcea8d688..56ab5836e 100644 --- a/src/NATS.Client.Core/Commands/CommandWriter.cs +++ b/src/NATS.Client.Core/Commands/CommandWriter.cs @@ -16,7 +16,7 @@ namespace NATS.Client.Core.Commands; /// These methods are in the hot path, and have all been /// optimized to eliminate allocations and minimize copying /// -internal sealed class CommandWriter : IAsyncDisposable +public sealed class CommandWriter : IAsyncDisposable { // memory segment used to consolidate multiple small memory chunks // 8520 should fit into 6 packets on 1500 MTU TLS connection or 1 packet on 9000 MTU TLS connection diff --git a/src/NATS.Client.Core/Commands/PingCommand.cs b/src/NATS.Client.Core/Commands/PingCommand.cs index a8c6236d4..014d75f80 100644 --- a/src/NATS.Client.Core/Commands/PingCommand.cs +++ b/src/NATS.Client.Core/Commands/PingCommand.cs @@ -3,7 +3,7 @@ namespace NATS.Client.Core.Commands; -internal class PingCommand : IValueTaskSource, IObjectPoolNode +public class PingCommand : IValueTaskSource, IObjectPoolNode { private readonly ObjectPool? _pool; private DateTimeOffset _start; diff --git a/src/NATS.Client.Core/INatsConnection.cs b/src/NATS.Client.Core/INatsConnection.cs index f83a49580..81991523a 100644 --- a/src/NATS.Client.Core/INatsConnection.cs +++ b/src/NATS.Client.Core/INatsConnection.cs @@ -1,29 +1,12 @@ using System.Diagnostics.CodeAnalysis; +using System.Threading.Channels; +using NATS.Client.Core.Internal; namespace NATS.Client.Core; -public interface INatsConnection : IAsyncDisposable +public interface INatsClient : IAsyncDisposable { - event AsyncEventHandler? ConnectionDisconnected; - - event AsyncEventHandler? ConnectionOpened; - - event AsyncEventHandler? ReconnectFailed; - - event AsyncEventHandler? MessageDropped; - - INatsServerInfo? ServerInfo { get; } - - NatsOpts Opts { get; } - - NatsConnectionState ConnectionState { get; } - - /// - /// Send PING command and await PONG. Return value is similar as Round Trip Time (RTT). - /// - /// A used to cancel the command. - /// A that represents the asynchronous round trip operation. - ValueTask PingAsync(CancellationToken cancellationToken = default); + INatsConnection Connection { get; } /// /// Publishes a serializable message payload to the given subject name, optionally supplying a reply subject. @@ -39,32 +22,6 @@ public interface INatsConnection : IAsyncDisposable /// A that represents the asynchronous send operation. ValueTask PublishAsync(string subject, T data, NatsHeaders? headers = default, string? replyTo = default, INatsSerialize? serializer = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default); - /// - /// Publishes an empty message payload to the given subject name, optionally supplying a reply subject. - /// - /// The destination subject to publish to. - /// Optional message headers. - /// Optional reply-to subject. - /// A for publishing options. - /// A used to cancel the command. - /// A that represents the asynchronous send operation. - /// - /// Publishing a sentinel usually means a signal to the given subject which could be used to trigger an action - /// or indicate an event for example and of messages. - /// - ValueTask PublishAsync(string subject, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default); - - /// - /// Publishes a serializable message payload to the given subject name, optionally supplying a reply subject. - /// - /// A representing message details. - /// Serializer to use for the message type. - /// A for publishing options. - /// A used to cancel the command. - /// Specifies the type of data that may be sent to the NATS Server. - /// A that represents the asynchronous send operation. - ValueTask PublishAsync(in NatsMsg msg, INatsSerialize? serializer = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default); - /// /// Initiates a subscription to a subject, optionally joining a distributed queue group. /// @@ -82,37 +39,6 @@ public interface INatsConnection : IAsyncDisposable /// IAsyncEnumerable> SubscribeAsync(string subject, string? queueGroup = default, INatsDeserialize? serializer = default, NatsSubOpts? opts = default, CancellationToken cancellationToken = default); - /// - /// Initiates a subscription to a subject, optionally joining a distributed queue group - /// and returns a object which provides more control over the subscription. - /// - /// The subject name to subscribe to. - /// If specified, the subscriber will join this queue group. - /// Serializer to use for the message type. - /// A for subscription options. - /// A used to cancel the command. - /// Specifies the type of data that may be received from the NATS Server. - /// An asynchronous task that completes with the NATS subscription. - /// - /// - /// Subscribers with the same queue group name, become a queue group, - /// and only one randomly chosen subscriber of the queue group will - /// consume a message each time a message is received by the queue group. - /// - /// - /// This method returns a object which provides slightly lower level - /// control over the subscription. You can use this object to create your own core messaging - /// patterns or to create your own higher level abstractions. - /// - /// - ValueTask> SubscribeCoreAsync(string subject, string? queueGroup = default, INatsDeserialize? serializer = default, NatsSubOpts? opts = default, CancellationToken cancellationToken = default); - - /// - /// Create a new inbox subject with the form {Inbox Prefix}.{Unique Connection ID}.{Unique Inbox ID} - /// - /// A containing a unique inbox subject. - string NewInbox(); - /// /// Request and receive a single reply from a responder. /// @@ -163,6 +89,91 @@ ValueTask> RequestAsync( INatsDeserialize? replySerializer = default, NatsSubOpts? replyOpts = default, CancellationToken cancellationToken = default); +} + +public interface INatsConnection : INatsClient +{ + event AsyncEventHandler? ConnectionDisconnected; + + event AsyncEventHandler? ConnectionOpened; + + event AsyncEventHandler? ReconnectFailed; + + event AsyncEventHandler? MessageDropped; + + INatsServerInfo? ServerInfo { get; } + + NatsOpts Opts { get; } + + NatsConnectionState ConnectionState { get; } + + NatsHeaderParser HeaderParser { get; } + + SubscriptionManager SubscriptionManager { get; } + + /// + /// Send PING command and await PONG. Return value is similar as Round Trip Time (RTT). + /// + /// A used to cancel the command. + /// A that represents the asynchronous round trip operation. + ValueTask PingAsync(CancellationToken cancellationToken = default); + + /// + /// Publishes an empty message payload to the given subject name, optionally supplying a reply subject. + /// + /// The destination subject to publish to. + /// Optional message headers. + /// Optional reply-to subject. + /// A for publishing options. + /// A used to cancel the command. + /// A that represents the asynchronous send operation. + /// + /// Publishing a sentinel usually means a signal to the given subject which could be used to trigger an action + /// or indicate an event for example and of messages. + /// + ValueTask PublishAsync(string subject, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default); + + /// + /// Publishes a serializable message payload to the given subject name, optionally supplying a reply subject. + /// + /// A representing message details. + /// Serializer to use for the message type. + /// A for publishing options. + /// A used to cancel the command. + /// Specifies the type of data that may be sent to the NATS Server. + /// A that represents the asynchronous send operation. + ValueTask PublishAsync(in NatsMsg msg, INatsSerialize? serializer = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default); + + /// + /// Initiates a subscription to a subject, optionally joining a distributed queue group + /// and returns a object which provides more control over the subscription. + /// + /// The subject name to subscribe to. + /// If specified, the subscriber will join this queue group. + /// Serializer to use for the message type. + /// A for subscription options. + /// A used to cancel the command. + /// Specifies the type of data that may be received from the NATS Server. + /// An asynchronous task that completes with the NATS subscription. + /// + /// + /// Subscribers with the same queue group name, become a queue group, + /// and only one randomly chosen subscriber of the queue group will + /// consume a message each time a message is received by the queue group. + /// + /// + /// This method returns a object which provides slightly lower level + /// control over the subscription. You can use this object to create your own core messaging + /// patterns or to create your own higher level abstractions. + /// + /// + ValueTask> SubscribeCoreAsync(string subject, string? queueGroup = default, INatsDeserialize? serializer = default, NatsSubOpts? opts = default, CancellationToken cancellationToken = default); + + /// + /// Create a new inbox subject with the form {Inbox Prefix}.{Unique Connection ID}.{Unique Inbox ID} + /// + /// A containing a unique inbox subject. + string NewInbox(); /// /// Request and receive zero or more replies from a responder. @@ -196,4 +207,20 @@ IAsyncEnumerable> RequestManyAsync( /// Connect socket and write CONNECT command to nats server. /// ValueTask ConnectAsync(); + + ValueTask> RequestSubAsync( + string subject, + TRequest? data, + NatsHeaders? headers = default, + INatsSerialize? requestSerializer = default, + INatsDeserialize? replySerializer = default, + NatsPubOpts? requestOpts = default, + NatsSubOpts? replyOpts = default, + CancellationToken cancellationToken = default); + + ValueTask SubAsync(NatsSubBase sub, CancellationToken cancellationToken = default); + + void OnMessageDropped(NatsSubBase natsSub, int pending, NatsMsg msg); + + BoundedChannelOptions GetChannelOpts(NatsOpts connectionOpts, NatsSubChannelOpts? subChannelOpts); } diff --git a/src/NATS.Client.Core/INatsSerialize.cs b/src/NATS.Client.Core/INatsSerialize.cs index ddb055532..99f3ff640 100644 --- a/src/NATS.Client.Core/INatsSerialize.cs +++ b/src/NATS.Client.Core/INatsSerialize.cs @@ -14,6 +14,7 @@ namespace NATS.Client.Core; /// Serialized object type public interface INatsSerializer : INatsSerialize, INatsDeserialize { + INatsSerializer CombineWith(INatsSerializer next); } /// @@ -72,6 +73,32 @@ public class NatsDefaultSerializerRegistry : INatsSerializerRegistry public INatsDeserialize GetDeserializer() => NatsDefaultSerializer.Default; } +public class NatsSerializerBuilder +{ + private readonly List> _serializers = new(); + + public NatsSerializerBuilder Add(INatsSerializer serializer) + { + _serializers.Add(serializer); + return this; + } + + public INatsSerializer Build() + { + if (_serializers.Count == 0) + { + return NatsDefaultSerializer.Default; + } + + for (var i = _serializers.Count - 1; i > 0; i--) + { + _serializers[i - 1] = _serializers[i - 1].CombineWith(_serializers[i]); + } + + return _serializers[0]; + } +} + /// /// UTF8 serializer for strings and all the primitives. /// @@ -90,7 +117,9 @@ public class NatsUtf8PrimitivesSerializer : INatsSerializer /// Creates a new instance of . /// /// The next serializer in chain. - public NatsUtf8PrimitivesSerializer(INatsSerializer? next) => _next = next; + public NatsUtf8PrimitivesSerializer(INatsSerializer? next = default) => _next = next; + + public INatsSerializer CombineWith(INatsSerializer? next) => new NatsUtf8PrimitivesSerializer(next); /// public void Serialize(IBufferWriter bufferWriter, T value) @@ -595,7 +624,9 @@ public class NatsRawSerializer : INatsSerializer /// Creates a new instance of . /// /// Next serializer in chain. - public NatsRawSerializer(INatsSerializer? next) => _next = next; + public NatsRawSerializer(INatsSerializer? next = default) => _next = next; + + public INatsSerializer CombineWith(INatsSerializer? next) => new NatsRawSerializer(next); /// public void Serialize(IBufferWriter bufferWriter, T value) @@ -747,6 +778,8 @@ public NatsJsonContextSerializer(JsonSerializerContext context, INatsSerializer< { } + public INatsSerializer CombineWith(INatsSerializer next) => new NatsJsonContextSerializer(_contexts, next); + /// public void Serialize(IBufferWriter bufferWriter, T value) { diff --git a/src/NATS.Client.Core/Internal/ClientOpts.cs b/src/NATS.Client.Core/Internal/ClientOpts.cs index 9233534a9..fdbac28d2 100644 --- a/src/NATS.Client.Core/Internal/ClientOpts.cs +++ b/src/NATS.Client.Core/Internal/ClientOpts.cs @@ -9,7 +9,7 @@ namespace NATS.Client.Core.Internal; // These connections options are serialized and sent to the server. // https://github.com/nats-io/nats-server/blob/a23b1b7/server/client.go#L536 -internal sealed class ClientOpts +public sealed class ClientOpts { private ClientOpts(NatsOpts opts) { diff --git a/src/NATS.Client.Core/Internal/ISocketConnection.cs b/src/NATS.Client.Core/Internal/ISocketConnection.cs index 6b7f7fecc..2575899df 100644 --- a/src/NATS.Client.Core/Internal/ISocketConnection.cs +++ b/src/NATS.Client.Core/Internal/ISocketConnection.cs @@ -1,6 +1,6 @@ namespace NATS.Client.Core.Internal; -internal interface ISocketConnection : IAsyncDisposable +public interface ISocketConnection : IAsyncDisposable { public Task WaitForClosed { get; } diff --git a/src/NATS.Client.Core/Internal/ObjectPool.cs b/src/NATS.Client.Core/Internal/ObjectPool.cs index 0658294f2..42f3978ab 100644 --- a/src/NATS.Client.Core/Internal/ObjectPool.cs +++ b/src/NATS.Client.Core/Internal/ObjectPool.cs @@ -3,12 +3,12 @@ namespace NATS.Client.Core.Internal; -internal interface IObjectPoolNode +public interface IObjectPoolNode { ref T? NextNode { get; } } -internal sealed class ObjectPool +public sealed class ObjectPool { private static int typeId = -1; // Increment by IdentityGenerator diff --git a/src/NATS.Client.Core/Internal/SubscriptionManager.cs b/src/NATS.Client.Core/Internal/SubscriptionManager.cs index 872423324..465466a29 100644 --- a/src/NATS.Client.Core/Internal/SubscriptionManager.cs +++ b/src/NATS.Client.Core/Internal/SubscriptionManager.cs @@ -6,7 +6,7 @@ namespace NATS.Client.Core.Internal; -internal interface ISubscriptionManager +public interface ISubscriptionManager { public ValueTask RemoveAsync(NatsSubBase sub); } @@ -15,7 +15,7 @@ internal record struct SidMetadata(string Subject, WeakReference We internal sealed record SubscriptionMetadata(int Sid); -internal sealed class SubscriptionManager : ISubscriptionManager, IAsyncDisposable +public sealed class SubscriptionManager : ISubscriptionManager, IAsyncDisposable { private readonly ILogger _logger; private readonly object _gate = new(); diff --git a/src/NATS.Client.Core/NatsConnection.LowLevelApi.cs b/src/NATS.Client.Core/NatsConnection.LowLevelApi.cs index 9cbcf23e7..34ac3aefc 100644 --- a/src/NATS.Client.Core/NatsConnection.LowLevelApi.cs +++ b/src/NATS.Client.Core/NatsConnection.LowLevelApi.cs @@ -2,7 +2,7 @@ namespace NATS.Client.Core; public partial class NatsConnection { - internal ValueTask SubAsync(NatsSubBase sub, CancellationToken cancellationToken = default) => + public ValueTask SubAsync(NatsSubBase sub, CancellationToken cancellationToken = default) => ConnectionState != NatsConnectionState.Open ? ConnectAndSubAsync(sub, cancellationToken) : SubscriptionManager.SubscribeAsync(sub, cancellationToken); diff --git a/src/NATS.Client.Core/NatsConnection.RequestSub.cs b/src/NATS.Client.Core/NatsConnection.RequestSub.cs index a3f94c24f..a80d26bc6 100644 --- a/src/NATS.Client.Core/NatsConnection.RequestSub.cs +++ b/src/NATS.Client.Core/NatsConnection.RequestSub.cs @@ -2,7 +2,7 @@ namespace NATS.Client.Core; public partial class NatsConnection { - internal async ValueTask> RequestSubAsync( + public async ValueTask> RequestSubAsync( string subject, TRequest? data, NatsHeaders? headers = default, diff --git a/src/NATS.Client.Core/NatsConnection.cs b/src/NATS.Client.Core/NatsConnection.cs index b2e1d3ef4..d3546c541 100644 --- a/src/NATS.Client.Core/NatsConnection.cs +++ b/src/NATS.Client.Core/NatsConnection.cs @@ -120,18 +120,20 @@ public NatsConnectionState ConnectionState private set => Interlocked.Exchange(ref _connectionState, (int)value); } + public INatsConnection Connection => this; + public INatsServerInfo? ServerInfo => WritableServerInfo; // server info is set when received INFO + public NatsHeaderParser HeaderParser { get; } + + public SubscriptionManager SubscriptionManager { get; } + internal bool IsDisposed { get => Interlocked.CompareExchange(ref _isDisposed, 0, 0) == 1; private set => Interlocked.Exchange(ref _isDisposed, value ? 1 : 0); } - internal NatsHeaderParser HeaderParser { get; } - - internal SubscriptionManager SubscriptionManager { get; } - internal CommandWriter CommandWriter { get; } internal string InboxPrefix { get; } @@ -174,6 +176,34 @@ public async ValueTask ConnectAsync() await InitialConnectAsync().ConfigureAwait(false); } + public void OnMessageDropped(NatsSubBase natsSub, int pending, NatsMsg msg) + { + var subject = msg.Subject; + _logger.LogWarning("Dropped message from {Subject} with {Pending} pending messages", subject, pending); + _eventChannel.Writer.TryWrite((NatsEvent.MessageDropped, new NatsMessageDroppedEventArgs(natsSub, pending, subject, msg.ReplyTo, msg.Headers, msg.Data))); + } + + public BoundedChannelOptions GetChannelOpts(NatsOpts connectionOpts, NatsSubChannelOpts? subChannelOpts) + { + if (subChannelOpts is { } overrideOpts) + { + return new BoundedChannelOptions(overrideOpts.Capacity ?? + _defaultSubscriptionChannelOpts.Capacity) + { + AllowSynchronousContinuations = + _defaultSubscriptionChannelOpts.AllowSynchronousContinuations, + FullMode = + overrideOpts.FullMode ?? _defaultSubscriptionChannelOpts.FullMode, + SingleWriter = _defaultSubscriptionChannelOpts.SingleWriter, + SingleReader = _defaultSubscriptionChannelOpts.SingleReader, + }; + } + else + { + return _defaultSubscriptionChannelOpts; + } + } + public virtual async ValueTask DisposeAsync() { if (!IsDisposed) @@ -250,34 +280,6 @@ internal ValueTask UnsubscribeAsync(int sid) return default; } - internal void OnMessageDropped(NatsSubBase natsSub, int pending, NatsMsg msg) - { - var subject = msg.Subject; - _logger.LogWarning("Dropped message from {Subject} with {Pending} pending messages", subject, pending); - _eventChannel.Writer.TryWrite((NatsEvent.MessageDropped, new NatsMessageDroppedEventArgs(natsSub, pending, subject, msg.ReplyTo, msg.Headers, msg.Data))); - } - - internal BoundedChannelOptions GetChannelOpts(NatsOpts connectionOpts, NatsSubChannelOpts? subChannelOpts) - { - if (subChannelOpts is { } overrideOpts) - { - return new BoundedChannelOptions(overrideOpts.Capacity ?? - _defaultSubscriptionChannelOpts.Capacity) - { - AllowSynchronousContinuations = - _defaultSubscriptionChannelOpts.AllowSynchronousContinuations, - FullMode = - overrideOpts.FullMode ?? _defaultSubscriptionChannelOpts.FullMode, - SingleWriter = _defaultSubscriptionChannelOpts.SingleWriter, - SingleReader = _defaultSubscriptionChannelOpts.SingleReader, - }; - } - else - { - return _defaultSubscriptionChannelOpts; - } - } - private async ValueTask InitialConnectAsync() { Debug.Assert(ConnectionState == NatsConnectionState.Connecting, "Connection state"); diff --git a/src/NATS.Client.Core/NatsStats.cs b/src/NATS.Client.Core/NatsStats.cs index 403a75a08..14d3bc11d 100644 --- a/src/NATS.Client.Core/NatsStats.cs +++ b/src/NATS.Client.Core/NatsStats.cs @@ -9,7 +9,7 @@ public readonly record struct NatsStats long ReceivedMessages, long SubscriptionCount); -internal sealed class ConnectionStatsCounter +public sealed class ConnectionStatsCounter { // for operate Interlocked.Increment/Decrement/Add, expose field as public #pragma warning disable SA1401 diff --git a/src/NATS.Client.Core/NatsSubBase.cs b/src/NATS.Client.Core/NatsSubBase.cs index d0bdf318a..18fce1808 100644 --- a/src/NATS.Client.Core/NatsSubBase.cs +++ b/src/NATS.Client.Core/NatsSubBase.cs @@ -47,7 +47,7 @@ public abstract class NatsSubBase private Exception? _exception; internal NatsSubBase( - NatsConnection connection, + INatsConnection connection, ISubscriptionManager manager, string subject, string? queueGroup, @@ -141,7 +141,7 @@ internal NatsSubBase( internal NatsSubOpts? Opts { get; private set; } - protected NatsConnection Connection { get; } + protected INatsConnection Connection { get; } public virtual ValueTask ReadyAsync() { diff --git a/src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs b/src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs index 785017167..edad8eb05 100644 --- a/src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs @@ -52,7 +52,7 @@ internal class NatsJSOrderedPushConsumer private readonly NatsJSOrderedPushConsumerOpts _opts; private readonly NatsSubOpts? _subOpts; private readonly CancellationToken _cancellationToken; - private readonly NatsConnection _nats; + private readonly INatsConnection _nats; private readonly Channel> _commandChannel; private readonly Channel> _msgChannel; private readonly Channel _consumerCreateChannel; @@ -409,7 +409,7 @@ internal class NatsJSOrderedPushConsumerSub : NatsSubBase { private readonly NatsJSContext _context; private readonly CancellationToken _cancellationToken; - private readonly NatsConnection _nats; + private readonly INatsConnection _nats; private readonly NatsHeaderParser _headerParser; private readonly INatsDeserialize _serializer; private readonly ChannelWriter> _commands; diff --git a/src/NATS.Client.JetStream/Internal/netstandard.cs b/src/NATS.Client.JetStream/Internal/netstandard_js.cs similarity index 100% rename from src/NATS.Client.JetStream/Internal/netstandard.cs rename to src/NATS.Client.JetStream/Internal/netstandard_js.cs diff --git a/src/NATS.Client.JetStream/NatsJSContext.cs b/src/NATS.Client.JetStream/NatsJSContext.cs index 1c46b0e43..25ac0944e 100644 --- a/src/NATS.Client.JetStream/NatsJSContext.cs +++ b/src/NATS.Client.JetStream/NatsJSContext.cs @@ -7,13 +7,18 @@ namespace NATS.Client.JetStream; +public static class NatsClientExtensions +{ + public static INatsJSContext GetJetStream(this INatsClient connection) => new NatsJSContext((NatsConnection)connection.Connection); +} + /// Provides management and access to NATS JetStream streams and consumers. public partial class NatsJSContext { private readonly ILogger _logger; - /// > - public NatsJSContext(NatsConnection connection) + /// > + public NatsJSContext(INatsConnection connection) : this(connection, new NatsJSOpts(connection.Opts)) { } @@ -23,14 +28,14 @@ public NatsJSContext(NatsConnection connection) /// /// A NATS server connection to access the JetStream APIs, publishers and consumers. /// Context wide JetStream options. - public NatsJSContext(NatsConnection connection, NatsJSOpts opts) + public NatsJSContext(INatsConnection connection, NatsJSOpts opts) { Connection = connection; Opts = opts; _logger = connection.Opts.LoggerFactory.CreateLogger(); } - internal NatsConnection Connection { get; } + internal INatsConnection Connection { get; } internal NatsJSOpts Opts { get; } diff --git a/src/NATS.Client.KeyValueStore/Internal/NatsKVWatchSub.cs b/src/NATS.Client.KeyValueStore/Internal/NatsKVWatchSub.cs index fb0d2c7eb..e2f7f1d3b 100644 --- a/src/NATS.Client.KeyValueStore/Internal/NatsKVWatchSub.cs +++ b/src/NATS.Client.KeyValueStore/Internal/NatsKVWatchSub.cs @@ -9,7 +9,7 @@ internal class NatsKVWatchSub : NatsSubBase { private readonly NatsJSContext _context; private readonly CancellationToken _cancellationToken; - private readonly NatsConnection _nats; + private readonly INatsConnection _nats; private readonly NatsHeaderParser _headerParser; private readonly INatsDeserialize _serializer; private readonly ChannelWriter> _commands; diff --git a/src/NATS.Client.KeyValueStore/Internal/NatsKVWatcher.cs b/src/NATS.Client.KeyValueStore/Internal/NatsKVWatcher.cs index 036d19965..d39f0fd8c 100644 --- a/src/NATS.Client.KeyValueStore/Internal/NatsKVWatcher.cs +++ b/src/NATS.Client.KeyValueStore/Internal/NatsKVWatcher.cs @@ -36,7 +36,7 @@ internal class NatsKVWatcher : IAsyncDisposable private readonly CancellationToken _cancellationToken; private readonly string _keyBase; private readonly string[] _filters; - private readonly NatsConnection _nats; + private readonly INatsConnection _nats; private readonly Channel> _commandChannel; private readonly Channel> _entryChannel; private readonly Channel _consumerCreateChannel; diff --git a/src/NATS.Client.Serializers.Json/NatsJsonSerializer.cs b/src/NATS.Client.Serializers.Json/NatsJsonSerializer.cs index 9aa6b0b8d..5df90cde2 100644 --- a/src/NATS.Client.Serializers.Json/NatsJsonSerializer.cs +++ b/src/NATS.Client.Serializers.Json/NatsJsonSerializer.cs @@ -11,7 +11,7 @@ namespace NATS.Client.Serializers.Json; /// /// This serializer is not suitable for native AOT deployments since it might rely on reflection /// -public sealed class NatsJsonSerializer : INatsSerialize, INatsDeserialize +public sealed class NatsJsonSerializer : INatsSerializer { private static readonly JsonWriterOptions JsonWriterOpts = new() { Indented = false, SkipValidation = true, }; @@ -20,6 +20,11 @@ public sealed class NatsJsonSerializer : INatsSerialize, INatsDeserialize< private readonly JsonSerializerOptions _opts; + public NatsJsonSerializer() + : this(new JsonSerializerOptions { DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull }) + { + } + /// /// Creates a new instance of with the specified options. /// @@ -29,7 +34,10 @@ public sealed class NatsJsonSerializer : INatsSerialize, INatsDeserialize< /// /// Default instance of with option set to ignore null values when writing. /// - public static NatsJsonSerializer Default { get; } = new(new JsonSerializerOptions { DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull }); + public static NatsJsonSerializer Default { get; } = new(); + + public INatsSerializer CombineWith(INatsSerializer next) + => throw new NotSupportedException(); /// flush public void Serialize(IBufferWriter bufferWriter, T? value) diff --git a/src/NATS.Client/NATS.Client.csproj b/src/NATS.Client/NATS.Client.csproj new file mode 100644 index 000000000..c444dcbbc --- /dev/null +++ b/src/NATS.Client/NATS.Client.csproj @@ -0,0 +1,14 @@ + + + + net8.0 + enable + enable + + + + + + + + diff --git a/src/NATS.Client/NatsClient.cs b/src/NATS.Client/NatsClient.cs new file mode 100644 index 000000000..1f521a00f --- /dev/null +++ b/src/NATS.Client/NatsClient.cs @@ -0,0 +1,48 @@ +using System.Threading.Channels; +using NATS.Client.Core; + +namespace NATS.Client; + +public class NatsClient : INatsClient +{ + public NatsClient( + string name = "NATS .NET Client", + string url = "nats://localhost:4222", + string? credsFile = default) + { + var opts = new NatsOpts + { + Url = url, + SerializerRegistry = NatsClientDefaultSerializerRegistry.Default, + SubPendingChannelFullMode = BoundedChannelFullMode.Wait, + AuthOpts = new NatsAuthOpts { CredsFile = credsFile }, + }; + + Connection = new NatsConnection(opts); + } + + public NatsClient(INatsConnection connection) => Connection = connection; + + /// + public INatsConnection Connection { get; } + + /// + public ValueTask PublishAsync(string subject, T data, NatsHeaders? headers = default, string? replyTo = default, INatsSerialize? serializer = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) + => Connection.PublishAsync(subject, data, headers, replyTo, serializer, opts, cancellationToken); + + /// + public IAsyncEnumerable> SubscribeAsync(string subject, string? queueGroup = default, INatsDeserialize? serializer = default, NatsSubOpts? opts = default, CancellationToken cancellationToken = default) + => Connection.SubscribeAsync(subject, queueGroup, serializer, opts, cancellationToken); + + /// + public ValueTask> RequestAsync(string subject, TRequest? data, NatsHeaders? headers = default, INatsSerialize? requestSerializer = default, INatsDeserialize? replySerializer = default, NatsPubOpts? requestOpts = default, NatsSubOpts? replyOpts = default, CancellationToken cancellationToken = default) + => Connection.RequestAsync(subject, data, headers, requestSerializer, replySerializer, requestOpts, replyOpts, cancellationToken); + + /// + public ValueTask> RequestAsync(string subject, INatsDeserialize? replySerializer = default, NatsSubOpts? replyOpts = default, CancellationToken cancellationToken = default) + => Connection.RequestAsync(subject, replySerializer, replyOpts, cancellationToken); + + /// + public ValueTask DisposeAsync() + => Connection.DisposeAsync(); +} diff --git a/src/NATS.Client/NatsClientDefaultSerializer.cs b/src/NATS.Client/NatsClientDefaultSerializer.cs new file mode 100644 index 000000000..543d8c315 --- /dev/null +++ b/src/NATS.Client/NatsClientDefaultSerializer.cs @@ -0,0 +1,27 @@ +using NATS.Client.Core; +using NATS.Client.Serializers.Json; + +namespace NATS.Client; + +public static class NatsClientDefaultSerializer +{ + public static readonly INatsSerializer Default; + + static NatsClientDefaultSerializer() + { + Default = new NatsSerializerBuilder() + .Add(new NatsRawSerializer()) + .Add(new NatsUtf8PrimitivesSerializer()) + .Add(new NatsJsonSerializer()) + .Build(); + } +} + +public class NatsClientDefaultSerializerRegistry : INatsSerializerRegistry +{ + public static readonly NatsClientDefaultSerializerRegistry Default = new(); + + public INatsSerialize GetSerializer() => NatsClientDefaultSerializer.Default; + + public INatsDeserialize GetDeserializer() => NatsClientDefaultSerializer.Default; +} diff --git a/tests/NATS.Client.Core.Tests/SerializerTest.cs b/tests/NATS.Client.Core.Tests/SerializerTest.cs index 45f3aeb81..d594213a5 100644 --- a/tests/NATS.Client.Core.Tests/SerializerTest.cs +++ b/tests/NATS.Client.Core.Tests/SerializerTest.cs @@ -299,6 +299,8 @@ public class TestSerializer : INatsSerializer public void Serialize(IBufferWriter bufferWriter, T? value) => throw new TestSerializerException(); public T? Deserialize(in ReadOnlySequence buffer) => throw new TestSerializerException(); + + public INatsSerializer CombineWith(INatsSerializer next) => throw new NotImplementedException(); } public class TestSerializerException : Exception; @@ -310,6 +312,8 @@ public class TestSerializerWithEmpty : INatsSerializer : new TestData(Encoding.ASCII.GetString(buffer))); public void Serialize(IBufferWriter bufferWriter, T value) => throw new Exception("not used"); + + public INatsSerializer CombineWith(INatsSerializer next) => throw new NotImplementedException(); } public record TestData(string Name); diff --git a/tests/NATS.Client.JetStream.Tests/CustomSerializerTest.cs b/tests/NATS.Client.JetStream.Tests/CustomSerializerTest.cs index 4b3e89f5d..e93d3345d 100644 --- a/tests/NATS.Client.JetStream.Tests/CustomSerializerTest.cs +++ b/tests/NATS.Client.JetStream.Tests/CustomSerializerTest.cs @@ -67,6 +67,8 @@ public void Serialize(IBufferWriter bufferWriter, T value) } public T Deserialize(in ReadOnlySequence buffer) => (T)(object)new byte[] { 42 }; + + public INatsSerializer CombineWith(INatsSerializer next) => throw new NotImplementedException(); } private class Level42SerializerRegistry : INatsSerializerRegistry diff --git a/tests/NATS.Client.JetStream.Tests/NatsJsContextFactoryTest.cs b/tests/NATS.Client.JetStream.Tests/NatsJsContextFactoryTest.cs index c70453892..e5de91c60 100644 --- a/tests/NATS.Client.JetStream.Tests/NatsJsContextFactoryTest.cs +++ b/tests/NATS.Client.JetStream.Tests/NatsJsContextFactoryTest.cs @@ -1,3 +1,4 @@ +using System.Threading.Channels; using NATS.Client.Core.Tests; namespace NATS.Client.JetStream.Tests; @@ -98,6 +99,12 @@ public class MockConnection : INatsConnection public NatsConnectionState ConnectionState { get; } = NatsConnectionState.Closed; + public NatsHeaderParser HeaderParser { get; } + + public SubscriptionManager SubscriptionManager { get; } + + public INatsConnection Connection { get; } + public ValueTask PingAsync(CancellationToken cancellationToken = default) => throw new NotImplementedException(); public ValueTask PublishAsync(string subject, T data, NatsHeaders? headers = default, string? replyTo = default, INatsSerialize? serializer = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) => throw new NotImplementedException(); @@ -138,6 +145,15 @@ public IAsyncEnumerable> RequestManyAsync( public ValueTask ConnectAsync() => throw new NotImplementedException(); + public ValueTask> RequestSubAsync(string subject, TRequest? data, NatsHeaders? headers = default, INatsSerialize? requestSerializer = default, INatsDeserialize? replySerializer = default, NatsPubOpts? requestOpts = default, NatsSubOpts? replyOpts = default, CancellationToken cancellationToken = default) => + throw new NotImplementedException(); + + public ValueTask SubAsync(NatsSubBase sub, CancellationToken cancellationToken = default) => throw new NotImplementedException(); + + public void OnMessageDropped(NatsSubBase natsSub, int pending, NatsMsg msg) => throw new NotImplementedException(); + + public BoundedChannelOptions GetChannelOpts(NatsOpts connectionOpts, NatsSubChannelOpts? subChannelOpts) => throw new NotImplementedException(); + public ValueTask DisposeAsync() => throw new NotImplementedException(); } } diff --git a/tests/NATS.Net.DocsExamples/SerializationPage.cs b/tests/NATS.Net.DocsExamples/SerializationPage.cs index f25015ac0..f4e3dbcd2 100644 --- a/tests/NATS.Net.DocsExamples/SerializationPage.cs +++ b/tests/NATS.Net.DocsExamples/SerializationPage.cs @@ -297,6 +297,8 @@ public void Serialize(IBufferWriter bufferWriter, T value) throw new NatsException($"Can't deserialize {typeof(T)}"); } + + public INatsSerializer CombineWith(INatsSerializer next) => throw new NotImplementedException(); } public class MyProtoBufSerializerRegistry : INatsSerializerRegistry