Skip to content

Commit

Permalink
Remove consumer create restrictions
Browse files Browse the repository at this point in the history
  • Loading branch information
mtmk committed Sep 27, 2023
1 parent 9272cff commit 186fb63
Show file tree
Hide file tree
Showing 2 changed files with 1 addition and 40 deletions.
13 changes: 1 addition & 12 deletions src/NATS.Client.JetStream/NatsJSContext.Consumers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,7 @@ public async ValueTask<NatsJSConsumer> CreateConsumerAsync(
ConsumerCreateRequest request,
CancellationToken cancellationToken = default)
{
if (!string.IsNullOrEmpty(request.Config.DeliverSubject))
{
throw new NatsJSException("This API only support pull consumers. " +
"'deliver_subject' option applies to push consumers");
}

if (request.Config.AckPolicy == ConsumerConfigurationAckPolicy.none)
{
throw new NatsJSException("This API only support pull consumers. " +
"'ack_policy' must be set to 'explicit' or 'all' for pull consumers");
}

// TODO: Adjust API subject according to server version and filter subject
var response = await JSRequestResponseAsync<ConsumerCreateRequest, ConsumerInfo>(
subject: $"{Opts.Prefix}.CONSUMER.CREATE.{request.StreamName}.{request.Config.Name}",
request,
Expand Down
28 changes: 0 additions & 28 deletions src/NATS.Client.KeyValueStore/NatsKVStore.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System.Buffers;
using System.Globalization;
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;
Expand Down Expand Up @@ -43,35 +42,17 @@ internal NatsKVStore(string bucket, NatsKVOpts opts, NatsJSContext context, Nats

public async ValueTask PutAsync<T>(string key, T value, CancellationToken cancellationToken = default)
{
// PUB $KV.profiles.sue.color
var ack = await _context.PublishAsync($"$KV.{_bucket}.{key}", value, cancellationToken: cancellationToken);
ack.EnsureSuccess();
}

public async ValueTask<NatsKVEntry<T?>> GetEntryAsync<T>(string key, INatsSerializer? serializer = default, CancellationToken cancellationToken = default)
{
// API--------------+ stream----+ subject--------------+
// $JS.API.DIRECT.GET.KV_profiles.$KV.profiles.sue.color

/*
< PUB $JS.API.DIRECT.GET.KV_profiles.$KV.profiles.sue.color _INBOX.QtAfMtjInNmkBUv64qBTHj 0
> HMSG _INBOX.QtAfMtjInNmkBUv64qBTHj 0 141 145
NATS/1.0
Nats-Stream: KV_profiles
Nats-Subject: $KV.profiles.sue.color
Nats-Sequence: 8
Nats-Time-Stamp: 2023-09-26T08:35:49.3726545Z
blue
*/
if (_stream.Info.Config.AllowDirect)
{
var direct = await _stream.GetDirectAsync<T>($"$KV.{_bucket}.{key}", serializer ?? _serializer, cancellationToken);
if (direct is { Headers: { } headers } msg)
{
var d = msg.Data;
var stringValues = headers["x"];

if (!headers.TryGetValue("Nats-Sequence", out var sequenceValues))
throw new NatsKVException("Missing sequence header");

Expand Down Expand Up @@ -119,15 +100,6 @@ public async ValueTask PutAsync<T>(string key, T value, CancellationToken cancel
}
else
{
/*[3] --> PUB $JS.API.STREAM.MSG.GET.KV_profiles _INBOX.thLyHrjut8ccR4XYaOdm3f 41
{"last_by_subj":"$KV.profiles.sue.color"}
[3] <-- MSG _INBOX.thLyHrjut8ccR4XYaOdm3f 0 170
{"type":"io.nats.jetstream.api.v1.stream_msg_get_response","message":{
"subject":"$KV.profiles.sue.color",
"seq":1,
"data":"Ymx1ZQ==",
"time":"2023-09-26T09:18:35.8845678Z"}}*/
var response = await _stream.GetAsync(new StreamMsgGetRequest { LastBySubj = $"$KV.{_bucket}.{key}" }, cancellationToken);

if (!DateTimeOffset.TryParse(response.Message.Time, out var created))
Expand Down

0 comments on commit 186fb63

Please sign in to comment.