Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JetStream documentation and minor fixes #129

Merged
merged 11 commits into from
Sep 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 20 additions & 52 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,67 +1,35 @@
# NATS.NET V2

NATS.NET V2 is a [NATS](https://nats.io) client for the modern [.NET](https://dot.net/).

## Preview

The NATS.NET V2 client is in preview and not recommended for production use.
Codebase is still under heavy development and currently we only have implementations for [core NATS](https://docs.nats.io/nats-concepts/core-nats) features.
The NATS.NET V2 client is in preview and not recommended for production use yet.
Codebase is still under heavy development and we currently implemented [Core NATS](https://docs.nats.io/nats-concepts/core-nats)
and basic [JetStream](https://docs.nats.io/nats-concepts/jetstream) features.

Please test and provide feedback:

* on [slack.nats.io dotnet channel](https://natsio.slack.com/channels/dotnet)
* or use GitHub discussions, issues and PRs

Thank you to our contributors so far. We feel we are growing slowly as a community and we appreciate your help
supporting and developing NATS .NET V2 project.

## Documentation

Please test and provide feedback by visiting our [Slack channel](https://natsio.slack.com/channels/dotnet).
Check out the [documentation](https://nats-io.github.io/nats.net.v2/) for guides and examples.

## NATS.NET V2 Goals

- Only support Async I/O
- Target latest .NET LTS Release (currently `net6.0`)
- Only support Async I/O (async/await)
- Target latest .NET LTS Release (currently .NET 6.0)

## Packages

- **NATS.Client.Core**: [core NATS](https://docs.nats.io/nats-concepts/core-nats)
- **NATS.Client.Core**: [Core NATS](https://docs.nats.io/nats-concepts/core-nats)
- **NATS.Client.Hosting**: extension to configure DI container
- **NATS.Client.JetStream**: JetStream *not yet implemented*

## Basic Usage

[Download the latest](https://nats.io/download/) `nats-server` for your platform and run it without any arguments. `nats-server` will listen
on its default TCP port 4222.

Given that we have a plain class `Bar`, we can publish and subscribe to our `nats-server` sending
and receiving `Bar` objects:

```csharp
public record Bar
{
public int Id { get; set; }
public string Name { get; set; }
}
```

Subscribe to all `bar` [related subjects](https://docs.nats.io/nats-concepts/subjects):
```csharp
await using var nats = new NatsConnection(options);

await using sub = await nats.SubscribeAsync<Bar>("bar.>");
await foreach (var msg in sub.Msgs.ReadAllAsync())
{
Console.WriteLine($"Received {msg.Subject}: {msg.Data}\n");
}
```

Publish `Bar` objects to related `bar` [subjects](https://docs.nats.io/nats-concepts/subjects):
```csharp
await using var nats = new NatsConnection();

for (int i = 0; i < 10; i++)
{
Console.WriteLine($" Publishing {i}...");
await nats.PublishAsync<Bar>($"bar.baz.{i}", new Bar { Id = i, Name = "Baz" });
}
```

You should also hook your logger to `NatsConnection` to make sure all is working as expected or
to get help diagnosing any issues you might have:
```csharp
var options = NatsOpts.Default with { LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Error) };
await using var nats = new NatsConnection(options);
```
- **NATS.Client.JetStream**: [JetStream](https://docs.nats.io/nats-concepts/jetstream)

## Contributing

Expand Down
72 changes: 72 additions & 0 deletions docs/documentation/core/intro.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# Core NATS

[Core NATS](https://docs.nats.io/nats-concepts/core-nats) is the base set of functionalities and qualities of service
offered by a NATS service infrastructure. Core NATS is the foundation for JetStream and other services. For the sake
of explanation, in a simplified sense you can think of Core NATS as the
[wire protocol](https://docs.nats.io/reference/reference-protocols/nats-protocol) defining a simple but powerful
pub/sub functionality and the concept of [Subject-Based Messaging](https://docs.nats.io/nats-concepts/subjects).

## Core NATS Quick Start

[Download the latest](https://nats.io/download/) `nats-server` for your platform and run it without any arguments. `nats-server` will listen
on its default TCP port 4222.

```shell
$ nats-server
```

Install `NATS.Client.Core` preview from Nuget.

Given that we have a plain class `Bar`, we can publish and subscribe to our `nats-server` sending
and receiving `Bar` objects:

```csharp
public record Bar
{
public int Id { get; set; }
public string Name { get; set; }
}
```

Subscribe to all `bar` [related subjects](https://docs.nats.io/nats-concepts/subjects):

```csharp
await using var nats = new NatsConnection();

await using sub = await nats.SubscribeAsync<Bar>("bar.>");
await foreach (var msg in sub.Msgs.ReadAllAsync())
{
Console.WriteLine($"Received {msg.Subject}: {msg.Data}\n");
}
```

Publish `Bar` objects to related `bar` [subjects](https://docs.nats.io/nats-concepts/subjects):
```csharp
await using var nats = new NatsConnection();

for (int i = 0; i < 10; i++)
{
Console.WriteLine($" Publishing {i}...");
await nats.PublishAsync<Bar>($"bar.baz.{i}", new Bar { Id = i, Name = "Baz" });
}
```

## Logging

You should also hook your logger to `NatsConnection` to make sure all is working as expected or
to get help diagnosing any issues you might have:

```csharp
var opts = NatsOpts.Default with { LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Error) };
await using var nats = new NatsConnection(otps);
```

## What's Next

[Publish-Subscribe](pub-sub.md) is the message distribution model for one-to-many communication.

[Request-Reply](req-rep.md) is a common pattern in modern distributed systems. A request is sent, and the application
either waits on the response with a certain timeout, or receives a response asynchronously.

[Queue Groups](queue.md) enables the 1:N fan-out pattern of messaging ensuring that any message sent by a publisher,
reaches all subscribers that have registered.
File renamed without changes.
File renamed without changes.
File renamed without changes.
10 changes: 8 additions & 2 deletions docs/documentation/intro.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
# NATS.NET V2 Client

NATS.NET V2 Client is a .Net client for the Open Source [Connective Technology for Adaptive Edge & Distributed Systems - NATS](https://nats.io/)!
It's build on top of the modern .Net 6+ platform, taking advantage of all the high performance features and
NATS.NET V2 Client is a .NET client for the Open Source [Connective Technology for Adaptive Edge & Distributed Systems - NATS](https://nats.io/)!
It's build on top of the modern .NET platform, taking advantage of all the high performance features and
asynchronous programming model.

NATS.NET V2 Client, just like NATS, is Open Source as is this documentation.
Please [let us know](https://natsio.slack.com/channels/dotnet) if you have updates and/or suggestions for
these docs. You can also create a Pull Request using the Edit on GitHub link on each page.

## What's Next

[Core NATS](core/intro.md) is the base set of functionalities and qualities of service offered by a NATS service infrastructure.

[JetStream](jetstream/intro.md) is the built-in distributed persistence system built-in to the same NATS server binary.
108 changes: 108 additions & 0 deletions docs/documentation/jetstream/consume.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
# Consuming Messages from Streams

Consuming messages from a stream can be done using one of three different methods depending on you application needs.
You can access these methods from the consumer object you can create using JetStream context:

```csharp
await using var nats = new NatsConnection();
var js = new NatsJSContext(nats);

var consumer = await js.CreateConsumerAsync(stream: "orders", consumer: "order_processor");
```

## Next Method

Next method is the simplest was of retrieving messages from a stream. Every time you call the next method you get
a single message or nothing based on the expiry time to wait for a message. Once a message is received you can
process it and call next again for another.

```csharp
while (!cancellationToken.IsCancellationRequested)
{
var next = await consumer.NextAsync<Order>();

if (next is { } msg)
{
Console.WriteLine($"Processing {msg.Subject}: {msg.Data.OrderId}...");
await msg.AckAsync();
}
}
```

Next is the simplest and most conservative way of consuming messages since you request a single message from JetStream
server then acknowledge it before requesting more messages. However, next method is also the least performant since
there is not message batching.

## Fetch Method

Fetch method requests messages in batches to improve the performance while giving the application the control over how
fast it can process the messages without overwhelming the application process.

```csharp
while (!cancellationToken.IsCancellationRequested)
{
// Consume a batch of messages (1000 by default)
await foreach (var msg in consumer.FetchAllAsync<Order>())
{
// Process message
await msg.AckAsync();
}
}
```

## Consume Method

Consume method is the most performant method of consuming messages. Request for messages (a.k.a. pull requests) are
interleaved so that there is a constant flow of messages from the JetStream server. Flow is controlled by `MaxMsgs`
or `MaxBytes` and respective thresholds not to overwhelm the application and not to waste server resources.

```csharp
await foreach (var msg in consumer.ConsumeAllAsync<Order>())
{
// Process message
await msg.AckAsync();

// loop never exits unless there is an error or a break
}
```

## Handling Exceptions

While consuming messages (using next, fetch or consume methods) there are several scenarios where exception might be
thrown by the client library, for example:

* Consumer is deleted by another application or operator
* Connection to NATS server is interrupted (mainly for next and fetch methods, consume method can recover)
* Client request for the next batch is invalid
* Account permissions have changed
* Cluster leader changed

A naive implementation might try to recover from errors assuming they are temporary e.g. the stream or the consumer
will be created eventually:

```csharp
while (!cancellationToken.IsCancellationRequested)
{
try
{
await consumer.RefreshAsync(); // or try to recreate consumer
await foreach (var msg in consumer.ConsumeAllAsync<Order>())
{
// Process message
await msg.AckAsync();
}
}
catch (NatsJSProtocolException e)
{
// log exception
}
catch (NatsJSException e)
{
// log exception
await Task.Delay(1000); // or back off
}
}
```

Depending on your application you should configure streams and consumers with appropriate settings so that the
messages are processed and stored based on your requirements.
Loading