From c984e57a7061c185ab0cc2cb52601abd88258e25 Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Wed, 6 Nov 2024 18:23:49 +0100 Subject: [PATCH] Fix consumer with start sequence and multiple filters Signed-off-by: Tomasz Pietrek --- server/consumer.go | 2 +- server/jetstream_consumer_test.go | 50 +++++++++++++++++++++++++++++++ 2 files changed, 51 insertions(+), 1 deletion(-) diff --git a/server/consumer.go b/server/consumer.go index 13b8e97edb4..dcfb70ba197 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -5331,7 +5331,7 @@ func (o *consumer) selectStartingSeqNo() { for _, filter := range o.subjf { // Use first sequence since this is more optimized atm. ss := o.mset.store.FilteredState(state.FirstSeq, filter.subject) - if ss.First > o.sseq && ss.First < nseq { + if ss.First >= o.sseq && ss.First < nseq { nseq = ss.First } } diff --git a/server/jetstream_consumer_test.go b/server/jetstream_consumer_test.go index 7d5b2f69863..3e3bbdbb4d7 100644 --- a/server/jetstream_consumer_test.go +++ b/server/jetstream_consumer_test.go @@ -2236,6 +2236,56 @@ func TestJetStreamConsumerOverflow(t *testing.T) { msg, err = maxAckPending50.NextMsg(time.Second) require_NoError(t, err) require_NotNil(t, msg) +} + +func TestJetStreamConsumerMultipleFitersWithStartDate(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + past := time.Now().Add(-90 * time.Second) + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"events.>"}, + }) + require_NoError(t, err) + + sendStreamMsg(t, nc, "events.foo", "msg-1") + sendStreamMsg(t, nc, "events.bar", "msg-2") + sendStreamMsg(t, nc, "events.baz", "msg-3") + sendStreamMsg(t, nc, "events.biz", "msg-4") + sendStreamMsg(t, nc, "events.faz", "msg-5") + sendStreamMsg(t, nc, "events.foo", "msg-6") + sendStreamMsg(t, nc, "events.biz", "msg-7") + + for _, test := range []struct { + name string + filterSubjects []string + startTime time.Time + expectedMessages uint64 + expectedStreamSequence uint64 + }{ + {"Single-Filter-first-sequence", []string{"events.foo"}, past, 2, 0}, + {"Multiple-Filter-first-sequence", []string{"events.foo", "events.bar", "events.baz"}, past, 4, 0}, + {"Multiple-Filters-second-subject", []string{"events.bar", "events.baz"}, past, 2, 1}, + {"Multiple-Filters-first-last-subject", []string{"events.foo", "events.biz"}, past, 4, 0}, + {"Multiple-Filters-in-future", []string{"events.foo", "events.biz"}, time.Now().Add(1 * time.Minute), 0, 7}, + } { + t.Run(test.name, func(t *testing.T) { + info, err := js.AddConsumer("TEST", &nats.ConsumerConfig{ + Durable: test.name, + FilterSubjects: test.filterSubjects, + DeliverPolicy: nats.DeliverByStartTimePolicy, + OptStartTime: &test.startTime, + }) + require_NoError(t, err) + require_Equal(t, test.expectedStreamSequence, info.Delivered.Stream) + require_Equal(t, test.expectedMessages, info.NumPending) + }) + } }