From fd1fd423c88cc202f4efbf9bdbcee8ffd246a37a Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Wed, 13 Sep 2023 16:33:15 +0100 Subject: [PATCH] JetStream documentation and minor fixes (#129) * Options tidy-up * Removed variable PubOpts Headers and ReplyTo * Added headers and replyTo params to Publish methods * Make all Opts classes (not structs) * Added default options for JS context * JetStream docs update * API docs update * JetStream exception docs * JetStream docs structure * JetStream in-line docs typo fix * JetStream documentation * JetStream index and readme updates * JetStream intro tidy-up * Fixed test * Added missing exception docs --- README.md | 72 +++------ docs/documentation/core/intro.md | 72 +++++++++ docs/documentation/{ => core}/pub-sub.md | 0 docs/documentation/{ => core}/queue.md | 0 docs/documentation/{ => core}/req-rep.md | 0 docs/documentation/intro.md | 10 +- docs/documentation/jetstream/consume.md | 108 ++++++++++++++ docs/documentation/jetstream/intro.md | 139 ++++++++++++++++++ docs/documentation/jetstream/manage.md | 57 +++++++ docs/documentation/jetstream/publish.md | 50 +++++++ docs/documentation/toc.yml | 25 +++- docs/index.md | 76 ++-------- .../Internal/SubscriptionManager.cs | 5 + src/NATS.Client.JetStream/INatsJSConsume.cs | 6 + src/NATS.Client.JetStream/INatsJSFetch.cs | 6 + .../{ => Internal}/NatsJSConsume.cs | 5 +- .../Internal/NatsJSExtensionsInternal.cs | 2 +- .../{ => Internal}/NatsJSFetch.cs | 5 +- .../{ => Internal}/NatsJSResponse.cs | 13 +- src/NATS.Client.JetStream/NatsJSConsumer.cs | 90 +++++++++++- .../NatsJSContext.Consumers.cs | 47 ++++++ .../NatsJSContext.Streams.cs | 52 +++++++ src/NATS.Client.JetStream/NatsJSContext.cs | 55 +++++-- src/NATS.Client.JetStream/NatsJSException.cs | 63 ++++++++ src/NATS.Client.JetStream/NatsJSExtensions.cs | 7 + src/NATS.Client.JetStream/NatsJSMsg.cs | 58 ++++++++ .../NatsJSNotification.cs | 6 - src/NATS.Client.JetStream/NatsJSOpts.cs | 12 ++ src/NATS.Client.JetStream/NatsJSStream.cs | 89 ++++++++++- .../ConsumerConsumeTest.cs | 2 +- 30 files changed, 969 insertions(+), 163 deletions(-) create mode 100644 docs/documentation/core/intro.md rename docs/documentation/{ => core}/pub-sub.md (100%) rename docs/documentation/{ => core}/queue.md (100%) rename docs/documentation/{ => core}/req-rep.md (100%) create mode 100644 docs/documentation/jetstream/consume.md create mode 100644 docs/documentation/jetstream/intro.md create mode 100644 docs/documentation/jetstream/manage.md create mode 100644 docs/documentation/jetstream/publish.md rename src/NATS.Client.JetStream/{ => Internal}/NatsJSConsume.cs (98%) rename src/NATS.Client.JetStream/{ => Internal}/NatsJSFetch.cs (98%) rename src/NATS.Client.JetStream/{ => Internal}/NatsJSResponse.cs (71%) delete mode 100644 src/NATS.Client.JetStream/NatsJSNotification.cs diff --git a/README.md b/README.md index f063e863f..f06ed8a2a 100644 --- a/README.md +++ b/README.md @@ -1,67 +1,35 @@ # NATS.NET V2 +NATS.NET V2 is a [NATS](https://nats.io) client for the modern [.NET](https://dot.net/). + ## Preview -The NATS.NET V2 client is in preview and not recommended for production use. -Codebase is still under heavy development and currently we only have implementations for [core NATS](https://docs.nats.io/nats-concepts/core-nats) features. +The NATS.NET V2 client is in preview and not recommended for production use yet. +Codebase is still under heavy development and we currently implemented [Core NATS](https://docs.nats.io/nats-concepts/core-nats) +and basic [JetStream](https://docs.nats.io/nats-concepts/jetstream) features. + +Please test and provide feedback: + +* on [slack.nats.io dotnet channel](https://natsio.slack.com/channels/dotnet) +* or use GitHub discussions, issues and PRs + +Thank you to our contributors so far. We feel we are growing slowly as a community and we appreciate your help +supporting and developing NATS .NET V2 project. + +## Documentation -Please test and provide feedback by visiting our [Slack channel](https://natsio.slack.com/channels/dotnet). +Check out the [documentation](https://nats-io.github.io/nats.net.v2/) for guides and examples. ## NATS.NET V2 Goals -- Only support Async I/O -- Target latest .NET LTS Release (currently `net6.0`) +- Only support Async I/O (async/await) +- Target latest .NET LTS Release (currently .NET 6.0) ## Packages -- **NATS.Client.Core**: [core NATS](https://docs.nats.io/nats-concepts/core-nats) +- **NATS.Client.Core**: [Core NATS](https://docs.nats.io/nats-concepts/core-nats) - **NATS.Client.Hosting**: extension to configure DI container -- **NATS.Client.JetStream**: JetStream *not yet implemented* - -## Basic Usage - -[Download the latest](https://nats.io/download/) `nats-server` for your platform and run it without any arguments. `nats-server` will listen -on its default TCP port 4222. - -Given that we have a plain class `Bar`, we can publish and subscribe to our `nats-server` sending -and receiving `Bar` objects: - -```csharp -public record Bar -{ - public int Id { get; set; } - public string Name { get; set; } -} -``` - -Subscribe to all `bar` [related subjects](https://docs.nats.io/nats-concepts/subjects): -```csharp -await using var nats = new NatsConnection(options); - -await using sub = await nats.SubscribeAsync("bar.>"); -await foreach (var msg in sub.Msgs.ReadAllAsync()) -{ - Console.WriteLine($"Received {msg.Subject}: {msg.Data}\n"); -} -``` - -Publish `Bar` objects to related `bar` [subjects](https://docs.nats.io/nats-concepts/subjects): -```csharp -await using var nats = new NatsConnection(); - -for (int i = 0; i < 10; i++) -{ - Console.WriteLine($" Publishing {i}..."); - await nats.PublishAsync($"bar.baz.{i}", new Bar { Id = i, Name = "Baz" }); -} -``` - -You should also hook your logger to `NatsConnection` to make sure all is working as expected or -to get help diagnosing any issues you might have: -```csharp -var options = NatsOpts.Default with { LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Error) }; -await using var nats = new NatsConnection(options); -``` +- **NATS.Client.JetStream**: [JetStream](https://docs.nats.io/nats-concepts/jetstream) ## Contributing diff --git a/docs/documentation/core/intro.md b/docs/documentation/core/intro.md new file mode 100644 index 000000000..48904a44c --- /dev/null +++ b/docs/documentation/core/intro.md @@ -0,0 +1,72 @@ +# Core NATS + +[Core NATS](https://docs.nats.io/nats-concepts/core-nats) is the base set of functionalities and qualities of service +offered by a NATS service infrastructure. Core NATS is the foundation for JetStream and other services. For the sake +of explanation, in a simplified sense you can think of Core NATS as the +[wire protocol](https://docs.nats.io/reference/reference-protocols/nats-protocol) defining a simple but powerful +pub/sub functionality and the concept of [Subject-Based Messaging](https://docs.nats.io/nats-concepts/subjects). + +## Core NATS Quick Start + +[Download the latest](https://nats.io/download/) `nats-server` for your platform and run it without any arguments. `nats-server` will listen +on its default TCP port 4222. + +```shell +$ nats-server +``` + +Install `NATS.Client.Core` preview from Nuget. + +Given that we have a plain class `Bar`, we can publish and subscribe to our `nats-server` sending +and receiving `Bar` objects: + +```csharp +public record Bar +{ + public int Id { get; set; } + public string Name { get; set; } +} +``` + +Subscribe to all `bar` [related subjects](https://docs.nats.io/nats-concepts/subjects): + +```csharp +await using var nats = new NatsConnection(); + +await using sub = await nats.SubscribeAsync("bar.>"); +await foreach (var msg in sub.Msgs.ReadAllAsync()) +{ + Console.WriteLine($"Received {msg.Subject}: {msg.Data}\n"); +} +``` + +Publish `Bar` objects to related `bar` [subjects](https://docs.nats.io/nats-concepts/subjects): +```csharp +await using var nats = new NatsConnection(); + +for (int i = 0; i < 10; i++) +{ + Console.WriteLine($" Publishing {i}..."); + await nats.PublishAsync($"bar.baz.{i}", new Bar { Id = i, Name = "Baz" }); +} +``` + +## Logging + +You should also hook your logger to `NatsConnection` to make sure all is working as expected or +to get help diagnosing any issues you might have: + +```csharp +var opts = NatsOpts.Default with { LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Error) }; +await using var nats = new NatsConnection(otps); +``` + +## What's Next + +[Publish-Subscribe](pub-sub.md) is the message distribution model for one-to-many communication. + +[Request-Reply](req-rep.md) is a common pattern in modern distributed systems. A request is sent, and the application +either waits on the response with a certain timeout, or receives a response asynchronously. + +[Queue Groups](queue.md) enables the 1:N fan-out pattern of messaging ensuring that any message sent by a publisher, +reaches all subscribers that have registered. diff --git a/docs/documentation/pub-sub.md b/docs/documentation/core/pub-sub.md similarity index 100% rename from docs/documentation/pub-sub.md rename to docs/documentation/core/pub-sub.md diff --git a/docs/documentation/queue.md b/docs/documentation/core/queue.md similarity index 100% rename from docs/documentation/queue.md rename to docs/documentation/core/queue.md diff --git a/docs/documentation/req-rep.md b/docs/documentation/core/req-rep.md similarity index 100% rename from docs/documentation/req-rep.md rename to docs/documentation/core/req-rep.md diff --git a/docs/documentation/intro.md b/docs/documentation/intro.md index 9e8ed980c..a44ce4111 100644 --- a/docs/documentation/intro.md +++ b/docs/documentation/intro.md @@ -1,9 +1,15 @@ # NATS.NET V2 Client -NATS.NET V2 Client is a .Net client for the Open Source [Connective Technology for Adaptive Edge & Distributed Systems - NATS](https://nats.io/)! -It's build on top of the modern .Net 6+ platform, taking advantage of all the high performance features and +NATS.NET V2 Client is a .NET client for the Open Source [Connective Technology for Adaptive Edge & Distributed Systems - NATS](https://nats.io/)! +It's build on top of the modern .NET platform, taking advantage of all the high performance features and asynchronous programming model. NATS.NET V2 Client, just like NATS, is Open Source as is this documentation. Please [let us know](https://natsio.slack.com/channels/dotnet) if you have updates and/or suggestions for these docs. You can also create a Pull Request using the Edit on GitHub link on each page. + +## What's Next + +[Core NATS](core/intro.md) is the base set of functionalities and qualities of service offered by a NATS service infrastructure. + +[JetStream](jetstream/intro.md) is the built-in distributed persistence system built-in to the same NATS server binary. diff --git a/docs/documentation/jetstream/consume.md b/docs/documentation/jetstream/consume.md new file mode 100644 index 000000000..fa505068e --- /dev/null +++ b/docs/documentation/jetstream/consume.md @@ -0,0 +1,108 @@ +# Consuming Messages from Streams + +Consuming messages from a stream can be done using one of three different methods depending on you application needs. +You can access these methods from the consumer object you can create using JetStream context: + +```csharp +await using var nats = new NatsConnection(); +var js = new NatsJSContext(nats); + +var consumer = await js.CreateConsumerAsync(stream: "orders", consumer: "order_processor"); +``` + +## Next Method + +Next method is the simplest was of retrieving messages from a stream. Every time you call the next method you get +a single message or nothing based on the expiry time to wait for a message. Once a message is received you can +process it and call next again for another. + +```csharp +while (!cancellationToken.IsCancellationRequested) +{ + var next = await consumer.NextAsync(); + + if (next is { } msg) + { + Console.WriteLine($"Processing {msg.Subject}: {msg.Data.OrderId}..."); + await msg.AckAsync(); + } +} +``` + +Next is the simplest and most conservative way of consuming messages since you request a single message from JetStream +server then acknowledge it before requesting more messages. However, next method is also the least performant since +there is not message batching. + +## Fetch Method + +Fetch method requests messages in batches to improve the performance while giving the application the control over how +fast it can process the messages without overwhelming the application process. + +```csharp +while (!cancellationToken.IsCancellationRequested) +{ + // Consume a batch of messages (1000 by default) + await foreach (var msg in consumer.FetchAllAsync()) + { + // Process message + await msg.AckAsync(); + } +} +``` + +## Consume Method + +Consume method is the most performant method of consuming messages. Request for messages (a.k.a. pull requests) are +interleaved so that there is a constant flow of messages from the JetStream server. Flow is controlled by `MaxMsgs` +or `MaxBytes` and respective thresholds not to overwhelm the application and not to waste server resources. + +```csharp +await foreach (var msg in consumer.ConsumeAllAsync()) +{ + // Process message + await msg.AckAsync(); + + // loop never exits unless there is an error or a break +} +``` + +## Handling Exceptions + +While consuming messages (using next, fetch or consume methods) there are several scenarios where exception might be +thrown by the client library, for example: + +* Consumer is deleted by another application or operator +* Connection to NATS server is interrupted (mainly for next and fetch methods, consume method can recover) +* Client request for the next batch is invalid +* Account permissions have changed +* Cluster leader changed + +A naive implementation might try to recover from errors assuming they are temporary e.g. the stream or the consumer +will be created eventually: + +```csharp +while (!cancellationToken.IsCancellationRequested) +{ + try + { + await consumer.RefreshAsync(); // or try to recreate consumer + await foreach (var msg in consumer.ConsumeAllAsync()) + { + // Process message + await msg.AckAsync(); + } + } + catch (NatsJSProtocolException e) + { + // log exception + } + catch (NatsJSException e) + { + // log exception + await Task.Delay(1000); // or back off + } +} +``` + +Depending on your application you should configure streams and consumers with appropriate settings so that the +messages are processed and stored based on your requirements. diff --git a/docs/documentation/jetstream/intro.md b/docs/documentation/jetstream/intro.md new file mode 100644 index 000000000..04147a19a --- /dev/null +++ b/docs/documentation/jetstream/intro.md @@ -0,0 +1,139 @@ +# JetStream + +[JetStream](https://docs.nats.io/nats-concepts/jetstream) is the built-in distributed persistence system which enables +new functionalities and higher qualities of service on top of the base _Core NATS_ functionalities and qualities of service. + +JetStream is built-in to nats-server and you only need 1 (or 3 or 5 if you want fault-tolerance against 1 or 2 +simultaneous NATS server failures) of your NATS server(s) to be JetStream enabled for it to be available to all the +client applications. + +JetStream can be enabled by running the server with `-js` flag e.g. `nats-server -js`. + +## Streaming: temporal decoupling between the publishers and subscribers + +One of the tenets of basic publish/subscribe messaging is that there is a required temporal coupling between the +publishers and the subscribers: subscribers only receive the messages that are published when they are actively +connected to the messaging system (i.e. they do not receive messages that are published while they are not subscribing +or not running or disconnected). + +Streams capture and store messages published on one (or more) subject and allow client applications to create +consumers at any time to 'replay' (or consume) all or some of the messages stored in the stream. + +Streams are message stores, each stream defines how messages are stored and what the limits (duration, size, interest) +of the retention are. Streams consume normal NATS subjects, any message published on those subjects will be captured +in the defined storage system. + +A consumer is a stateful view of a stream. It acts as interface for clients to consume a subset of messages stored in a +stream and will keep track of which messages were delivered and acknowledged by clients. + +## JetStream Quick Start + +[Download the latest](https://nats.io/download/) `nats-server` for your platform and run it with JetStream enabled: + +```shell +$ nats-server -js +``` + +Install `NATS.Client.JetStream` preview from Nuget. + +Before we can so anything, we need a JetStream context: + +```csharp +await using var nc = new NatsConnection(); +var js = new NatsJSContext(nc); +``` + +Let's create our stream first. In JetStream, a stream is simply a storage for messages: + +```csharp +await js.CreateStreamAsync(stream: "shop_orders", subjects: new []{"orders.>"}); +``` + +We can save messages in a stream by publishing them to the subjects the stream is interested in, which is `orders.>` in +our case, meaning any subject prefixed with `orders.` e.g. `orders.new.123`. Have a look at NATS documentation about +[wildcards in Subject-Based Messaging](https://docs.nats.io/nats-concepts/subjects#wildcards) for more information. + +Given that we have a record `Order`, we can publish and consume stream of `Order` objects: + +```csharp +public record Order(int OrderId); +``` + +We can publish to the `shop_orders` stream and receive a confirmation that our message is persisted: + +```csharp +for (var i = 0; i < 10; i++) +{ + // Notice we're using JetStream context to publish and receive ACKs + var ack = await js.PublishAsync($"orders.new.{i}", new Order(i)); + ack.EnsureSuccess(); +} +``` + +Now that we have a few messages in our stream, let's see its status using the [NATS command +line client](https://github.com/nats-io/natscli): + +```shell +$ nats stream ls +╭───────────────────────────────────────────────────────────────────────────────────╮ +│ Streams │ +├─────────────┬─────────────┬─────────────────────┬──────────┬───────┬──────────────┤ +│ Name │ Description │ Created │ Messages │ Size │ Last Message │ +├─────────────┼─────────────┼─────────────────────┼──────────┼───────┼──────────────┤ +│ shop_orders │ │ 2023-09-12 10:25:52 │ 10 │ 600 B │ 10.41s │ +╰─────────────┴─────────────┴─────────────────────┴──────────┴───────┴──────────────╯ +``` + +We need one more JetStream construct before we can start consuming our messages: a *consumer*: + +```csharp +var consumer = await js.CreateConsumerAsync(stream: "shop_orders", consumer: "order_processor"); +``` + +In JetStream, consumers are stored on the server. Clients don't need to worry about maintaining state separately. +You can think of JetStream consumers as pointers to messages in streams stored on the NATS JetStream server. Let's +see what our consumer's state is: + +```shell +$ nats consumer report shop_orders +╭────────────────────────────────────────────────────────────────────────────────────────────────────────────────╮ +│ Consumer report for shop_orders with 1 consumers │ +├─────────────────┬──────┬────────────┬──────────┬─────────────┬─────────────┬─────────────┬───────────┬─────────┤ +│ Consumer │ Mode │ Ack Policy │ Ack Wait │ Ack Pending │ Redelivered │ Unprocessed │ Ack Floor │ Cluster │ +├─────────────────┼──────┼────────────┼──────────┼─────────────┼─────────────┼─────────────┼───────────┼─────────┤ +│ order_processor │ Pull │ Explicit │ 30.00s │ 0 │ 0 │ 10 / 100% │ 0 │ │ +╰─────────────────┴──────┴────────────┴──────────┴─────────────┴─────────────┴─────────────┴───────────┴─────────╯ +``` + +Check out [JetStream documentation](https://docs.nats.io/nats-concepts/jetstream) for more information on streams and consumers. + +Finally, we're ready to consume the messages we persisted in `shop_orders` stream: + +```csharp +await foreach (var msg in consumer.ConsumeAllAsync()) +{ + var order = msg.Data; + Console.WriteLine($"Processing {msg.Subject} {order}..."); + await msg.AckAsync(); + // this loop never ends unless there is an error +} +``` + +## Logging + +You should also hook your logger to `NatsConnection` to make sure all is working as expected or +to get help diagnosing any issues you might have: + +```csharp +var opts = NatsOpts.Default with { LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Error) }; +await using var nats = new NatsConnection(otps); +``` + +## What's Next + +[Managing JetStream](manage.md) covers how to create, update, get, list and delete streams and consumers. + +[Publishing messages to streams](publish.md) is achieved by simply publishing to a subject where a stream is configured +to be interested in that subject. + +[Consuming messages from streams](consume.md) explains different ways of retrieving persisted messages. diff --git a/docs/documentation/jetstream/manage.md b/docs/documentation/jetstream/manage.md new file mode 100644 index 000000000..82d94d2be --- /dev/null +++ b/docs/documentation/jetstream/manage.md @@ -0,0 +1,57 @@ +# Managing JetStream and the JetStream Context + +_JetStream Context_ is a NATS JetStream Client concept which is mainly responsible for managing streams. It serves as +the entry point for creating, configuring, and controlling the streams. JetStream Context also exposes methods to +manage consumers directly, bypassing the need to get or create a stream first. + +You can create a context using an existing NATS connection: + +```csharp +await using var nats = new NatsConnection(); + +var js = new NatsJSContext(nats); +``` + +## Streams + +Streams are _message stores_, each stream defines how messages are stored and what the limits (duration, size, interest) +of the retention are. Streams consume normal NATS subjects, any message published on those subjects will be captured in +the defined storage system. You can do a normal publish to the subject for unacknowledged delivery, though it's better +to use the JetStream publish calls instead as the JetStream server will reply with an acknowledgement that it was +successfully stored. + +An example of creating a stream: + +```csharp +await js.CreateStreamAsync("orders", subjects: new []{"orders.>"}); +``` + +However, in practice streams are usually managed separately from the applications, for example using the [NATS command +line client](https://github.com/nats-io/natscli) you can create a stream interactively: + +```shell +$ nats stream create my_events --subjects 'events.*' +? Storage [Use arrows to move, type to filter, ? for more help] +> file + memory +# you can safely choose defaults for testing and development +``` + +Refer to [NATS JetStream documentation](https://docs.nats.io/nats-concepts/jetstream#functionalities-enabled-by-jetstream) +for stream concepts and more information. + +## Consumers + +A [consumer](https://docs.nats.io/nats-concepts/jetstream/consumers) is a stateful view of a stream. It acts as +interface for clients to consume a subset of messages stored in a stream and will keep track of which messages were +delivered and acknowledged by clients. + +Unlike streams, consumers are accessed by NATS client libraries as part of message consumption: + +```csharp +// Create or get a consumer +var consumer = await js.CreateConsumerAsync(stream: "orders", consumer: "order_processor"); + +// Get an existing consumer +var consumer = await js.GetConsumerAsync(stream: "orders", consumer: "order_processor"); +``` diff --git a/docs/documentation/jetstream/publish.md b/docs/documentation/jetstream/publish.md new file mode 100644 index 000000000..05028e65c --- /dev/null +++ b/docs/documentation/jetstream/publish.md @@ -0,0 +1,50 @@ +# Publishing Messages to Streams + +If you want to persist your messages you can do a normal publish to the subject for unacknowledged delivery, though +it's better to use the JetStream context publish calls instead as the JetStream server will reply with an acknowledgement +that it was successfully stored. + +The subject must be configured on a stream to be persisted: + +```csharp +await using var nats = new NatsConnection(); +var js = new NatsJSContext(nats); + +await js.CreateStreamAsync("orders", subjects: new []{"orders.>"}); +``` + +or using the nats cli: + +```shell +$ nats stream create orders --subjects 'orders.>' +``` + +Then you can publish to subjects captured by the stream: + +```csharp +await using var nats = new NatsConnection(); +var js = new NatsJSContext(nats); + +var order = new Order { OrderId = 1 }; + +var ack = await js.PublishAsync("orders.new.1", order); + +ack.EnsureSuccess(); + +public record Order(int OrderId); +``` + +## Message Deduplication + +JetStream support +[idempotent message writes](https://docs.nats.io/using-nats/developer/develop_jetstream/model_deep_dive#message-deduplication) +by ignoring duplicate messages as indicated by the message ID. Message ID is not pert of the message but rather passed +as meta data, part of the message headers. + +```csharp +var ack = await js.PublishAsync("orders.new.1", order, msgId: "1"); +if (ack.Duplicate) +{ + // A message with the same ID was published before +} +``` diff --git a/docs/documentation/toc.yml b/docs/documentation/toc.yml index 8871b7202..dbf3026db 100644 --- a/docs/documentation/toc.yml +++ b/docs/documentation/toc.yml @@ -1,14 +1,25 @@ - name: Introduction href: intro.md -- name: Publish-Subscribe - href: pub-sub.md +- name: Core + href: core/intro.md + items: + - name: Publish-Subscribe + href: core/pub-sub.md + - name: Request-Reply + href: core/req-rep.md + - name: Queue Groups + href: core/queue.md -- name: Request-Reply - href: req-rep.md - -- name: Queue Groups - href: queue.md +- name: JetStream + href: jetstream/intro.md + items: + - name: Managing + href: jetstream/manage.md + - name: Publishing + href: jetstream/publish.md + - name: Consuming + href: jetstream/consume.md - name: Updating Documentation href: update-docs.md diff --git a/docs/index.md b/docs/index.md index f51567bfa..a59e71793 100644 --- a/docs/index.md +++ b/docs/index.md @@ -1,70 +1,22 @@ -# NATS.NET V2 (Preview) +# NATS.NET V2 -The NATS.NET V2 client is in preview and not recommended for production use. -Codebase is still under heavy development and currently we only have implementations for [core NATS](https://docs.nats.io/nats-concepts/core-nats) features. +NATS.NET V2 is a [NATS](https://nats.io) client for the modern [.NET](https://dot.net/). -Please test and provide feedback by visiting our [Slack channel](https://natsio.slack.com/channels/dotnet). +## Preview -## NATS.NET V2 Goals +The NATS.NET V2 client is in preview and not recommended for production use yet. -- Only support Async I/O -- Target latest .NET LTS Release (currently `net6.0`) +## Roadmap -## Packages +- [x] Core NATS +- [x] JetStream initial support +- [ ] KV initial support +- [ ] Object Store initial support +- [ ] .NET 8.0 support (e.g. Native AOT) +- [ ] Beta phase -- **NATS.Client.Core**: [core NATS](https://docs.nats.io/nats-concepts/core-nats) -- **NATS.Client.Hosting**: extension to configure DI container -- **NATS.Client.JetStream**: JetStream *not yet implemented* +## What's Next -## Basic Usage +[Documentation](documentation/intro.md) can help you start writing code in no time. Just follow our quick start guides. -[Download the latest](https://nats.io/download/) `nats-server` for your platform and run it without any arguments. `nats-server` will listen -on its default TCP port 4222. - -Given that we have a plain class `Bar`, we can publish and subscribe to our `nats-server` sending -and receiving `Bar` objects: - -```csharp -public record Bar -{ - public int Id { get; set; } - public string Name { get; set; } -} -``` - -Subscribe to all `bar` [related subjects](https://docs.nats.io/nats-concepts/subjects): -```csharp -await using var nats = new NatsConnection(); - -await using sub = await nats.SubscribeAsync("bar.>"); -await foreach (var msg in sub.Msgs.ReadAllAsync()) -{ - Console.WriteLine($"Received {msg.Subject}: {msg.Data}\n"); -} -``` - -Publish `Bar` objects to related `bar` [subjects](https://docs.nats.io/nats-concepts/subjects): -```csharp -await using var nats = new NatsConnection(); - -for (int i = 0; i < 10; i++) -{ - Console.WriteLine($" Publishing {i}..."); - await nats.PublishAsync($"bar.baz.{i}", new Bar { Id = i, Name = "Baz" }); -} -``` - -You should also hook your logger to `NatsConnection` to make sure all is working as expected or -to get help diagnosing any issues you might have: -```csharp -var options = NatsOpts.Default with { LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Error) }; -await using var nats = new NatsConnection(options); -``` - -## Contributing - -- Run `dotnet format` at root directory of project in order to clear warnings that can be auto-formatted - -## Attribution - -This library is based on the excellent work in [Cysharp/AlterNats](https://github.com/Cysharp/AlterNats) +[API](api/index.md) is the generated reference documentation. diff --git a/src/NATS.Client.Core/Internal/SubscriptionManager.cs b/src/NATS.Client.Core/Internal/SubscriptionManager.cs index cd4ad43bf..561e987eb 100644 --- a/src/NATS.Client.Core/Internal/SubscriptionManager.cs +++ b/src/NATS.Client.Core/Internal/SubscriptionManager.cs @@ -51,6 +51,11 @@ public async ValueTask SubscribeAsync(string subject, string? queueGroup, NatsSu { if (IsInboxSubject(subject)) { + if (queueGroup != null) + { + throw new NatsException("Inbox subscriptions don't support queue groups"); + } + await SubscribeInboxAsync(subject, opts, sub, cancellationToken).ConfigureAwait(false); } else diff --git a/src/NATS.Client.JetStream/INatsJSConsume.cs b/src/NATS.Client.JetStream/INatsJSConsume.cs index 4ab2f7135..bae608922 100644 --- a/src/NATS.Client.JetStream/INatsJSConsume.cs +++ b/src/NATS.Client.JetStream/INatsJSConsume.cs @@ -2,11 +2,17 @@ namespace NATS.Client.JetStream; +/// +/// Interface to manage a consume() operation on a consumer. +/// public interface INatsJSConsume : IAsyncDisposable { void Stop(); } +/// +/// Interface to extract messages from a consume() operation on a consumer. +/// public interface INatsJSConsume : INatsJSConsume { ChannelReader> Msgs { get; } diff --git a/src/NATS.Client.JetStream/INatsJSFetch.cs b/src/NATS.Client.JetStream/INatsJSFetch.cs index 29b23eddf..e835678e2 100644 --- a/src/NATS.Client.JetStream/INatsJSFetch.cs +++ b/src/NATS.Client.JetStream/INatsJSFetch.cs @@ -2,11 +2,17 @@ namespace NATS.Client.JetStream; +/// +/// Interface to manage a fetch() operation on a consumer. +/// public interface INatsJSFetch : IAsyncDisposable { void Stop(); } +/// +/// Interface to extract messages from a fetch() operation on a consumer. +/// public interface INatsJSFetch : INatsJSFetch { ChannelReader> Msgs { get; } diff --git a/src/NATS.Client.JetStream/NatsJSConsume.cs b/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs similarity index 98% rename from src/NATS.Client.JetStream/NatsJSConsume.cs rename to src/NATS.Client.JetStream/Internal/NatsJSConsume.cs index 2cc560eae..1a9a59e0d 100644 --- a/src/NATS.Client.JetStream/NatsJSConsume.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs @@ -4,12 +4,11 @@ using Microsoft.Extensions.Logging; using NATS.Client.Core; using NATS.Client.Core.Commands; -using NATS.Client.JetStream.Internal; using NATS.Client.JetStream.Models; -namespace NATS.Client.JetStream; +namespace NATS.Client.JetStream.Internal; -public class NatsJSConsume : NatsSubBase, INatsJSConsume +internal class NatsJSConsume : NatsSubBase, INatsJSConsume { private readonly ILogger _logger; private readonly bool _debug; diff --git a/src/NATS.Client.JetStream/Internal/NatsJSExtensionsInternal.cs b/src/NATS.Client.JetStream/Internal/NatsJSExtensionsInternal.cs index 60d786a13..1673d95ee 100644 --- a/src/NATS.Client.JetStream/Internal/NatsJSExtensionsInternal.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSExtensionsInternal.cs @@ -9,5 +9,5 @@ public static class NatsJSExtensionsInternal public static bool HasTerminalJSError(this NatsHeaders headers) => headers is { Code: 400 } or { Code: 409, Message: NatsHeaders.Messages.ConsumerDeleted } - or { Code: 409, Message: NatsHeaders.Messages.ConsumerDeleted }; + or { Code: 409, Message: NatsHeaders.Messages.ConsumerIsPushBased }; } diff --git a/src/NATS.Client.JetStream/NatsJSFetch.cs b/src/NATS.Client.JetStream/Internal/NatsJSFetch.cs similarity index 98% rename from src/NATS.Client.JetStream/NatsJSFetch.cs rename to src/NATS.Client.JetStream/Internal/NatsJSFetch.cs index fe2c0d750..c04049b15 100644 --- a/src/NATS.Client.JetStream/NatsJSFetch.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSFetch.cs @@ -4,12 +4,11 @@ using Microsoft.Extensions.Logging; using NATS.Client.Core; using NATS.Client.Core.Commands; -using NATS.Client.JetStream.Internal; using NATS.Client.JetStream.Models; -namespace NATS.Client.JetStream; +namespace NATS.Client.JetStream.Internal; -public class NatsJSFetch : NatsSubBase, INatsJSFetch +internal class NatsJSFetch : NatsSubBase, INatsJSFetch { private readonly ILogger _logger; private readonly bool _debug; diff --git a/src/NATS.Client.JetStream/NatsJSResponse.cs b/src/NATS.Client.JetStream/Internal/NatsJSResponse.cs similarity index 71% rename from src/NATS.Client.JetStream/NatsJSResponse.cs rename to src/NATS.Client.JetStream/Internal/NatsJSResponse.cs index 1237e42c8..46fe44b61 100644 --- a/src/NATS.Client.JetStream/NatsJSResponse.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSResponse.cs @@ -1,12 +1,12 @@ using NATS.Client.JetStream.Models; -namespace NATS.Client.JetStream; +namespace NATS.Client.JetStream.Internal; /// /// JetStream response including an optional error property encapsulating both successful and failed calls. /// /// JetStream response type -public readonly struct NatsJSResponse +internal readonly struct NatsJSResponse { internal NatsJSResponse(T? response, ApiError? error) { @@ -28,12 +28,3 @@ public void EnsureSuccess() } } } - -public class NatsJSApiException : NatsJSException -{ - public NatsJSApiException(ApiError error) - : base(error.Description) => - Error = error; - - public ApiError Error { get; } -} diff --git a/src/NATS.Client.JetStream/NatsJSConsumer.cs b/src/NATS.Client.JetStream/NatsJSConsumer.cs index e89d15b8f..86d8a357c 100644 --- a/src/NATS.Client.JetStream/NatsJSConsumer.cs +++ b/src/NATS.Client.JetStream/NatsJSConsumer.cs @@ -6,6 +6,9 @@ namespace NATS.Client.JetStream; +/// +/// Represents a NATS JetStream consumer. +/// public class NatsJSConsumer { private readonly NatsJSContext _context; @@ -13,7 +16,7 @@ public class NatsJSConsumer private readonly string _consumer; private volatile bool _deleted; - public NatsJSConsumer(NatsJSContext context, ConsumerInfo info) + internal NatsJSConsumer(NatsJSContext context, ConsumerInfo info) { _context = context; Info = info; @@ -21,14 +24,35 @@ public NatsJSConsumer(NatsJSContext context, ConsumerInfo info) _consumer = Info.Name; } + /// + /// Consumer info object as retrieved from NATS JetStream server at the time this object was created, updated or refreshed. + /// public ConsumerInfo Info { get; private set; } + /// + /// Delete this consumer. + /// + /// A used to cancel the API call. + /// Whether delete was successful or not. + /// There was an issue retrieving the response. + /// Server responded with an error. + /// After deletion this object can't be used anymore. + /// Server responded with an error. + /// There is an error sending the message or this consumer object isn't valid anymore because it was deleted earlier. public async ValueTask DeleteAsync(CancellationToken cancellationToken = default) { ThrowIfDeleted(); return _deleted = await _context.DeleteConsumerAsync(_stream, _consumer, cancellationToken); } + /// + /// Starts an enumerator consuming messages from the stream using this consumer. + /// + /// Consume options. (default: MaxMsgs 1,000) + /// A used to cancel the call. + /// Message type to deserialize. + /// Async enumerable of messages which can be used in a await foreach loop. + /// Consumer is deleted, it's push based or request sent to server is invalid. public async IAsyncEnumerable> ConsumeAllAsync( NatsJSConsumeOpts? opts = default, [EnumeratorCancellation] CancellationToken cancellationToken = default) @@ -41,10 +65,18 @@ public async ValueTask DeleteAsync(CancellationToken cancellationToken = d } } + /// + /// Starts consuming messages from the stream using this consumer. + /// + /// Consume options. (default: MaxMsgs 1,000) + /// A used to cancel the call. + /// Message type to deserialize. + /// A consume object to manage the operation and retrieve messages. + /// Consumer is deleted, it's push based or request sent to server is invalid. + /// There is an error sending the message or this consumer object isn't valid anymore because it was deleted earlier. public async ValueTask> ConsumeAsync(NatsJSConsumeOpts? opts = default, CancellationToken cancellationToken = default) { ThrowIfDeleted(); - opts ??= _context.Opts.DefaultConsumeOpts; var inbox = _context.NewInbox(); @@ -101,6 +133,36 @@ await sub.CallMsgNextAsync( return sub; } + /// + /// Consume a single message from the stream using this consumer. + /// + /// Next message options. (default: 30 seconds timeout) + /// A used to cancel the call. + /// Message type to deserialize. + /// Message retrieved from the stream or NULL + /// Consumer is deleted, it's push based or request sent to server is invalid. + /// There is an error sending the message or this consumer object isn't valid anymore because it was deleted earlier. + /// + /// + /// If the request to server expires (in 30 seconds by default) this call returns NULL. + /// + /// + /// This method is implemented as a fetch with MaxMsgs=1 which means every request will create a new subscription + /// on the NATS server. This would be inefficient if you're consuming a lot of messages and you should consider using + /// fetch or consume methods. + /// + /// + /// + /// The following example shows how you might process messages: + /// + /// var next = await consumer.NextAsync<Data>(); + /// if (next is { } msg) + /// { + /// // process the message + /// await msg.AckAsync(); + /// } + /// + /// public async ValueTask?> NextAsync(NatsJSNextOpts? opts = default, CancellationToken cancellationToken = default) { ThrowIfDeleted(); @@ -124,6 +186,15 @@ await sub.CallMsgNextAsync( return default; } + /// + /// Consume a set number of messages from the stream using this consumer. + /// + /// Fetch options. (default: MaxMsgs 1,000 and timeout in 30 seconds) + /// A used to cancel the call. + /// Message type to deserialize. + /// Async enumerable of messages which can be used in a await foreach loop. + /// Consumer is deleted, it's push based or request sent to server is invalid. + /// There is an error sending the message or this consumer object isn't valid anymore because it was deleted earlier. public async IAsyncEnumerable> FetchAllAsync( NatsJSFetchOpts? opts = default, [EnumeratorCancellation] CancellationToken cancellationToken = default) @@ -138,6 +209,15 @@ await sub.CallMsgNextAsync( } } + /// + /// Consume a set number of messages from the stream using this consumer. + /// + /// Fetch options. (default: MaxMsgs 1,000 and timeout in 30 seconds) + /// A used to cancel the call. + /// Message type to deserialize. + /// A fetch object to manage the operation and retrieve messages. + /// Consumer is deleted, it's push based or request sent to server is invalid. + /// There is an error sending the message or this consumer object isn't valid anymore because it was deleted earlier. public async ValueTask> FetchAsync( NatsJSFetchOpts? opts = default, CancellationToken cancellationToken = default) @@ -197,6 +277,12 @@ await sub.CallMsgNextAsync( return sub; } + /// + /// Retrieve the consumer info from the server and update this consumer. + /// + /// A used to cancel the API call. + /// There was an issue retrieving the response. + /// Server responded with an error. public async ValueTask RefreshAsync(CancellationToken cancellationToken = default) => Info = await _context.JSRequestResponseAsync( subject: $"{_context.Opts.Prefix}.CONSUMER.INFO.{_stream}.{_consumer}", diff --git a/src/NATS.Client.JetStream/NatsJSContext.Consumers.cs b/src/NATS.Client.JetStream/NatsJSContext.Consumers.cs index 3b7bd8019..389bb5287 100644 --- a/src/NATS.Client.JetStream/NatsJSContext.Consumers.cs +++ b/src/NATS.Client.JetStream/NatsJSContext.Consumers.cs @@ -5,6 +5,16 @@ namespace NATS.Client.JetStream; public partial class NatsJSContext { + /// + /// Creates new consumer if it doesn't exists or returns an existing one with the same name. + /// + /// Stream name to create the consumer under. + /// Name of the consumer. + /// Ack policy to use. Must not be set to none. Default is explicit. + /// A used to cancel the API call. + /// The NATS JetStream consumer object which can be used retrieving data from the stream. + /// Ack policy is set to none or there was an issue retrieving the response. + /// Server responded with an error. public ValueTask CreateConsumerAsync( string stream, string consumer, @@ -23,6 +33,14 @@ public ValueTask CreateConsumerAsync( }, cancellationToken); + /// + /// Creates new consumer if it doesn't exists or returns an existing one with the same name. + /// + /// Consumer creation request to be sent to NATS JetStream server. + /// A used to cancel the API call. + /// The NATS JetStream consumer object which can be used retrieving data from the stream. + /// Ack policy is set to none or there was an issue retrieving the response. + /// Server responded with an error. public async ValueTask CreateConsumerAsync( ConsumerCreateRequest request, CancellationToken cancellationToken = default) @@ -46,6 +64,15 @@ public async ValueTask CreateConsumerAsync( return new NatsJSConsumer(this, response); } + /// + /// Gets consumer information from the server and creates a NATS JetStream consumer . + /// + /// Stream name where consumer is associated to. + /// Consumer name. + /// A used to cancel the API call. + /// The NATS JetStream consumer object which can be used retrieving data from the stream. + /// There was an issue retrieving the response. + /// Server responded with an error. public async ValueTask GetConsumerAsync(string stream, string consumer, CancellationToken cancellationToken = default) { var response = await JSRequestResponseAsync( @@ -55,6 +82,17 @@ public async ValueTask GetConsumerAsync(string stream, string co return new NatsJSConsumer(this, response); } + /// + /// Enumerates through consumers belonging to a stream. + /// + /// Stream name the consumers belong to. + /// A used to cancel the API call. + /// Async enumerable of consumer objects. Can be used in a await foreach loop. + /// There was an issue retrieving the response. + /// Server responded with an error. + /// + /// Note that paging isn't implemented. You might receive only a partial list of consumers if there are a lot of them. + /// public async IAsyncEnumerable ListConsumersAsync( string stream, [EnumeratorCancellation] CancellationToken cancellationToken = default) @@ -67,6 +105,15 @@ public async IAsyncEnumerable ListConsumersAsync( yield return new NatsJSConsumer(this, consumer); } + /// + /// Delete a consumer from a stream. + /// + /// Stream name where consumer is associated to. + /// Consumer name to be deleted. + /// A used to cancel the API call. + /// Whether the deletion was successful. + /// There was an issue retrieving the response. + /// Server responded with an error. public async ValueTask DeleteConsumerAsync(string stream, string consumer, CancellationToken cancellationToken = default) { var response = await JSRequestResponseAsync( diff --git a/src/NATS.Client.JetStream/NatsJSContext.Streams.cs b/src/NATS.Client.JetStream/NatsJSContext.Streams.cs index dc8ae6a8b..3bd5d8a8a 100644 --- a/src/NATS.Client.JetStream/NatsJSContext.Streams.cs +++ b/src/NATS.Client.JetStream/NatsJSContext.Streams.cs @@ -5,9 +5,26 @@ namespace NATS.Client.JetStream; public partial class NatsJSContext { + /// + /// Creates a new stream if it doesn't exist or returns an existing stream with the same name. + /// + /// Name of the stream to create. (e.g. my_events) + /// List of subjects stream will persist messages from. (e.g. events.*) + /// A used to cancel the API call. + /// The NATS JetStream stream object which can be used to manage the stream. + /// There was an issue retrieving the response. + /// Server responded with an error. public ValueTask CreateStreamAsync(string stream, string[] subjects, CancellationToken cancellationToken = default) => CreateStreamAsync(new StreamCreateRequest { Name = stream, Subjects = subjects }, cancellationToken); + /// + /// Creates a new stream if it doesn't exist or returns an existing stream with the same name. + /// + /// Stream configuration request to be sent to NATS JetStream server. + /// A used to cancel the API call. + /// The NATS JetStream stream object which can be used to manage the stream. + /// There was an issue retrieving the response. + /// Server responded with an error. public async ValueTask CreateStreamAsync( StreamConfiguration request, CancellationToken cancellationToken = default) @@ -19,6 +36,14 @@ public async ValueTask CreateStreamAsync( return new NatsJSStream(this, response); } + /// + /// Deletes a stream. + /// + /// Stream name to be deleted. + /// A used to cancel the API call. + /// Whether delete was successful or not. + /// There was an issue retrieving the response. + /// Server responded with an error. public async ValueTask DeleteStreamAsync( string stream, CancellationToken cancellationToken = default) @@ -30,6 +55,14 @@ public async ValueTask DeleteStreamAsync( return response.Success; } + /// + /// Get stream information from the server and creates a NATS JetStream stream object . + /// + /// Name of the stream to retrieve. + /// A used to cancel the API call. + /// The NATS JetStream stream object which can be used to manage the stream. + /// There was an issue retrieving the response. + /// Server responded with an error. public async ValueTask GetStreamAsync( string stream, CancellationToken cancellationToken = default) @@ -41,6 +74,14 @@ public async ValueTask GetStreamAsync( return new NatsJSStream(this, response); } + /// + /// Update a NATS JetStream stream's properties. + /// + /// Stream update request object to be sent to NATS JetStream server. + /// A used to cancel the API call. + /// The updated NATS JetStream stream object. + /// There was an issue retrieving the response. + /// Server responded with an error. public async ValueTask UpdateStreamAsync( StreamUpdateRequest request, CancellationToken cancellationToken = default) @@ -52,6 +93,17 @@ public async ValueTask UpdateStreamAsync( return new NatsJSStream(this, response); } + /// + /// Enumerates through the streams exists on the NATS JetStream server. + /// + /// Limit the list to streams matching this subject filter. + /// A used to cancel the API call. + /// Async enumerable of stream objects. Can be used in a await foreach loop. + /// There was an issue retrieving the response. + /// Server responded with an error. + /// + /// Note that paging isn't implemented. You might receive only a partial list of streams if there are a lot of them. + /// public async IAsyncEnumerable ListStreamsAsync( string? subject = default, [EnumeratorCancellation] CancellationToken cancellationToken = default) diff --git a/src/NATS.Client.JetStream/NatsJSContext.cs b/src/NATS.Client.JetStream/NatsJSContext.cs index e1a846edb..d9f359161 100644 --- a/src/NATS.Client.JetStream/NatsJSContext.cs +++ b/src/NATS.Client.JetStream/NatsJSContext.cs @@ -5,13 +5,20 @@ namespace NATS.Client.JetStream; +/// Provides management and access to NATS JetStream streams and consumers. public partial class NatsJSContext { + /// > public NatsJSContext(NatsConnection connection) : this(connection, new NatsJSOpts(connection.Opts)) { } + /// + /// Creates a NATS JetStream context used to manage and access streams and consumers. + /// + /// A NATS server connection to access the JetStream APIs, publishers and consumers. + /// Context wide JetStream options. public NatsJSContext(NatsConnection connection, NatsJSOpts opts) { Connection = connection; @@ -22,19 +29,58 @@ public NatsJSContext(NatsConnection connection, NatsJSOpts opts) internal NatsJSOpts Opts { get; } + /// + /// Calls JetStream Account Info API. + /// + /// A used to cancel the API call. + /// The account information based on the NATS connection credentials. public ValueTask GetAccountInfoAsync(CancellationToken cancellationToken = default) => JSRequestResponseAsync( subject: $"{Opts.Prefix}.INFO", request: null, cancellationToken); + /// + /// Sends data to a stream associated with the subject. + /// + /// Subject to publish the data to. + /// Data to publish. + /// Sets Nats-Msg-Id header for idempotent message writes. + /// Optional message headers. + /// Options to be used by publishing command. + /// A used to cancel the publishing call or the wait for response. + /// Type of the data being sent. + /// + /// The ACK response to indicate if stream accepted the message as well as additional + /// information like the sequence number of the message stored by the stream. + /// + /// There was a problem receiving the response. + /// + /// + /// Note that if the subject isn't backed by a stream or the connected NATS server + /// isn't running with JetStream enabled, this call will hang waiting for an ACK + /// until the request times out. + /// + /// + /// By setting msgId you can ensure messages written to a stream only once. JetStream support idempotent + /// message writes by ignoring duplicate messages as indicated by the Nats-Msg-Id header. If both msgId + /// and the Nats-Msg-Id header value was set, msgId parameter value will be used. + /// + /// public async ValueTask PublishAsync( string subject, T? data, + string? msgId = default, NatsHeaders? headers = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) { + if (msgId != null) + { + headers ??= new NatsHeaders(); + headers["Nats-Msg-Id"] = msgId; + } + await using var sub = await Connection.RequestSubAsync( subject: subject, data: data, @@ -124,12 +170,3 @@ internal async ValueTask> JSRequestAsync - Sequence = sequence; - - public long Sequence { get; } -} diff --git a/src/NATS.Client.JetStream/NatsJSException.cs b/src/NATS.Client.JetStream/NatsJSException.cs index 83e0a3f32..24b8cdb3c 100644 --- a/src/NATS.Client.JetStream/NatsJSException.cs +++ b/src/NATS.Client.JetStream/NatsJSException.cs @@ -1,29 +1,92 @@ using NATS.Client.Core; +using NATS.Client.JetStream.Models; namespace NATS.Client.JetStream; +/// +/// Generic JetStream exception. +/// public class NatsJSException : NatsException { + /// + /// Create JetStream generic exception. + /// + /// Error message. public NatsJSException(string message) : base(message) { } + /// + /// Create JetStream generic exception. + /// + /// Error message. + /// Inner exception. public NatsJSException(string message, Exception exception) : base(message, exception) { } } +/// +/// JetStream protocol errors received during message consumption. +/// public class NatsJSProtocolException : NatsJSException { + /// + /// Create JetStream protocol exception. + /// + /// Error message. public NatsJSProtocolException(string message) : base(message) { } + /// + /// Create JetStream protocol exception. + /// + /// Error message. + /// Inner exception. public NatsJSProtocolException(string message, Exception exception) : base(message, exception) { } } + +/// +/// The exception that is thrown when JetStream publish acknowledgment indicates a duplicate sequence error. +/// +public class NatsJSDuplicateMessageException : NatsJSException +{ + /// + /// Create JetStream duplicate message exception. + /// + /// The duplicate sequence number. + public NatsJSDuplicateMessageException(long sequence) + : base($"Duplicate of {sequence}") => + Sequence = sequence; + + /// + /// The duplicate sequence number. + /// + public long Sequence { get; } +} + +/// +/// JetStream API call errors. +/// +public class NatsJSApiException : NatsJSException +{ + /// + /// Create JetStream API exception. + /// + /// Error response received from the server. + public NatsJSApiException(ApiError error) + : base(error.Description) => + Error = error; + + /// + /// API error response received from the server. + /// + public ApiError Error { get; } +} diff --git a/src/NATS.Client.JetStream/NatsJSExtensions.cs b/src/NATS.Client.JetStream/NatsJSExtensions.cs index 8fcea7484..1d35a0403 100644 --- a/src/NATS.Client.JetStream/NatsJSExtensions.cs +++ b/src/NATS.Client.JetStream/NatsJSExtensions.cs @@ -4,6 +4,13 @@ namespace NATS.Client.JetStream; public static class NatsJSExtensions { + /// + /// Make sure acknowledgment was successful and throw an exception otherwise. + /// + /// ACK response. + /// is NULL. + /// Server responded with an error. + /// A message with the same Nats-Msg-Id was received before. public static void EnsureSuccess(this PubAckResponse ack) { if (ack == null) diff --git a/src/NATS.Client.JetStream/NatsJSMsg.cs b/src/NATS.Client.JetStream/NatsJSMsg.cs index babd95d11..c0a4dfc54 100644 --- a/src/NATS.Client.JetStream/NatsJSMsg.cs +++ b/src/NATS.Client.JetStream/NatsJSMsg.cs @@ -19,22 +19,80 @@ internal NatsJSMsg(NatsMsg msg, NatsJSContext context) _context = context; } + /// + /// Subject of the user message. + /// public string Subject => _msg.Subject; + /// + /// Message size in bytes. + /// + /// + /// Message size is calculated using the same method NATS server uses: + /// + /// int size = subject.Length + replyTo.Length + headers.Length + payload.Length; + /// + /// public int Size => _msg.Size; + /// + /// Headers of the user message if set. + /// public NatsHeaders? Headers => _msg.Headers; + /// + /// Deserialized user data. + /// public T? Data => _msg.Data; + /// + /// The connection messages was delivered on. + /// public INatsConnection? Connection => _msg.Connection; + /// + /// Acknowledges the message was completely handled. + /// + /// Ack options. + /// A used to cancel the call. + /// A representing the async call. public ValueTask AckAsync(AckOpts opts = default, CancellationToken cancellationToken = default) => SendAckAsync(NatsJSConstants.Ack, opts, cancellationToken); + /// + /// Signals that the message will not be processed now and processing can move onto the next message. + /// + /// Ack options. + /// A used to cancel the call. + /// A representing the async call. + /// + /// Messages rejected using NACK will be resent by the NATS JetStream server after the configured timeout. + /// public ValueTask NackAsync(AckOpts opts = default, CancellationToken cancellationToken = default) => SendAckAsync(NatsJSConstants.Nack, opts, cancellationToken); + /// + /// Indicates that work is ongoing and the wait period should be extended. + /// + /// Ack options. + /// A used to cancel the call. + /// A representing the async call. + /// + /// + /// Time period is defined by the consumer's ack_wait configuration on the server which is + /// defined as how long to allow messages to remain un-acknowledged before attempting redelivery. + /// + /// + /// This message must be sent before the ack_wait period elapses. The period should be extended + /// by another amount of time equal to ack_wait by the NATS JetStream server. + /// + /// public ValueTask AckProgressAsync(AckOpts opts = default, CancellationToken cancellationToken = default) => SendAckAsync(NatsJSConstants.AckProgress, opts, cancellationToken); + /// + /// Instructs the server to stop redelivery of the message without acknowledging it as successfully processed. + /// + /// Ack options. + /// A used to cancel the call. + /// A representing the async call. public ValueTask AckTerminateAsync(AckOpts opts = default, CancellationToken cancellationToken = default) => SendAckAsync(NatsJSConstants.AckTerminate, opts, cancellationToken); private ValueTask SendAckAsync(ReadOnlySequence payload, AckOpts opts = default, CancellationToken cancellationToken = default) diff --git a/src/NATS.Client.JetStream/NatsJSNotification.cs b/src/NATS.Client.JetStream/NatsJSNotification.cs deleted file mode 100644 index 34232e218..000000000 --- a/src/NATS.Client.JetStream/NatsJSNotification.cs +++ /dev/null @@ -1,6 +0,0 @@ -namespace NATS.Client.JetStream; - -public record NatsJSNotification(int Code, string Description) -{ - public static readonly NatsJSNotification HeartbeatTimeout = new NatsJSNotification(1001, "Heartbeat Timeout"); -} diff --git a/src/NATS.Client.JetStream/NatsJSOpts.cs b/src/NATS.Client.JetStream/NatsJSOpts.cs index af87b587d..c2bd7d6b4 100644 --- a/src/NATS.Client.JetStream/NatsJSOpts.cs +++ b/src/NATS.Client.JetStream/NatsJSOpts.cs @@ -2,6 +2,9 @@ namespace NATS.Client.JetStream; +/// +/// JetStream options to be used within a JetStream context. +/// public record NatsJSOpts { public NatsJSOpts(NatsOpts opts, string? apiPrefix = default, string? domain = default, AckOpts? ackOpts = default) @@ -56,6 +59,9 @@ public NatsJSOpts(NatsOpts opts, string? apiPrefix = default, string? domain = d public NatsJSNextOpts DefaultNextOpts { get; init; } = new(); } +/// +/// Consumer consume method options. +/// public record NatsJSConsumeOpts { /// @@ -98,6 +104,9 @@ public record NatsJSConsumeOpts public INatsSerializer? Serializer { get; init; } } +/// +/// Consumer next method options. +/// public record NatsJSNextOpts { /// @@ -120,6 +129,9 @@ public record NatsJSNextOpts public INatsSerializer? Serializer { get; init; } } +/// +/// Consumer fetch method options. +/// public record NatsJSFetchOpts { /// diff --git a/src/NATS.Client.JetStream/NatsJSStream.cs b/src/NATS.Client.JetStream/NatsJSStream.cs index 970f63b62..0de05d852 100644 --- a/src/NATS.Client.JetStream/NatsJSStream.cs +++ b/src/NATS.Client.JetStream/NatsJSStream.cs @@ -1,29 +1,49 @@ -using System.Runtime.CompilerServices; using NATS.Client.JetStream.Models; namespace NATS.Client.JetStream; +/// +/// Represents a NATS JetStream stream. +/// public class NatsJSStream { private readonly NatsJSContext _context; private readonly string _name; private bool _deleted; - public NatsJSStream(NatsJSContext context, StreamInfo info) + internal NatsJSStream(NatsJSContext context, StreamInfo info) { _context = context; Info = info; _name = info.Config.Name; } + /// + /// Stream info object as retrieved from NATS JetStream server at the time this object was created, updated or refreshed. + /// public StreamInfo Info { get; private set; } + /// + /// Delete this stream. + /// + /// A used to cancel the API call. + /// Whether delete was successful or not. + /// There is an error retrieving the response or this consumer object isn't valid anymore because it was deleted earlier. + /// Server responded with an error. + /// After deletion this object can't be used anymore. public async ValueTask DeleteAsync(CancellationToken cancellationToken = default) { ThrowIfDeleted(); return _deleted = await _context.DeleteStreamAsync(_name, cancellationToken); } + /// + /// Update stream properties on the server. + /// + /// Stream update request to be sent to the server. + /// A used to cancel the API call. + /// There is an error retrieving the response or this consumer object isn't valid anymore because it was deleted earlier. + /// Server responded with an error. public async ValueTask UpdateAsync( StreamUpdateRequest request, CancellationToken cancellationToken = default) @@ -33,30 +53,91 @@ public async ValueTask UpdateAsync( Info = response.Info; } - public IAsyncEnumerable ListConsumersAsync(CancellationToken cancellationToken = default) + /// + /// Creates new consumer for this stream if it doesn't exists or returns an existing one with the same name. + /// + /// Name of the consumer. + /// Ack policy to use. Must not be set to none. Default is explicit. + /// A used to cancel the API call. + /// The NATS JetStream consumer object which can be used retrieving data from the stream. + /// Ack policy is set to none or there is an error retrieving the response or this consumer object isn't valid anymore because it was deleted earlier. + /// Server responded with an error. + public ValueTask CreateConsumerAsync(string consumer, ConsumerConfigurationAckPolicy ackPolicy = ConsumerConfigurationAckPolicy.@explicit, CancellationToken cancellationToken = default) { ThrowIfDeleted(); - return _context.ListConsumersAsync(_name, cancellationToken); + return _context.CreateConsumerAsync(_name, consumer, ackPolicy, cancellationToken); } + /// + /// Creates new consumer for this stream if it doesn't exists or returns an existing one with the same name. + /// + /// Consumer creation request to be sent to NATS JetStream server. + /// A used to cancel the API call. + /// The NATS JetStream consumer object which can be used retrieving data from the stream. + /// Ack policy is set to none or there is an error retrieving the response or this consumer object isn't valid anymore because it was deleted earlier. + /// Server responded with an error. public ValueTask CreateConsumerAsync(ConsumerCreateRequest request, CancellationToken cancellationToken = default) { ThrowIfDeleted(); return _context.CreateConsumerAsync(request, cancellationToken); } + /// + /// Gets consumer information from the server and creates a NATS JetStream consumer . + /// + /// Consumer name. + /// A used to cancel the API call. + /// The NATS JetStream consumer object which can be used retrieving data from the stream. + /// There is an error retrieving the response or this consumer object isn't valid anymore because it was deleted earlier. + /// Server responded with an error. public ValueTask GetConsumerAsync(string consumer, CancellationToken cancellationToken = default) { ThrowIfDeleted(); return _context.GetConsumerAsync(_name, consumer, cancellationToken); } + /// + /// Enumerates through consumers belonging to this stream. + /// + /// A used to cancel the API call. + /// Async enumerable of consumer objects. Can be used in a await foreach loop. + /// There is an error retrieving the response or this consumer object isn't valid anymore because it was deleted earlier. + /// Server responded with an error. + /// + /// Note that paging isn't implemented. You might receive only a partial list of consumers if there are a lot of them. + /// + public IAsyncEnumerable ListConsumersAsync(CancellationToken cancellationToken = default) + { + ThrowIfDeleted(); + return _context.ListConsumersAsync(_name, cancellationToken); + } + + /// + /// Delete a consumer from this stream. + /// + /// Consumer name to be deleted. + /// A used to cancel the API call. + /// Whether the deletion was successful. + /// There is an error retrieving the response or this consumer object isn't valid anymore because it was deleted earlier. + /// Server responded with an error. public ValueTask DeleteConsumerAsync(string consumer, CancellationToken cancellationToken = default) { ThrowIfDeleted(); return _context.DeleteConsumerAsync(_name, consumer, cancellationToken); } + /// + /// Retrieve the stream info from the server and update this stream. + /// + /// A used to cancel the API call. + /// There was an issue retrieving the response. + /// Server responded with an error. + public async ValueTask RefreshAsync(CancellationToken cancellationToken = default) => + Info = await _context.JSRequestResponseAsync( + subject: $"{_context.Opts.Prefix}.STREAM.INFO.{_name}", + request: null, + cancellationToken).ConfigureAwait(false); + private void ThrowIfDeleted() { if (_deleted) diff --git a/tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs b/tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs index efadfb7f2..0dec6922d 100644 --- a/tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs +++ b/tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs @@ -96,7 +96,7 @@ public async Task Consume_idle_heartbeat_test() var signal = new WaitSignal(TimeSpan.FromSeconds(30)); server.OnLog += log => { - if (log is { Category: "NATS.Client.JetStream.NatsJSConsume", LogLevel: LogLevel.Debug }) + if (log is { Category: "NATS.Client.JetStream.Internal.NatsJSConsume", LogLevel: LogLevel.Debug }) { if (log.EventId == NatsJSLogEvents.IdleTimeout) signal.Pulse();