Skip to content

Commit

Permalink
Jetstream preview release 1 (#126)
Browse files Browse the repository at this point in the history
* JetStream preview release

* Fixes to error handler interface
* Stream delete interface

* JetStream list api response exposed

Exposed list APIs respond object to make paging possible.
A paging extension can be implemented separately if needed.

* Renamed Consume and Fetch interfaces

* JetStream API review

* Consume and fetch error handling removes
* Consume and fetch now throws exception on JS terminal errors
* Removed ErrorHandler option
* Async enumerable initial List implementation
* Domain option added to JSOpts
* Logging improvements for testing
* Option validations now throw exception

* Removed inbox prefix from JS Opts

* Also fixed typo in NatsJSOptsDefaults

* Pack JetStream to publish
  • Loading branch information
mtmk authored Sep 7, 2023
1 parent adc21cb commit 295a636
Show file tree
Hide file tree
Showing 27 changed files with 451 additions and 272 deletions.
131 changes: 94 additions & 37 deletions sandbox/Example.JetStream.PullConsumer/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,6 @@
int? maxMsgs = 1000;
int? maxBytes = null;

static void ErrorHandler(NatsJSNotification notification)
{
Console.WriteLine($"Error: {notification}");
}

void Report(int i, Stopwatch sw, string data)
{
Console.WriteLine(data);
Expand All @@ -47,7 +42,6 @@ void Report(int i, Stopwatch sw, string data)
Expires = expires,
IdleHeartbeat = idle,
Serializer = new RawDataSerializer(),
ErrorHandler = ErrorHandler,
};

var fetchOpts = new NatsJSFetchOpts
Expand All @@ -57,15 +51,13 @@ void Report(int i, Stopwatch sw, string data)
Expires = expires,
IdleHeartbeat = idle,
Serializer = new RawDataSerializer(),
ErrorHandler = ErrorHandler,
};

var nextOpts = new NatsJSNextOpts
{
Expires = expires,
IdleHeartbeat = idle,
Serializer = new RawDataSerializer(),
ErrorHandler = ErrorHandler,
};

