Skip to content

Commit

Permalink
JetStream documentation and minor fixes (#129)
Browse files Browse the repository at this point in the history
* Options tidy-up

* Removed variable PubOpts Headers and ReplyTo
* Added headers and replyTo params to Publish methods
* Make all Opts classes (not structs)
* Added default options for JS context

* JetStream docs update

* API docs update

* JetStream exception docs

* JetStream docs structure

* JetStream in-line docs typo fix

* JetStream documentation

* JetStream index and readme updates

* JetStream intro tidy-up

* Fixed test

* Added missing exception docs
  • Loading branch information
mtmk authored Sep 13, 2023
1 parent a91da64 commit fd1fd42
Show file tree
Hide file tree
Showing 30 changed files with 969 additions and 163 deletions.
72 changes: 20 additions & 52 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,67 +1,35 @@
# NATS.NET V2

NATS.NET V2 is a [NATS](https://nats.io) client for the modern [.NET](https://dot.net/).

## Preview

The NATS.NET V2 client is in preview and not recommended for production use.
Codebase is still under heavy development and currently we only have implementations for [core NATS](https://docs.nats.io/nats-concepts/core-nats) features.
The NATS.NET V2 client is in preview and not recommended for production use yet.
Codebase is still under heavy development and we currently implemented [Core NATS](https://docs.nats.io/nats-concepts/core-nats)
and basic [JetStream](https://docs.nats.io/nats-concepts/jetstream) features.

Please test and provide feedback:

* on [slack.nats.io dotnet channel](https://natsio.slack.com/channels/dotnet)
* or use GitHub discussions, issues and PRs

Thank you to our contributors so far. We feel we are growing slowly as a community and we appreciate your help
supporting and developing NATS .NET V2 project.

## Documentation

Please test and provide feedback by visiting our [Slack channel](https://natsio.slack.com/channels/dotnet).
Check out the [documentation](https://nats-io.github.io/nats.net.v2/) for guides and examples.

## NATS.NET V2 Goals

- Only support Async I/O
- Target latest .NET LTS Release (currently `net6.0`)
- Only support Async I/O (async/await)
- Target latest .NET LTS Release (currently .NET 6.0)

## Packages

- **NATS.Client.Core**: [core NATS](https://docs.nats.io/nats-concepts/core-nats)
- **NATS.Client.Core**: [Core NATS](https://docs.nats.io/nats-concepts/core-nats)
- **NATS.Client.Hosting**: extension to configure DI container
- **NATS.Client.JetStream**: JetStream *not yet implemented*

## Basic Usage

[Download the latest](https://nats.io/download/) `nats-server` for your platform and run it without any arguments. `nats-server` will listen
on its default TCP port 4222.

Given that we have a plain class `Bar`, we can publish and subscribe to our `nats-server` sending
and receiving `Bar` objects:

```csharp
public record Bar
{
public int Id { get; set; }
public string Name { get; set; }
}
```

Subscribe to all `bar` [related subjects](https://docs.nats.io/nats-concepts/subjects):
```csharp
await using var nats = new NatsConnection(options);

await using sub = await nats.SubscribeAsync<Bar>("bar.>");
await foreach (var msg in sub.Msgs.ReadAllAsync())
{
Console.WriteLine($"Received {msg.Subject}: {msg.Data}\n");
}
```

Publish `Bar` objects to related `bar` [subjects](https://docs.nats.io/nats-concepts/subjects):
```csharp
await using var nats = new NatsConnection();

for (int i = 0; i < 10; i++)
{
Console.WriteLine($" Publishing {i}...");
await nats.PublishAsync<Bar>($"bar.baz.{i}", new Bar { Id = i, Name = "Baz" });
}
```

You should also hook your logger to `NatsConnection` to make sure all is working as expected or
to get help diagnosing any issues you might have:
```csharp
var options = NatsOpts.Default with { LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Error) };
await using var nats = new NatsConnection(options);
```
- **NATS.Client.JetStream**: [JetStream](https://docs.nats.io/nats-concepts/jetstream)

## Contributing

Expand Down
72 changes: 72 additions & 0 deletions docs/documentation/core/intro.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# Core NATS

[Core NATS](https://docs.nats.io/nats-concepts/core-nats) is the base set of functionalities and qualities of service
offered by a NATS service infrastructure. Core NATS is the foundation for JetStream and other services. For the sake
of explanation, in a simplified sense you can think of Core NATS as the
[wire protocol](https://docs.nats.io/reference/reference-protocols/nats-protocol) defining a simple but powerful
pub/sub functionality and the concept of [Subject-Based Messaging](https://docs.nats.io/nats-concepts/subjects).

## Core NATS Quick Start

[Download the latest](https://nats.io/download/) `nats-server` for your platform and run it without any arguments. `nats-server` will listen
on its default TCP port 4222.

```shell
$ nats-server
```

Install `NATS.Client.Core` preview from Nuget.

Given that we have a plain class `Bar`, we can publish and subscribe to our `nats-server` sending
and receiving `Bar` objects:

```csharp
public record Bar
{
public int Id { get; set; }
public string Name { get; set; }
}
```

Subscribe to all `bar` [related subjects](https://docs.nats.io/nats-concepts/subjects):

```csharp
await using var nats = new NatsConnection();

await using sub = await nats.SubscribeAsync<Bar>("bar.>");
await foreach (var msg in sub.Msgs.ReadAllAsync())
{
Console.WriteLine($"Received {msg.Subject}: {msg.Data}\n");
}
```

Publish `Bar` objects to related `bar` [subjects](https://docs.nats.io/nats-concepts/subjects):
```csharp
await using var nats = new NatsConnection();

for (int i = 0; i < 10; i++)
{
Console.WriteLine($" Publishing {i}...");
await nats.PublishAsync<Bar>($"bar.baz.{i}", new Bar { Id = i, Name = "Baz" });
}
```

## Logging

You should also hook your logger to `NatsConnection` to make sure all is working as expected or
to get help diagnosing any issues you might have:

```csharp
var opts = NatsOpts.Default with { LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Error) };
await using var nats = new NatsConnection(otps);
```

## What's Next

[Publish-Subscribe](pub-sub.md) is the message distribution model for one-to-many communication.

[Request-Reply](req-rep.md) is a common pattern in modern distributed systems. A request is sent, and the application
either waits on the response with a certain timeout, or receives a response asynchronously.

[Queue Groups](queue.md) enables the 1:N fan-out pattern of messaging ensuring that any message sent by a publisher,
reaches all subscribers that have registered.
File renamed without changes.
File renamed without changes.
File renamed without changes.
10 changes: 8 additions & 2 deletions docs/documentation/intro.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
# NATS.NET V2 Client

NATS.NET V2 Client is a .Net client for the Open Source [Connective Technology for Adaptive Edge & Distributed Systems - NATS](https://nats.io/)!
It's build on top of the modern .Net 6+ platform, taking advantage of all the high performance features and
NATS.NET V2 Client is a .NET client for the Open Source [Connective Technology for Adaptive Edge & Distributed Systems - NATS](https://nats.io/)!
It's build on top of the modern .NET platform, taking advantage of all the high performance features and
asynchronous programming model.

NATS.NET V2 Client, just like NATS, is Open Source as is this documentation.
Please [let us know](https://natsio.slack.com/channels/dotnet) if you have updates and/or suggestions for
these docs. You can also create a Pull Request using the Edit on GitHub link on each page.

## What's Next

[Core NATS](core/intro.md) is the base set of functionalities and qualities of service offered by a NATS service infrastructure.

[JetStream](jetstream/intro.md) is the built-in distributed persistence system built-in to the same NATS server binary.
108 changes: 108 additions & 0 deletions docs/documentation/jetstream/consume.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
# Consuming Messages from Streams

Consuming messages from a stream can be done using one of three different methods depending on you application needs.
You can access these methods from the consumer object you can create using JetStream context:

```csharp
await using var nats = new NatsConnection();
var js = new NatsJSContext(nats);

var consumer = await js.CreateConsumerAsync(stream: "orders", consumer: "order_processor");
```

## Next Method

Next method is the simplest was of retrieving messages from a stream. Every time you call the next method you get
a single message or nothing based on the expiry time to wait for a message. Once a message is received you can
process it and call next again for another.

```csharp
while (!cancellationToken.IsCancellationRequested)
{
var next = await consumer.NextAsync<Order>();

if (next is { } msg)
{
Console.WriteLine($"Processing {msg.Subject}: {msg.Data.OrderId}...");
await msg.AckAsync();
}
}
```

Next is the simplest and most conservative way of consuming messages since you request a single message from JetStream
server then acknowledge it before requesting more messages. However, next method is also the least performant since
there is not message batching.

## Fetch Method

Fetch method requests messages in batches to improve the performance while giving the application the control over how
fast it can process the messages without overwhelming the application process.

```csharp
while (!cancellationToken.IsCancellationRequested)
{
// Consume a batch of messages (1000 by default)
await foreach (var msg in consumer.FetchAllAsync<Order>())
{
// Process message
await msg.AckAsync();
}
}
```

## Consume Method

Consume method is the most performant method of consuming messages. Request for messages (a.k.a. pull requests) are
interleaved so that there is a constant flow of messages from the JetStream server. Flow is controlled by `MaxMsgs`
or `MaxBytes` and respective thresholds not to overwhelm the application and not to waste server resources.

```csharp
await foreach (var msg in consumer.ConsumeAllAsync<Order>())
{
// Process message
await msg.AckAsync();

// loop never exits unless there is an error or a break
}
```

## Handling Exceptions

While consuming messages (using next, fetch or consume methods) there are several scenarios where exception might be
thrown by the client library, for example:

* Consumer is deleted by another application or operator
* Connection to NATS server is interrupted (mainly for next and fetch methods, consume method can recover)
* Client request for the next batch is invalid
* Account permissions have changed
* Cluster leader changed

A naive implementation might try to recover from errors assuming they are temporary e.g. the stream or the consumer
will be created eventually:

```csharp
while (!cancellationToken.IsCancellationRequested)
{
try
{
await consumer.RefreshAsync(); // or try to recreate consumer
await foreach (var msg in consumer.ConsumeAllAsync<Order>())
{
// Process message
await msg.AckAsync();
}
}
catch (NatsJSProtocolException e)
{
// log exception
}
catch (NatsJSException e)
{
// log exception
await Task.Delay(1000); // or back off
}
}
```

Depending on your application you should configure streams and consumers with appropriate settings so that the
messages are processed and stored based on your requirements.
Loading

0 comments on commit fd1fd42

Please sign in to comment.