Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial Key Value Store implementation #132

Merged
merged 36 commits into from
Oct 12, 2023
Merged
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
59d275a
Initial Key Value Store implementation
mtmk Sep 21, 2023
9580bea
CI nats-cli download fix
mtmk Sep 21, 2023
c6a1395
Test flappers fixes
mtmk Sep 25, 2023
9272cff
KV direct get and msg get
mtmk Sep 26, 2023
186fb63
Remove consumer create restrictions
mtmk Sep 27, 2023
d18c977
Merge branch 'main' into kv-initial-impl
mtmk Sep 27, 2023
f4a3b8b
Merge branch 'main' into kv-initial-impl
mtmk Sep 30, 2023
8356a52
Fixing test flappers
mtmk Sep 30, 2023
e677669
Fixing test that potentially hangs
mtmk Sep 30, 2023
f58697c
Fixed request reply test timeout
mtmk Sep 30, 2023
c22e29b
Fixed extraneous pull request
mtmk Sep 30, 2023
56af8ea
Avoid pending update races
mtmk Sep 30, 2023
92de056
Hunting consume heartbeat test failure
mtmk Sep 30, 2023
f61d977
Fixed format and warnings
mtmk Sep 30, 2023
d27d8e9
Format fixes
mtmk Sep 30, 2023
d4e239d
Fix initial pull request race
mtmk Sep 30, 2023
6159d9d
Merge branch 'main' into kv-initial-impl
mtmk Oct 2, 2023
f719f4c
wip
mtmk Oct 5, 2023
94983d9
Initial watch implementation
mtmk Oct 5, 2023
1234d55
Test fix
mtmk Oct 5, 2023
45e97f9
Merge branch 'main' into kv-initial-impl
mtmk Oct 5, 2023
6b9cf77
Merge branch 'main' into kv-initial-impl
mtmk Oct 6, 2023
3b51f54
wip
mtmk Oct 10, 2023
7431da1
Merge branch 'main' into kv-initial-impl
mtmk Oct 10, 2023
1a0111f
Use the new nuid for JetStream as well
mtmk Oct 10, 2023
fe70da1
Fix to mux inbox re-subscription
mtmk Oct 10, 2023
a652ea0
Revert test proxy threads to tasks
mtmk Oct 10, 2023
85bf538
Tidy up KV watcher
mtmk Oct 10, 2023
b3ace1a
Fixing potential test flapper
mtmk Oct 10, 2023
41f1d6f
More mux inbox reconnect test asserts
mtmk Oct 10, 2023
4f536be
Fix format and warnings
mtmk Oct 10, 2023
be17a67
Reverted unrelated changes
mtmk Oct 10, 2023
cae9de7
Test KV reconnect with history
mtmk Oct 11, 2023
c0be508
Code docs
mtmk Oct 11, 2023
b391e8d
Update sandbox/Example.KeyValueStore.Watcher/Example.KeyValueStore.Wa…
mtmk Oct 11, 2023
d2f3079
Stream get direct fix
mtmk Oct 11, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ jobs:
# This test is hanging sometimes. Find out where!
run: dotnet test -c Debug --no-build --logger:"console;verbosity=normal" tests/NATS.Client.JetStream.Tests/NATS.Client.JetStream.Tests.csproj

- name: Test Key/Value Store
run: dotnet test -c Debug --no-build tests/NATS.Client.KeyValueStore.Tests/NATS.Client.KeyValueStore.Tests.csproj

