Skip to content

Commit

Permalink
Cherry-pick PRs for v2.10.14-RC.7 (#5304)
Browse files Browse the repository at this point in the history
Cherry-picks the following into the v2.10.14 release branch:

* #5303
* #5305

Signed-off-by: Neil Twigg <[email protected]>
  • Loading branch information
wallyqs authored Apr 11, 2024
2 parents c9ba679 + 729706a commit fdbc9c3
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 7 deletions.
6 changes: 3 additions & 3 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3947,9 +3947,9 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
wr.hbt = time.Now().Add(wr.hb)
}
} else {
// We will redo this one.
o.sseq--
// We will redo this one as long as this is not a redelivery.
if dc == 1 {
o.sseq--
o.npc++
}
pmsg.returnToPool()
Expand Down Expand Up @@ -4391,7 +4391,7 @@ func (o *consumer) didNotDeliver(seq uint64, subj string) {
if _, ok := o.pending[seq]; ok {
// We found this messsage on pending, we need
// to queue it up for immediate redelivery since
// we know it was not delivered.
// we know it was not delivered
if !o.onRedeliverQueue(seq) {
o.addToRedeliverQueue(seq)
o.signalNewMessages()
Expand Down
7 changes: 3 additions & 4 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -5176,15 +5176,13 @@ func (fs *fileStore) syncBlocks() {
return
}
fs.setSyncTimer()
fn := filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile)
syncAlways := fs.fcfg.SyncAlways
if markDirty {
fs.dirty++
}
fs.mu.Unlock()

// Sync state file if we are not running with sync always.
if !syncAlways {
if !fs.fcfg.SyncAlways {
fn := filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile)
<-dios
fd, _ := os.OpenFile(fn, os.O_RDWR, defaultFilePerms)
dios <- struct{}{}
Expand All @@ -5193,6 +5191,7 @@ func (fs *fileStore) syncBlocks() {
fd.Close()
}
}
fs.mu.Unlock()
}

// Select the message block where this message should be found.
Expand Down
88 changes: 88 additions & 0 deletions server/jetstream_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"math/rand"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -1078,6 +1079,93 @@ func TestJetStreamConsumerDelete(t *testing.T) {
}
}

func TestFetchWithDrain(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Retention: nats.LimitsPolicy,
})
require_NoError(t, err)

_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
Durable: "C",
AckPolicy: nats.AckExplicitPolicy,
AckWait: time.Second * 10,
})
require_NoError(t, err)

const messages = 10_000

for i := 0; i < messages; i++ {
sendStreamMsg(t, nc, "foo", fmt.Sprintf("%d", i+1))
}

cr := JSApiConsumerGetNextRequest{
Batch: 100_000,
Expires: time.Second * 10,
}
crBytes, err := json.Marshal(cr)
require_NoError(t, err)

msgs := make(map[int]int)

processMsg := func(t *testing.T, sub *nats.Subscription, msgs map[int]int) bool {
msg, err := sub.NextMsg(time.Second * 1)
if err != nil {
return false
}
metadata, err := msg.Metadata()
require_NoError(t, err)
err = msg.Ack()
require_NoError(t, err)

v, err := strconv.Atoi(string(msg.Data))
require_NoError(t, err)
require_Equal(t, uint64(v), metadata.Sequence.Stream)

_, ok := msgs[int(metadata.Sequence.Stream)]
if _, ok := msgs[int(metadata.Sequence.Stream-1)]; !ok && len(msgs) > 0 {
t.Logf("Stream Sequence gap detected: current %d", metadata.Sequence.Stream)
}
if ok {
t.Fatalf("Message has been seen before")
}

msgs[int(metadata.Sequence.Stream)] = int(metadata.NumDelivered)

require_NoError(t, err)
return true
}

for {
inbox := nats.NewInbox()
sub, err := nc.SubscribeSync(inbox)
require_NoError(t, err)

err = nc.PublishRequest(fmt.Sprintf(JSApiRequestNextT, "TEST", "C"), inbox, crBytes)
require_NoError(t, err)

// Drain after first message processed.
processMsg(t, sub, msgs)
sub.Drain()

for {
if !processMsg(t, sub, msgs) {
if len(msgs) == messages {
return
}
break
}
}
}
}

func Benchmark____JetStreamConsumerIsFilteredMatch(b *testing.B) {
subject := "foo.bar.do.not.match.any.filter.subject"
for n := 1; n <= 1024; n *= 2 {
Expand Down

0 comments on commit fdbc9c3

Please sign in to comment.