From 186fb63b38c1a4537a0adfba48c561f75d8796f1 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Wed, 27 Sep 2023 12:25:27 +0100 Subject: [PATCH] Remove consumer create restrictions --- .../NatsJSContext.Consumers.cs | 13 +-------- src/NATS.Client.KeyValueStore/NatsKVStore.cs | 28 ------------------- 2 files changed, 1 insertion(+), 40 deletions(-) diff --git a/src/NATS.Client.JetStream/NatsJSContext.Consumers.cs b/src/NATS.Client.JetStream/NatsJSContext.Consumers.cs index 389bb5287..3f34839b8 100644 --- a/src/NATS.Client.JetStream/NatsJSContext.Consumers.cs +++ b/src/NATS.Client.JetStream/NatsJSContext.Consumers.cs @@ -45,18 +45,7 @@ public async ValueTask CreateConsumerAsync( ConsumerCreateRequest request, CancellationToken cancellationToken = default) { - if (!string.IsNullOrEmpty(request.Config.DeliverSubject)) - { - throw new NatsJSException("This API only support pull consumers. " + - "'deliver_subject' option applies to push consumers"); - } - - if (request.Config.AckPolicy == ConsumerConfigurationAckPolicy.none) - { - throw new NatsJSException("This API only support pull consumers. " + - "'ack_policy' must be set to 'explicit' or 'all' for pull consumers"); - } - + // TODO: Adjust API subject according to server version and filter subject var response = await JSRequestResponseAsync( subject: $"{Opts.Prefix}.CONSUMER.CREATE.{request.StreamName}.{request.Config.Name}", request, diff --git a/src/NATS.Client.KeyValueStore/NatsKVStore.cs b/src/NATS.Client.KeyValueStore/NatsKVStore.cs index c02a6503e..50045d9ab 100644 --- a/src/NATS.Client.KeyValueStore/NatsKVStore.cs +++ b/src/NATS.Client.KeyValueStore/NatsKVStore.cs @@ -1,5 +1,4 @@ using System.Buffers; -using System.Globalization; using NATS.Client.Core; using NATS.Client.JetStream; using NATS.Client.JetStream.Models; @@ -43,35 +42,17 @@ internal NatsKVStore(string bucket, NatsKVOpts opts, NatsJSContext context, Nats public async ValueTask PutAsync(string key, T value, CancellationToken cancellationToken = default) { - // PUB $KV.profiles.sue.color var ack = await _context.PublishAsync($"$KV.{_bucket}.{key}", value, cancellationToken: cancellationToken); ack.EnsureSuccess(); } public async ValueTask> GetEntryAsync(string key, INatsSerializer? serializer = default, CancellationToken cancellationToken = default) { - // API--------------+ stream----+ subject--------------+ - // $JS.API.DIRECT.GET.KV_profiles.$KV.profiles.sue.color - - /* - < PUB $JS.API.DIRECT.GET.KV_profiles.$KV.profiles.sue.color _INBOX.QtAfMtjInNmkBUv64qBTHj 0 - > HMSG _INBOX.QtAfMtjInNmkBUv64qBTHj 0 141 145 - NATS/1.0 - Nats-Stream: KV_profiles - Nats-Subject: $KV.profiles.sue.color - Nats-Sequence: 8 - Nats-Time-Stamp: 2023-09-26T08:35:49.3726545Z - - blue - */ if (_stream.Info.Config.AllowDirect) { var direct = await _stream.GetDirectAsync($"$KV.{_bucket}.{key}", serializer ?? _serializer, cancellationToken); if (direct is { Headers: { } headers } msg) { - var d = msg.Data; - var stringValues = headers["x"]; - if (!headers.TryGetValue("Nats-Sequence", out var sequenceValues)) throw new NatsKVException("Missing sequence header"); @@ -119,15 +100,6 @@ public async ValueTask PutAsync(string key, T value, CancellationToken cancel } else { -/*[3] --> PUB $JS.API.STREAM.MSG.GET.KV_profiles _INBOX.thLyHrjut8ccR4XYaOdm3f 41 - {"last_by_subj":"$KV.profiles.sue.color"} - -[3] <-- MSG _INBOX.thLyHrjut8ccR4XYaOdm3f 0 170 - {"type":"io.nats.jetstream.api.v1.stream_msg_get_response","message":{ - "subject":"$KV.profiles.sue.color", - "seq":1, - "data":"Ymx1ZQ==", - "time":"2023-09-26T09:18:35.8845678Z"}}*/ var response = await _stream.GetAsync(new StreamMsgGetRequest { LastBySubj = $"$KV.{_bucket}.{key}" }, cancellationToken); if (!DateTimeOffset.TryParse(response.Message.Time, out var created))