memory_test:
name: memory test
strategy:
Expand Down
21 changes: 21 additions & 0 deletions NATS.Client.sln
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Client.Perf", "tests\N
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Example.JetStream.PullConsumer", "sandbox\Example.JetStream.PullConsumer\Example.JetStream.PullConsumer.csproj", "{3A9FC281-3B81-4D63-A76B-E1127C1D2241}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Client.KeyValueStore", "src\NATS.Client.KeyValueStore\NATS.Client.KeyValueStore.csproj", "{A102AB7B-A90C-4717-B17C-045240838060}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Client.KeyValueStore.Tests", "tests\NATS.Client.KeyValueStore.Tests\NATS.Client.KeyValueStore.Tests.csproj", "{908F2CED-CAC0-4A4E-AD19-362A413B5DA4}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Example.KeyValueStore.Watcher", "sandbox\Example.KeyValueStore.Watcher\Example.KeyValueStore.Watcher.csproj", "{912A4F2F-1BD1-4AE2-BAB8-5A49C221DB53}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -165,6 +171,18 @@ Global
{3A9FC281-3B81-4D63-A76B-E1127C1D2241}.Debug|Any CPU.Build.0 = Debug|Any CPU
{3A9FC281-3B81-4D63-A76B-E1127C1D2241}.Release|Any CPU.ActiveCfg = Release|Any CPU
{3A9FC281-3B81-4D63-A76B-E1127C1D2241}.Release|Any CPU.Build.0 = Release|Any CPU
{A102AB7B-A90C-4717-B17C-045240838060}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{A102AB7B-A90C-4717-B17C-045240838060}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A102AB7B-A90C-4717-B17C-045240838060}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A102AB7B-A90C-4717-B17C-045240838060}.Release|Any CPU.Build.0 = Release|Any CPU
{908F2CED-CAC0-4A4E-AD19-362A413B5DA4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{908F2CED-CAC0-4A4E-AD19-362A413B5DA4}.Debug|Any CPU.Build.0 = Debug|Any CPU
{908F2CED-CAC0-4A4E-AD19-362A413B5DA4}.Release|Any CPU.ActiveCfg = Release|Any CPU
{908F2CED-CAC0-4A4E-AD19-362A413B5DA4}.Release|Any CPU.Build.0 = Release|Any CPU
{912A4F2F-1BD1-4AE2-BAB8-5A49C221DB53}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{912A4F2F-1BD1-4AE2-BAB8-5A49C221DB53}.Debug|Any CPU.Build.0 = Debug|Any CPU
{912A4F2F-1BD1-4AE2-BAB8-5A49C221DB53}.Release|Any CPU.ActiveCfg = Release|Any CPU
{912A4F2F-1BD1-4AE2-BAB8-5A49C221DB53}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -194,6 +212,9 @@ Global
{B7DD4A9C-2D24-4772-951E-86A665C59ADF} = {BD234E2E-F51A-4B18-B8BE-8AF6D546BF87}
{ADF66CBA-4F3E-4E91-9842-E194E3BC06A1} = {C526E8AB-739A-48D7-8FC4-048978C9B650}
{3A9FC281-3B81-4D63-A76B-E1127C1D2241} = {95A69671-16CA-4133-981C-CC381B7AAA30}
{A102AB7B-A90C-4717-B17C-045240838060} = {4827B3EC-73D8-436D-AE2A-5E29AC95FD0C}
{908F2CED-CAC0-4A4E-AD19-362A413B5DA4} = {C526E8AB-739A-48D7-8FC4-048978C9B650}
{912A4F2F-1BD1-4AE2-BAB8-5A49C221DB53} = {95A69671-16CA-4133-981C-CC381B7AAA30}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {8CBB7278-D093-448E-B3DE-B5991209A1AA}
Expand Down
4 changes: 3 additions & 1 deletion NATS.Client.sln.DotSettings
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=CR/@EntryIndexedValue">CR</s:String>
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=CRLF/@EntryIndexedValue">CRLF</s:String>
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=JS/@EntryIndexedValue">JS</s:String>
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=KV/@EntryIndexedValue">KV</s:String>
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=LF/@EntryIndexedValue">LF</s:String>
<s:Boolean x:Key="/Default/UserDictionary/Words/=HMSG/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=HPUB/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=Msgs/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>
<s:Boolean x:Key="/Default/UserDictionary/Words/=Msgs/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=Nuid/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
mtmk marked this conversation as resolved.
Show resolved Hide resolved
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\NATS.Client.KeyValueStore\NATS.Client.KeyValueStore.csproj" />
</ItemGroup>

</Project>
18 changes: 18 additions & 0 deletions sandbox/Example.KeyValueStore.Watcher/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using Microsoft.Extensions.Logging;
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.KeyValueStore;

// var options = NatsOpts.Default with { LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Error) };
// var nats = new NatsConnection(options);
var nats = new NatsConnection();

var js = new NatsJSContext(nats);
var kv = new NatsKVContext(js);

var store = await kv.CreateStoreAsync("e1");

await foreach (var entry in store.WatchAllAsync<int>())
{
Console.WriteLine($"[RCV] {entry}");
}
6 changes: 3 additions & 3 deletions sandbox/MicroBenchmark/NewInboxBenchmarks.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ public class NewInboxBenchmarks
{
private static readonly NatsOpts LongPrefixOpt = NatsOpts.Default
with
{
InboxPrefix = "this-is-a-rather-long-prefix-that-we-use-here",
};
{
InboxPrefix = "this-is-a-rather-long-prefix-that-we-use-here",
};

private static readonly NatsConnection ConnectionDefaultPrefix = new();
private static readonly NatsConnection ConnectionLongPrefix = new(LongPrefixOpt);
Expand Down
8 changes: 7 additions & 1 deletion src/NATS.Client.Core/INatsSerializer.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System.Buffers;
using System.Text;
using System.Text.Json;
using System.Text.Json.Serialization;

Expand Down Expand Up @@ -104,6 +103,13 @@ public int Serialize<T>(ICountableBufferWriter bufferWriter, T? value)
return (T)(object)new ReadOnlySequence<byte>(buffer.ToArray());
}

if (typeof(T) == typeof(IMemoryOwner<byte>))
{
var memoryOwner = MemoryPool<byte>.Shared.Rent((int)buffer.Length);
buffer.CopyTo(memoryOwner.Memory.Span);
return (T)memoryOwner;
}

if (Next != null)
return Next.Deserialize<T>(buffer);

Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client.Core/Internal/NuidWriter.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System.Diagnostics.CodeAnalysis;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Security.Cryptography;
Expand Down
17 changes: 10 additions & 7 deletions src/NATS.Client.Core/Internal/SubscriptionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,20 +47,20 @@ public SubscriptionManager(NatsConnection connection, string inboxPrefix)

internal InboxSubBuilder InboxSubBuilder { get; }

public async ValueTask SubscribeAsync(string subject, string? queueGroup, NatsSubOpts? opts, NatsSubBase sub, CancellationToken cancellationToken)
public async ValueTask SubscribeAsync(NatsSubBase sub, CancellationToken cancellationToken)
{
if (IsInboxSubject(subject))
if (IsInboxSubject(sub.Subject))
{
if (queueGroup != null)
if (sub.QueueGroup != null)
{
throw new NatsException("Inbox subscriptions don't support queue groups");
}

await SubscribeInboxAsync(subject, opts, sub, cancellationToken).ConfigureAwait(false);
await SubscribeInboxAsync(sub.Subject, sub.Opts, sub, cancellationToken).ConfigureAwait(false);
}
else
{
await SubscribeInternalAsync(subject, queueGroup, opts, sub, cancellationToken).ConfigureAwait(false);
await SubscribeInternalAsync(sub.Subject, sub.QueueGroup, sub.Opts, sub, cancellationToken).ConfigureAwait(false);
}
}

Expand Down Expand Up @@ -181,7 +181,7 @@ private async ValueTask SubscribeInboxAsync(string subject, NatsSubOpts? opts, N
if (Interlocked.CompareExchange(ref _inboxSub, _inboxSubSentinel, _inboxSubSentinel) == _inboxSubSentinel)
{
var inboxSubject = $"{_inboxPrefix}.*";
_inboxSub = InboxSubBuilder.Build(subject, opts, _connection, manager: this);
_inboxSub = InboxSubBuilder.Build(inboxSubject, opts, _connection, manager: this);
await SubscribeInternalAsync(
inboxSubject,
queueGroup: default,
Expand Down Expand Up @@ -269,5 +269,8 @@ private async ValueTask UnsubscribeSidsAsync(List<int> sids)
}
}

private bool IsInboxSubject(string subject) => subject.StartsWith(_inboxPrefix, StringComparison.Ordinal);
private bool IsInboxSubject(string subject)
{
return subject.StartsWith(_inboxPrefix, StringComparison.Ordinal);
}
}
2 changes: 2 additions & 0 deletions src/NATS.Client.Core/NATS.Client.Core.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,7 @@
<InternalsVisibleTo Include="NatsBenchmark, PublicKey=0024000004800000940000000602000000240000525341310004000001000100db7da1f2f89089327b47d26d69666fad20861f24e9acdb13965fb6c64dfee8da589b495df37a75e934ddbacb0752a42c40f3dbc79614eec9bb2a0b6741f9e2ad2876f95e74d54c23eef0063eb4efb1e7d824ee8a695b647c113c92834f04a3a83fb60f435814ddf5c4e5f66a168139c4c1b1a50a3e60c164d180e265b1f000cd" />
<InternalsVisibleTo Include="NATS.Client.JetStream, PublicKey=0024000004800000940000000602000000240000525341310004000001000100db7da1f2f89089327b47d26d69666fad20861f24e9acdb13965fb6c64dfee8da589b495df37a75e934ddbacb0752a42c40f3dbc79614eec9bb2a0b6741f9e2ad2876f95e74d54c23eef0063eb4efb1e7d824ee8a695b647c113c92834f04a3a83fb60f435814ddf5c4e5f66a168139c4c1b1a50a3e60c164d180e265b1f000cd" />
<InternalsVisibleTo Include="NATS.Client.JetStream.Tests, PublicKey=0024000004800000940000000602000000240000525341310004000001000100db7da1f2f89089327b47d26d69666fad20861f24e9acdb13965fb6c64dfee8da589b495df37a75e934ddbacb0752a42c40f3dbc79614eec9bb2a0b6741f9e2ad2876f95e74d54c23eef0063eb4efb1e7d824ee8a695b647c113c92834f04a3a83fb60f435814ddf5c4e5f66a168139c4c1b1a50a3e60c164d180e265b1f000cd" />
<InternalsVisibleTo Include="NATS.Client.KeyValueStore, PublicKey=0024000004800000940000000602000000240000525341310004000001000100db7da1f2f89089327b47d26d69666fad20861f24e9acdb13965fb6c64dfee8da589b495df37a75e934ddbacb0752a42c40f3dbc79614eec9bb2a0b6741f9e2ad2876f95e74d54c23eef0063eb4efb1e7d824ee8a695b647c113c92834f04a3a83fb60f435814ddf5c4e5f66a168139c4c1b1a50a3e60c164d180e265b1f000cd" />
<InternalsVisibleTo Include="NATS.Client.KeyValueStore.Tests, PublicKey=0024000004800000940000000602000000240000525341310004000001000100db7da1f2f89089327b47d26d69666fad20861f24e9acdb13965fb6c64dfee8da589b495df37a75e934ddbacb0752a42c40f3dbc79614eec9bb2a0b6741f9e2ad2876f95e74d54c23eef0063eb4efb1e7d824ee8a695b647c113c92834f04a3a83fb60f435814ddf5c4e5f66a168139c4c1b1a50a3e60c164d180e265b1f000cd" />
</ItemGroup>
</Project>
8 changes: 4 additions & 4 deletions src/NATS.Client.Core/NatsConnection.LowLevelApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -99,17 +99,17 @@ internal ValueTask PubModelAsync<T>(string subject, T? data, INatsSerializer ser
}
}

