From ee6ec9370bb8f515c647c70b4df4bccca573c9c0 Mon Sep 17 00:00:00 2001
From: Derek Collison
Date: Thu, 11 Apr 2024 07:44:41 -0700
Subject: [PATCH 1/2] Also hold fs lock while sync call to index.db

Signed-off-by: Derek Collison
---
 server/filestore.go | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/server/filestore.go b/server/filestore.go
index a887f47df97..8c296ae016f 100644
--- a/server/filestore.go
+++ b/server/filestore.go
@@ -5176,15 +5176,13 @@ func (fs *fileStore) syncBlocks() {
 		return
 	}
 	fs.setSyncTimer()
-	fn := filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile)
-	syncAlways := fs.fcfg.SyncAlways
 	if markDirty {
 		fs.dirty++
 	}
-	fs.mu.Unlock()
 
 	// Sync state file if we are not running with sync always.
-	if !syncAlways {
+	if !fs.fcfg.SyncAlways {
+		fn := filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile)
 		<-dios
 		fd, _ := os.OpenFile(fn, os.O_RDWR, defaultFilePerms)
 		dios <- struct{}{}
@@ -5193,6 +5191,7 @@ func (fs *fileStore) syncBlocks() {
 			fd.Close()
 		}
 	}
+	fs.mu.Unlock()
 }
 
 // Select the message block where this message should be found.

From 729706a24d599ad755ebd51e07e08c479779f695 Mon Sep 17 00:00:00 2001
From: Derek Collison
Date: Thu, 11 Apr 2024 12:24:11 -0700
Subject: [PATCH 2/2] [FIXED] Multiple deliveries of messages with delivery count going backwards. (#5305)

When we fail to deliver a message, we were not checking whether this was
already a redelivery, and would decrement o.sseq, meaning we would pick up
the same message again after the next redelivery and deliver it with a
delivery count of 1. A message could be delivered from the redelivery queue,
that delivery could fail, and the application would then see the same
message twice, first with a delivery count of 2 and then with 1. Now the
application only gets one copy, with a delivery count of 2.

Signed-off-by: Derek Collison

---------

Signed-off-by: Tomasz Pietrek
Signed-off-by: Derek Collison
Co-authored-by: Tomasz Pietrek
---
 server/consumer.go                |  6 +--
 server/jetstream_consumer_test.go | 88 +++++++++++++++++++++++++++++++
 2 files changed, 91 insertions(+), 3 deletions(-)

diff --git a/server/consumer.go b/server/consumer.go
index 2b8633f1210..adc81d4b303 100644
--- a/server/consumer.go
+++ b/server/consumer.go
@@ -3947,9 +3947,9 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
 				wr.hbt = time.Now().Add(wr.hb)
 			}
 		} else {
-			// We will redo this one.
-			o.sseq--
+			// We will redo this one as long as this is not a redelivery.
 			if dc == 1 {
+				o.sseq--
 				o.npc++
 			}
 			pmsg.returnToPool()
@@ -4391,7 +4391,7 @@ func (o *consumer) didNotDeliver(seq uint64, subj string) {
 		if _, ok := o.pending[seq]; ok {
 			// We found this messsage on pending, we need
 			// to queue it up for immediate redelivery since
-			// we know it was not delivered.
+			// we know it was not delivered
 			if !o.onRedeliverQueue(seq) {
 				o.addToRedeliverQueue(seq)
 				o.signalNewMessages()
diff --git a/server/jetstream_consumer_test.go b/server/jetstream_consumer_test.go
index 2bc4108246e..a7d022007f7 100644
--- a/server/jetstream_consumer_test.go
+++ b/server/jetstream_consumer_test.go
@@ -21,6 +21,7 @@ import (
 	"fmt"
 	"math/rand"
 	"sort"
+	"strconv"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -1078,6 +1079,93 @@ func TestJetStreamConsumerDelete(t *testing.T) {
 	}
 }
 
+func TestFetchWithDrain(t *testing.T) {
+	s := RunBasicJetStreamServer(t)
+	defer s.Shutdown()
+
+	nc, js := jsClientConnect(t, s)
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:      "TEST",
+		Subjects:  []string{"foo"},
+		Retention: nats.LimitsPolicy,
+	})
+	require_NoError(t, err)
+
+	_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
+		Durable:   "C",
+		AckPolicy: nats.AckExplicitPolicy,
+		AckWait:   time.Second * 10,
+	})
+	require_NoError(t, err)
+
+	const messages = 10_000
+
+	for i := 0; i < messages; i++ {
+		sendStreamMsg(t, nc, "foo", fmt.Sprintf("%d", i+1))
+	}
+
+	cr := JSApiConsumerGetNextRequest{
+		Batch:   100_000,
+		Expires: time.Second * 10,
+	}
+	crBytes, err := json.Marshal(cr)
+	require_NoError(t, err)
+
+	msgs := make(map[int]int)
+
+	processMsg := func(t *testing.T, sub *nats.Subscription, msgs map[int]int) bool {
+		msg, err := sub.NextMsg(time.Second * 1)
+		if err != nil {
+			return false
+		}
+		metadata, err := msg.Metadata()
+		require_NoError(t, err)
+		err = msg.Ack()
+		require_NoError(t, err)
+
+		v, err := strconv.Atoi(string(msg.Data))
+		require_NoError(t, err)
+		require_Equal(t, uint64(v), metadata.Sequence.Stream)
+
+		_, ok := msgs[int(metadata.Sequence.Stream)]
+		if _, ok := msgs[int(metadata.Sequence.Stream-1)]; !ok && len(msgs) > 0 {
+			t.Logf("Stream Sequence gap detected: current %d", metadata.Sequence.Stream)
+		}
+		if ok {
+			t.Fatalf("Message has been seen before")
+		}
+
+		msgs[int(metadata.Sequence.Stream)] = int(metadata.NumDelivered)
+
+		require_NoError(t, err)
+		return true
+	}
+
+	for {
+		inbox := nats.NewInbox()
+		sub, err := nc.SubscribeSync(inbox)
+		require_NoError(t, err)
+
+		err = nc.PublishRequest(fmt.Sprintf(JSApiRequestNextT, "TEST", "C"), inbox, crBytes)
+		require_NoError(t, err)
+
+		// Drain after first message processed.
+		processMsg(t, sub, msgs)
+		sub.Drain()
+
+		for {
+			if !processMsg(t, sub, msgs) {
+				if len(msgs) == messages {
+					return
+				}
+				break
+			}
+		}
+	}
+}
+
 func Benchmark____JetStreamConsumerIsFilteredMatch(b *testing.B) {
 	subject := "foo.bar.do.not.match.any.filter.subject"
 	for n := 1; n <= 1024; n *= 2 {