From b7ad8cf8cad8db2760a8c1c9fc12ab282698202c Mon Sep 17 00:00:00 2001 From: Piotr Piotrowski Date: Tue, 27 Aug 2024 13:41:04 +0200 Subject: [PATCH] [FIXED] Invalid fetch sequence in ordered consumer Fetch and Next after timeout Signed-off-by: Piotr Piotrowski --- jetstream/ordered.go | 8 +++- jetstream/test/ordered_test.go | 70 ++++++++++++++++++++++++++++++++-- 2 files changed, 72 insertions(+), 6 deletions(-) diff --git a/jetstream/ordered.go b/jetstream/ordered.go index 199b8c383..3f17ddf81 100644 --- a/jetstream/ordered.go +++ b/jetstream/ordered.go @@ -402,7 +402,9 @@ func (c *orderedConsumer) Fetch(batch int, opts ...FetchOpt) (MessageBatch, erro c.currentConsumer.Unlock() return nil, ErrOrderedConsumerConcurrentRequests } - c.cursor.streamSeq = c.runningFetch.sseq + if c.runningFetch.sseq != 0 { + c.cursor.streamSeq = c.runningFetch.sseq + } } c.currentConsumer.Unlock() c.consumerType = consumerTypeFetch @@ -438,7 +440,9 @@ func (c *orderedConsumer) FetchBytes(maxBytes int, opts ...FetchOpt) (MessageBat if !c.runningFetch.done { return nil, ErrOrderedConsumerConcurrentRequests } - c.cursor.streamSeq = c.runningFetch.sseq + if c.runningFetch.sseq != 0 { + c.cursor.streamSeq = c.runningFetch.sseq + } } c.consumerType = consumerTypeFetch sub := orderedSubscription{ diff --git a/jetstream/test/ordered_test.go b/jetstream/test/ordered_test.go index 6680955be..1d063166c 100644 --- a/jetstream/test/ordered_test.go +++ b/jetstream/test/ordered_test.go @@ -1571,21 +1571,19 @@ func TestOrderedConsumerNext(t *testing.T) { } publishTestMsgs(t, js) - msg, err := c.Next() + _, err = c.Next() if err != nil { t.Fatalf("Unexpected error: %s", err) } - msg.Ack() name := c.CachedInfo().Name if err := s.DeleteConsumer(ctx, name); err != nil { t.Fatal(err) } - msg, err = c.Next() + _, err = c.Next() if err != nil { t.Fatalf("Unexpected error: %s", err) } - msg.Ack() }) t.Run("consumer used as consume", func(t *testing.T) { @@ -1623,6 +1621,70 @@ func TestOrderedConsumerNext(t *testing.T) { t.Fatalf("Expected error: %s; got: %s", jetstream.ErrOrderConsumerUsedAsConsume, err) } }) + + t.Run("preserve sequence after fetch error", func(t *testing.T) { + srv := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, srv) + nc, err := nats.Connect(srv.ClientURL()) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + js, err := jetstream.New(nc) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + s, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + c, err := s.OrderedConsumer(ctx, jetstream.OrderedConsumerConfig{}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + if _, err := js.Publish(ctx, "FOO.A", []byte("msg")); err != nil { + t.Fatalf("Unexpected error during publish: %s", err) + } + msg, err := c.Next() + if err != nil { + t.Fatalf("Unexpected error: %s", err) + } + meta, err := msg.Metadata() + if err != nil { + t.Fatalf("Unexpected error: %s", err) + } + if meta.Sequence.Stream != 1 { + t.Fatalf("Expected sequence: %d; got: %d", 1, meta.Sequence.Stream) + } + + // get next message, it should time out (no more messages on stream) + msg, err = c.Next(jetstream.FetchMaxWait(100 * time.Millisecond)) + if !errors.Is(err, nats.ErrTimeout) { + t.Fatalf("Expected error: %s; got: %s", nats.ErrTimeout, err) + } + + if _, err := js.Publish(ctx, "FOO.A", []byte("msg")); err != nil { + t.Fatalf("Unexpected error during publish: %s", err) + } + + // get next message, it should have stream sequence 2 + msg, err = c.Next() + if err != nil { + t.Fatalf("Unexpected error: %s", err) + } + meta, err = msg.Metadata() + if err != nil { + t.Fatalf("Unexpected error: %s", err) + } + if meta.Sequence.Stream != 2 { + t.Fatalf("Expected sequence: %d; got: %d", 2, meta.Sequence.Stream) + } + }) } func TestOrderedConsumerFetchNoWait(t *testing.T) {