var stopwatch = Stopwatch.StartNew();
Expand All @@ -77,63 +69,128 @@ void Report(int i, Stopwatch sw, string data)
{
while (!cts.Token.IsCancellationRequested)
{
Console.WriteLine($"___\nFETCH {maxMsgs}");
await using var sub = await consumer.FetchAsync<RawData>(fetchOpts, cts.Token);
await foreach (var msg in sub.Msgs.ReadAllAsync(cts.Token))
try
{
Console.WriteLine($"___\nFETCH {maxMsgs}");
await consumer.RefreshAsync(cts.Token);
await using var sub = await consumer.FetchAsync<RawData>(fetchOpts, cts.Token);
await foreach (var msg in sub.Msgs.ReadAllAsync(cts.Token))
{
await msg.AckAsync(cancellationToken: cts.Token);
Report(++count, stopwatch, $"data: {msg.Data}");
}
}
catch (NatsJSProtocolException e)
{
Console.WriteLine(e.Message);
}
catch (NatsJSException e)
{
await msg.AckAsync(cancellationToken: cts.Token);
Report(++count, stopwatch, $"data: {msg.Data}");
Console.WriteLine(e.Message);
await Task.Delay(1000);
}
}
}
else if (args.Length > 0 && args[0] == "fetch-all")
{
while (!cts.Token.IsCancellationRequested)
{
Console.WriteLine($"___\nFETCH {maxMsgs}");
await foreach (var msg in consumer.FetchAllAsync<RawData>(fetchOpts, cts.Token))
try
{
Console.WriteLine($"___\nFETCH {maxMsgs}");
await consumer.RefreshAsync(cts.Token);
await foreach (var msg in consumer.FetchAllAsync<RawData>(fetchOpts, cts.Token))
{
await msg.AckAsync(cancellationToken: cts.Token);
Report(++count, stopwatch, $"data: {msg.Data}");
}
}
catch (NatsJSProtocolException e)
{
Console.WriteLine(e.Message);
}
catch (NatsJSException e)
{
await msg.AckAsync(cancellationToken: cts.Token);
Report(++count, stopwatch, $"data: {msg.Data}");
Console.WriteLine(e.Message);
await Task.Delay(1000);
}
}
}
else if (args.Length > 0 && args[0] == "next")
{
while (!cts.Token.IsCancellationRequested)
{
Console.WriteLine("___\nNEXT");
var next = await consumer.NextAsync<RawData>(nextOpts, cts.Token);
if (next is { } msg)
try
{
Console.WriteLine("___\nNEXT");
var next = await consumer.NextAsync<RawData>(nextOpts, cts.Token);
if (next is { } msg)
{
await msg.AckAsync(cancellationToken: cts.Token);
Report(++count, stopwatch, $"data: {msg.Data}");
}
}
catch (NatsJSProtocolException e)
{
Console.WriteLine(e.Message);
}
catch (NatsJSException e)
{
await msg.AckAsync(cancellationToken: cts.Token);
Report(++count, stopwatch, $"data: {msg.Data}");
Console.WriteLine(e.Message);
await Task.Delay(1000);
}
}
}
else if (args.Length > 0 && args[0] == "consume")
{
Console.WriteLine("___\nCONSUME");
await using var sub = await consumer.ConsumeAsync<RawData>(
consumeOpts,
cts.Token);

await foreach (var msg in sub.Msgs.ReadAllAsync(cts.Token))
while (!cts.Token.IsCancellationRequested)
{
await msg.AckAsync(cancellationToken: cts.Token);
Report(++count, stopwatch, $"data: {msg.Data}");
try
{
Console.WriteLine("___\nCONSUME");
await using var sub = await consumer.ConsumeAsync<RawData>(
consumeOpts,
cts.Token);

await foreach (var msg in sub.Msgs.ReadAllAsync(cts.Token))
{
await msg.AckAsync(cancellationToken: cts.Token);
Report(++count, stopwatch, $"data: {msg.Data}");
}
}
catch (NatsJSProtocolException e)
{
Console.WriteLine(e.Message);
}
catch (NatsJSException e)
{
Console.WriteLine(e.Message);
await Task.Delay(1000);
}
}

// Console.WriteLine($"took {stopwatch.Elapsed}");
// await nats.PingAsync(cts.Token);
}
else if (args.Length > 0 && args[0] == "consume-all")
{
Console.WriteLine("___\nCONSUME-ALL");
await foreach (var msg in consumer.ConsumeAllAsync<RawData>(consumeOpts, cts.Token))
while (!cts.Token.IsCancellationRequested)
{
await msg.AckAsync(cancellationToken: cts.Token);
Report(++count, stopwatch, $"data: {msg.Data}");
try
{
Console.WriteLine("___\nCONSUME-ALL");
await foreach (var msg in consumer.ConsumeAllAsync<RawData>(consumeOpts, cts.Token))
{
await msg.AckAsync(cancellationToken: cts.Token);
Report(++count, stopwatch, $"data: {msg.Data}");
}
}
catch (NatsJSProtocolException e)
{
Console.WriteLine(e.Message);
}
catch (NatsJSException e)
{
Console.WriteLine(e.Message);
await Task.Delay(1000);
}
}
}
else
Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client.Core/NatsOpts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public sealed record NatsOpts
ReconnectJitter: TimeSpan.FromMilliseconds(100),
ConnectTimeout: TimeSpan.FromSeconds(2),
ObjectPoolSize: 256,
RequestTimeout: TimeSpan.FromMinutes(1),
RequestTimeout: TimeSpan.FromSeconds(5),
CommandTimeout: TimeSpan.FromMinutes(1),
SubscriptionCleanUpInterval: TimeSpan.FromMinutes(5),
WriterCommandBufferLimit: 1_000,
Expand Down
2 changes: 2 additions & 0 deletions src/NATS.Client.Core/NatsSubBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ public enum NatsSubEndReason
MaxBytes,
Timeout,
IdleTimeout,
IdleHeartbeatTimeout,
StartUpTimeout,
Exception,
JetStreamError,
}

public abstract class NatsSubBase
Expand Down
13 changes: 13 additions & 0 deletions src/NATS.Client.JetStream/INatsJSConsume.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using System.Threading.Channels;

namespace NATS.Client.JetStream;

public interface INatsJSConsume : IAsyncDisposable
{
void Stop();
}

public interface INatsJSConsume<T> : INatsJSConsume
{
ChannelReader<NatsJSMsg<T?>> Msgs { get; }
}
13 changes: 13 additions & 0 deletions src/NATS.Client.JetStream/INatsJSFetch.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using System.Threading.Channels;

namespace NATS.Client.JetStream;

public interface INatsJSFetch : IAsyncDisposable
{
void Stop();
}

public interface INatsJSFetch<T> : INatsJSFetch
{
ChannelReader<NatsJSMsg<T?>> Msgs { get; }
}
13 changes: 0 additions & 13 deletions src/NATS.Client.JetStream/INatsJSSubConsume.cs

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
using NATS.Client.Core;

namespace NATS.Client.JetStream.Internal;