internal ValueTask SubAsync(string subject, string? queueGroup, NatsSubOpts? opts, NatsSubBase sub, CancellationToken cancellationToken = default)
internal ValueTask SubAsync(NatsSubBase sub, CancellationToken cancellationToken = default)
{
if (ConnectionState == NatsConnectionState.Open)
{
return SubscriptionManager.SubscribeAsync(subject, queueGroup, opts, sub, cancellationToken);
return SubscriptionManager.SubscribeAsync(sub, cancellationToken);
}
else
{
return WithConnectAsync(subject, queueGroup, opts, sub, cancellationToken, static (self, s, q, o, b, token) =>
return WithConnectAsync(sub, cancellationToken, static (self, s, token) =>
{
return self.SubscriptionManager.SubscribeAsync(s, q, o, b, token);
return self.SubscriptionManager.SubscribeAsync(s, token);
});
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client.Core/NatsConnection.RequestReply.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public partial class NatsConnection
}

[SkipLocalsInit]
private static string NewInbox(ReadOnlySpan<char> prefix)
internal static string NewInbox(ReadOnlySpan<char> prefix)
{
Span<char> buffer = stackalloc char[64];
var separatorLength = prefix.Length > 0 ? 1u : 0u;
Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client.Core/NatsConnection.RequestSub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ internal async ValueTask<INatsSub<TReply>> RequestSubAsync<TRequest, TReply>(

var replySerializer = replyOpts?.Serializer ?? Opts.Serializer;
var sub = new NatsSub<TReply>(this, SubscriptionManager.InboxSubBuilder, replyTo, queueGroup: default, replyOpts, replySerializer);
await SubAsync(replyTo, queueGroup: default, replyOpts, sub, cancellationToken).ConfigureAwait(false);
await SubAsync(sub, cancellationToken).ConfigureAwait(false);

var serializer = requestOpts?.Serializer ?? Opts.Serializer;

Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client.Core/NatsConnection.Subscribe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ public async ValueTask<INatsSub<T>> SubscribeAsync<T>(string subject, string? qu
{
var serializer = opts?.Serializer ?? Opts.Serializer;
var sub = new NatsSub<T>(this, SubscriptionManager.GetManagerFor(subject), subject, queueGroup, opts, serializer);
await SubAsync(subject, queueGroup, opts, sub, cancellationToken).ConfigureAwait(false);
await SubAsync(sub, cancellationToken).ConfigureAwait(false);
return sub;
}
}
3 changes: 3 additions & 0 deletions src/NATS.Client.Core/NatsSubBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ internal NatsSubBase(
Connection = connection;
Subject = subject;
QueueGroup = queueGroup;
Opts = opts;

// Only allocate timers if necessary to reduce GC pressure
if (_idleTimeout != default)
Expand Down Expand Up @@ -105,6 +106,8 @@ internal NatsSubBase(

public NatsSubEndReason EndReason => (NatsSubEndReason)Volatile.Read(ref _endReasonRaw);

internal NatsSubOpts? Opts { get; private set; }

protected NatsConnection Connection { get; }

public virtual ValueTask ReadyAsync()
Expand Down
2 changes: 2 additions & 0 deletions src/NATS.Client.JetStream/NATS.Client.JetStream.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
<ItemGroup>
<ProjectReference Include="..\NATS.Client.Core\NATS.Client.Core.csproj" />
<InternalsVisibleTo Include="$(AssemblyName).Tests, PublicKey=0024000004800000940000000602000000240000525341310004000001000100db7da1f2f89089327b47d26d69666fad20861f24e9acdb13965fb6c64dfee8da589b495df37a75e934ddbacb0752a42c40f3dbc79614eec9bb2a0b6741f9e2ad2876f95e74d54c23eef0063eb4efb1e7d824ee8a695b647c113c92834f04a3a83fb60f435814ddf5c4e5f66a168139c4c1b1a50a3e60c164d180e265b1f000cd" />
<InternalsVisibleTo Include="NATS.Client.KeyValueStore, PublicKey=0024000004800000940000000602000000240000525341310004000001000100db7da1f2f89089327b47d26d69666fad20861f24e9acdb13965fb6c64dfee8da589b495df37a75e934ddbacb0752a42c40f3dbc79614eec9bb2a0b6741f9e2ad2876f95e74d54c23eef0063eb4efb1e7d824ee8a695b647c113c92834f04a3a83fb60f435814ddf5c4e5f66a168139c4c1b1a50a3e60c164d180e265b1f000cd" />
<InternalsVisibleTo Include="NATS.Client.KeyValueStore.Tests, PublicKey=0024000004800000940000000602000000240000525341310004000001000100db7da1f2f89089327b47d26d69666fad20861f24e9acdb13965fb6c64dfee8da589b495df37a75e934ddbacb0752a42c40f3dbc79614eec9bb2a0b6741f9e2ad2876f95e74d54c23eef0063eb4efb1e7d824ee8a695b647c113c92834f04a3a83fb60f435814ddf5c4e5f66a168139c4c1b1a50a3e60c164d180e265b1f000cd" />
</ItemGroup>

</Project>
14 changes: 2 additions & 12 deletions src/NATS.Client.JetStream/NatsJSConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,7 @@ public async ValueTask<INatsJSConsume<T>> ConsumeAsync<T>(NatsJSConsumeOpts? opt
expires: timeouts.Expires,
idle: timeouts.IdleHeartbeat);

await _context.Connection.SubAsync(
subject: inbox,
queueGroup: default,
opts: requestOpts,
sub: sub,
cancellationToken);
await _context.Connection.SubAsync(sub: sub, cancellationToken);

// Start consuming with the first Pull Request
await sub.CallMsgNextAsync(
Expand Down Expand Up @@ -258,12 +253,7 @@ public async ValueTask<INatsJSFetch<T>> FetchAsync<T>(
expires: timeouts.Expires,
idle: timeouts.IdleHeartbeat);

await _context.Connection.SubAsync(
subject: inbox,
queueGroup: default,
opts: requestOpts,
sub: sub,
cancellationToken);
await _context.Connection.SubAsync(sub: sub, cancellationToken);

await sub.CallMsgNextAsync(
new ConsumerGetnextRequest
Expand Down
17 changes: 10 additions & 7 deletions src/NATS.Client.JetStream/NatsJSContext.Consumers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,22 +45,25 @@ public async ValueTask<NatsJSConsumer> CreateConsumerAsync(
ConsumerCreateRequest request,
CancellationToken cancellationToken = default)
{
if (!string.IsNullOrEmpty(request.Config.DeliverSubject))
// TODO: Adjust API subject according to server version and filter subject
var subject = $"{Opts.Prefix}.CONSUMER.CREATE.{request.StreamName}";

if (!string.IsNullOrWhiteSpace(request.Config.Name))
{
throw new NatsJSException("This API only support pull consumers. " +
"'deliver_subject' option applies to push consumers");
subject += $".{request.Config.Name}";
request.Config.Name = default!;
}

if (request.Config.AckPolicy == ConsumerConfigurationAckPolicy.none)
if (!string.IsNullOrWhiteSpace(request.Config.FilterSubject))
{
throw new NatsJSException("This API only support pull consumers. " +
"'ack_policy' must be set to 'explicit' or 'all' for pull consumers");
subject += $".{request.Config.FilterSubject}";
}

var response = await JSRequestResponseAsync<ConsumerCreateRequest, ConsumerInfo>(
subject: $"{Opts.Prefix}.CONSUMER.CREATE.{request.StreamName}.{request.Config.Name}",
subject: subject,
request,
cancellationToken);

return new NatsJSConsumer(this, response);
}

Expand Down
Loading