Skip to content

Commit

Permalink
JetStream release
Browse files Browse the repository at this point in the history
  • Loading branch information
mtmk committed Sep 13, 2023
1 parent fd1fd42 commit 6b490ea
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 19 deletions.
28 changes: 15 additions & 13 deletions docs/documentation/jetstream/consume.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Consuming Messages from Streams

Consuming messages from a stream can be done using one of three different methods depending on you application needs.
You can access these methods from the consumer object you can create using JetStream context:
Consuming messages from a stream can be done using one of three different methods depending on your application needs.
You can access these methods from the consumer object created using JetStream context:

```csharp
await using var nats = new NatsConnection();
Expand All @@ -12,7 +12,7 @@ var consumer = await js.CreateConsumerAsync(stream: "orders", consumer: "order_p

## Next Method

Next method is the simplest was of retrieving messages from a stream. Every time you call the next method you get
Next method is the simplest way of retrieving messages from a stream. Every time you call the next method, you get
a single message or nothing based on the expiry time to wait for a message. Once a message is received you can
process it and call next again for another.

Expand All @@ -30,13 +30,13 @@ while (!cancellationToken.IsCancellationRequested)
```

Next is the simplest and most conservative way of consuming messages since you request a single message from JetStream
server then acknowledge it before requesting more messages. However, next method is also the least performant since
there is not message batching.
server then acknowledge it before requesting more. However, next method is also the least performant since
there is no message batching.

## Fetch Method

Fetch method requests messages in batches to improve the performance while giving the application the control over how
fast it can process the messages without overwhelming the application process.
Fetch method requests messages in batches to improve the performance while giving the application control over how
fast it can process messages without overwhelming the application process.

```csharp
while (!cancellationToken.IsCancellationRequested)
Expand All @@ -46,34 +46,36 @@ while (!cancellationToken.IsCancellationRequested)
{
// Process message
await msg.AckAsync();

// Loop ends when pull request expires or when requested number of messages (MaxMsgs) received
}
}
```

## Consume Method

Consume method is the most performant method of consuming messages. Request for messages (a.k.a. pull requests) are
interleaved so that there is a constant flow of messages from the JetStream server. Flow is controlled by `MaxMsgs`
or `MaxBytes` and respective thresholds not to overwhelm the application and not to waste server resources.
Consume method is the most performant method of consuming messages. Requests for messages (a.k.a. pull requests) are
overlapped so that there is a constant flow of messages from the JetStream server. Flow is controlled by `MaxMsgs`
or `MaxBytes` and respective thresholds to not overwhelm the application and to not waste server resources.

```csharp
await foreach (var msg in consumer.ConsumeAllAsync<Order>())
{
// Process message
await msg.AckAsync();

// loop never exits unless there is an error or a break
// loop never ends unless there is an error or a break
}
```

## Handling Exceptions

While consuming messages (using next, fetch or consume methods) there are several scenarios where exception might be
While consuming messages (using next, fetch or consume methods) there are several scenarios where exceptions might be
thrown by the client library, for example:

* Consumer is deleted by another application or operator
* Connection to NATS server is interrupted (mainly for next and fetch methods, consume method can recover)
* Client request for the next batch is invalid
* Client pull request is invalid
* Account permissions have changed
* Cluster leader changed

Expand Down
6 changes: 3 additions & 3 deletions docs/documentation/jetstream/intro.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ $ nats-server -js

Install `NATS.Client.JetStream` preview from Nuget.

Before we can so anything, we need a JetStream context:
Before we can do anything, we need a JetStream context:

```csharp
await using var nc = new NatsConnection();
var js = new NatsJSContext(nc);
await using var nats = new NatsConnection();
var js = new NatsJSContext(nats);
```

Let's create our stream first. In JetStream, a stream is simply a storage for messages:
Expand Down
4 changes: 2 additions & 2 deletions docs/documentation/jetstream/publish.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ public record Order(int OrderId);

JetStream support
[idempotent message writes](https://docs.nats.io/using-nats/developer/develop_jetstream/model_deep_dive#message-deduplication)
by ignoring duplicate messages as indicated by the message ID. Message ID is not pert of the message but rather passed
as meta data, part of the message headers.
by ignoring duplicate messages as indicated by the message ID. Message ID is not part of the message but rather passed
as metadata, part of the message headers.

```csharp
var ack = await js.PublishAsync("orders.new.1", order, msgId: "1");
Expand Down
2 changes: 1 addition & 1 deletion version.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.0.0-alpha.2
2.0.0-alpha.3

0 comments on commit 6b490ea

Please sign in to comment.