public static class NatsJSExtensionsInternal
{
public static long ToNanos(this TimeSpan timeSpan) => (long)(timeSpan.TotalMilliseconds * 1_000_000);

public static bool HasTerminalJSError(this NatsHeaders headers) => headers
is { Code: 400 }
or { Code: 409, Message: NatsHeaders.Messages.ConsumerDeleted }
or { Code: 409, Message: NatsHeaders.Messages.ConsumerDeleted };
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,19 @@

namespace NATS.Client.JetStream.Internal;

internal static class NatsJSOpsDefaults
internal static class NatsJSOptsDefaults
{
private static readonly TimeSpan ExpiresDefault = TimeSpan.FromSeconds(30);
private static readonly TimeSpan ExpiresMin = TimeSpan.FromSeconds(1);
private static readonly TimeSpan HeartbeatCap = TimeSpan.FromSeconds(30);
private static readonly TimeSpan HeartbeatMin = TimeSpan.FromSeconds(.5);

internal static (long MaxMsgs, long MaxBytes, long ThresholdMsgs, long ThresholdBytes) SetMax(
NatsJSOpts? opts = default,
long? maxMsgs = default,
long? maxBytes = default,
long? thresholdMsgs = default,
long? thresholdBytes = default)
{
var jsOpts = opts ?? new NatsJSOpts(NatsOpts.Default);
long maxMsgsOut;
long maxBytesOut;

Expand All @@ -26,8 +24,7 @@ internal static (long MaxMsgs, long MaxBytes, long ThresholdMsgs, long Threshold
}
else if (!maxMsgs.HasValue && !maxBytes.HasValue)
{
maxMsgsOut = jsOpts.MaxMsgs;
maxBytesOut = 0;
throw new NatsJSException($"You must set {nameof(maxBytes)} or {nameof(maxMsgs)}");
}
else if (maxMsgs.HasValue && !maxBytes.HasValue)
{
Expand All @@ -41,16 +38,22 @@ internal static (long MaxMsgs, long MaxBytes, long ThresholdMsgs, long Threshold
}
else
{
throw new NatsJSException($"Invalid state: {nameof(NatsJSOpsDefaults)}: {nameof(SetMax)}");
throw new NatsJSException($"Invalid state: {nameof(NatsJSOptsDefaults)}: {nameof(SetMax)}");
}

var thresholdMsgsOut = thresholdMsgs ?? maxMsgsOut / 2;

if (thresholdMsgsOut > maxMsgsOut)
thresholdMsgsOut = maxMsgsOut;
{
throw new NatsJSException($"{nameof(thresholdMsgs)} must be less than {nameof(maxMsgs)}");
}

var thresholdBytesOut = thresholdBytes ?? maxBytesOut / 2;

if (thresholdBytesOut > maxBytesOut)
thresholdBytesOut = maxBytesOut;
{
throw new NatsJSException($"{nameof(thresholdBytes)} must be less than {nameof(maxBytes)}");
}

return (maxMsgsOut, maxBytesOut, thresholdMsgsOut, thresholdBytesOut);
}
Expand All @@ -60,14 +63,23 @@ internal static (TimeSpan Expires, TimeSpan IdleHeartbeat) SetTimeouts(
TimeSpan? idleHeartbeat = default)
{
var expiresOut = expires ?? ExpiresDefault;

if (expiresOut < ExpiresMin)
expiresOut = ExpiresMin;
{
throw new NatsJSException($"{nameof(expires)} must be greater than {ExpiresMin}");
}

var idleHeartbeatOut = idleHeartbeat ?? expiresOut / 2;

if (idleHeartbeatOut > HeartbeatCap)
idleHeartbeatOut = HeartbeatCap;
{
throw new NatsJSException($"{nameof(idleHeartbeat)} must be less than {HeartbeatCap}");
}

if (idleHeartbeatOut < HeartbeatMin)
idleHeartbeatOut = HeartbeatMin;
{
throw new NatsJSException($"{nameof(idleHeartbeat)} must be greater than {HeartbeatMin}");
}

return (expiresOut, idleHeartbeatOut);
}
Expand Down
3 changes: 0 additions & 3 deletions src/NATS.Client.JetStream/NATS.Client.JetStream.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@
<PackageTags>pubsub;messaging</PackageTags>
<Description>JetStream support for NATS.Client.</Description>

<!-- TODO: Let JetStream packages to be created when code is ready for preview -->
<IsPackable>false</IsPackable>

</PropertyGroup>

<ItemGroup>
Expand Down
Loading

0 comments on commit 295a636

Please sign in to comment.