Cherry-picks for 2.10.23-RC.2 (#6074)
Includes the following:

* Some tweaks to the NRG test helpers
* #6055
* #6061
* #6065 
* #6041 (but with `math/rand` instead of `math/rand/v2` due to an older Go version in CI for 2.10.x; see the note after this list)
* #6066
* #6067
* #6069
* #6075
* #6082
* #6087
* #6086
* #6088
* #6089
* #6092
* #6096
* #6098
* #6097
* #6105
* #6104
* #6106
* #6109
* #6111
* #6112
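
Note on the `math/rand` substitution above: `math/rand/v2` only ships with Go 1.22 and later, so the older toolchain used by the 2.10.x CI keeps the classic package. A minimal illustrative sketch of the API difference (not code from #6041):

```go
package main

import (
	"fmt"
	"math/rand" // math/rand/v2 would replace this import on Go 1.22+
)

func main() {
	// math/rand/v2 spells this rand.IntN(10) and never needs explicit seeding;
	// classic math/rand uses Intn and has been auto-seeded since Go 1.20.
	fmt.Println(rand.Intn(10))
}
```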

Signed-off-by: Neil Twigg <[email protected]>
neilalexander authored Nov 12, 2024
2 parents e0c5dc9 + d942fc6 commit c08f8cd
Showing 29 changed files with 1,780 additions and 194 deletions.
6 changes: 3 additions & 3 deletions go.mod
@@ -10,7 +10,7 @@ require (
github.com/nats-io/nkeys v0.4.7
github.com/nats-io/nuid v1.0.1
go.uber.org/automaxprocs v1.6.0
golang.org/x/crypto v0.28.0
golang.org/x/sys v0.26.0
golang.org/x/time v0.7.0
golang.org/x/crypto v0.29.0
golang.org/x/sys v0.27.0
golang.org/x/time v0.8.0
)
12 changes: 6 additions & 6 deletions go.sum
@@ -20,12 +20,12 @@ github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMT
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs=
go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8=
golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw=
golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U=
golang.org/x/crypto v0.29.0 h1:L5SG1JTTXupVV3n6sUqMTeWbjAyfPwoda2DLX8J8FrQ=
golang.org/x/crypto v0.29.0/go.mod h1:+F4F4N5hv6v38hfeYwTdx20oUvLLc+QfrE9Ax9HtgRg=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo=
golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/time v0.7.0 h1:ntUhktv3OPE6TgYxXWv9vKvUSJyIFJlyohwbkEwPrKQ=
golang.org/x/time v0.7.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s=
golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/time v0.8.0 h1:9i3RxcPv3PZnitoVGMPDKZSq1xW1gK1Xy3ArNOGZfEg=
golang.org/x/time v0.8.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
144 changes: 74 additions & 70 deletions server/consumer.go
@@ -345,6 +345,7 @@ type consumer struct {
outq *jsOutQ
pending map[uint64]*Pending
ptmr *time.Timer
ptmrEnd time.Time
rdq []uint64
rdqi avl.SequenceSet
rdc map[uint64]uint64
@@ -950,7 +951,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
// If we have multiple filter subjects, create a sublist which we will use
// in calling store.LoadNextMsgMulti.
if len(o.cfg.FilterSubjects) > 0 {
o.filters = NewSublistWithCache()
o.filters = NewSublistNoCache()
for _, filter := range o.cfg.FilterSubjects {
o.filters.Insert(&subscription{subject: []byte(filter)})
}
@@ -1349,7 +1350,7 @@ func (o *consumer) setLeader(isLeader bool) {
stopAndClearTimer(&o.dtmr)

// Make sure to clear out any re-deliver queues
stopAndClearTimer(&o.ptmr)
o.stopAndClearPtmr()
o.rdq = nil
o.rdqi.Empty()
o.pending = nil
@@ -1739,7 +1740,7 @@ func (o *consumer) forceExpirePending() {
p.Timestamp += off
}
}
o.ptmr.Reset(o.ackWait(0))
o.resetPtmr(o.ackWait(0))
}
o.signalNewMessages()
}
@@ -1882,7 +1883,7 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error {
// AckWait
if cfg.AckWait != o.cfg.AckWait {
if o.ptmr != nil {
o.ptmr.Reset(100 * time.Millisecond)
o.resetPtmr(100 * time.Millisecond)
}
}
// Rate Limit
@@ -1940,7 +1941,7 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error {
if len(o.subjf) == 1 {
o.filters = nil
} else {
o.filters = NewSublistWithCache()
o.filters = NewSublistNoCache()
for _, filter := range o.subjf {
o.filters.Insert(&subscription{subject: []byte(filter.subject)})
}
@@ -2413,7 +2414,7 @@ func (o *consumer) processNak(sseq, dseq, dc uint64, nak []byte) {
if o.ptmr != nil {
// Want checkPending to run and figure out the next timer ttl.
// TODO(dlc) - We could optimize this maybe a bit more and track when we expect the timer to fire.
o.ptmr.Reset(10 * time.Millisecond)
o.resetPtmr(10 * time.Millisecond)
}
}
// Nothing else for use to do now so return.
@@ -2547,11 +2548,7 @@ func (o *consumer) applyState(state *ConsumerState) {
if o.cfg.AckWait < delay {
delay = o.ackWait(0)
}
if o.ptmr == nil {
o.ptmr = time.AfterFunc(delay, o.checkPending)
} else {
o.ptmr.Reset(delay)
}
o.resetPtmr(delay)
}
}

@@ -2786,18 +2783,30 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b
return false
}

// Check if this ack is above the current pointer to our next to deliver.
// This could happen on a cooperative takeover with high speed deliveries.
if sseq >= o.sseq {
o.sseq = sseq + 1
}

mset := o.mset
if mset == nil || mset.closed.Load() {
o.mu.Unlock()
return false
}

// Check if this ack is above the current pointer to our next to deliver.
// This could happen on a cooperative takeover with high speed deliveries.
if sseq >= o.sseq {
// Let's make sure this is valid.
// This is only received on the consumer leader, so should never be higher
// than the last stream sequence.
var ss StreamState
mset.store.FastState(&ss)
if sseq > ss.LastSeq {
o.srv.Warnf("JetStream consumer '%s > %s > %s' ACK sequence %d past last stream sequence of %d",
o.acc.Name, o.stream, o.name, sseq, ss.LastSeq)
// FIXME(dlc) - For 2.11 onwards should we return an error here to the caller?
o.mu.Unlock()
return false
}
o.sseq = sseq + 1
}

// Let the owning stream know if we are interest or workqueue retention based.
// If this consumer is clustered (o.node != nil) this will be handled by
// processReplicatedAck after the ack has propagated.
@@ -3638,7 +3647,7 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) {
// Check if we are multi-filtered or not.
if filters != nil {
sm, sseq, err = store.LoadNextMsgMulti(filters, fseq, &pmsg.StoreMsg)
} else if subjf != nil { // Means single filtered subject since o.filters means > 1.
} else if len(subjf) > 0 { // Means single filtered subject since o.filters means > 1.
filter, wc := subjf[0].subject, subjf[0].hasWildcard
sm, sseq, err = store.LoadNextMsg(filter, wc, fseq, &pmsg.StoreMsg)
} else {
@@ -4283,37 +4292,15 @@ func (o *consumer) calculateNumPending() (npc, npf uint64) {
}

isLastPerSubject := o.cfg.DeliverPolicy == DeliverLastPerSubject
filters, subjf := o.filters, o.subjf

// Deliver Last Per Subject calculates num pending differently.
if isLastPerSubject {
// Consumer without filters.
if o.subjf == nil {
return o.mset.store.NumPending(o.sseq, _EMPTY_, isLastPerSubject)
}
// Consumer with filters.
for _, filter := range o.subjf {
lnpc, lnpf := o.mset.store.NumPending(o.sseq, filter.subject, isLastPerSubject)
npc += lnpc
if lnpf > npf {
npf = lnpf // Always last
}
}
return npc, npf
}
// Every other Delivery Policy is handled here.
// Consumer without filters.
if o.subjf == nil {
return o.mset.store.NumPending(o.sseq, _EMPTY_, false)
}
// Consumer with filters.
for _, filter := range o.subjf {
lnpc, lnpf := o.mset.store.NumPending(o.sseq, filter.subject, false)
npc += lnpc
if lnpf > npf {
npf = lnpf // Always last
}
if filters != nil {
return o.mset.store.NumPendingMulti(o.sseq, filters, isLastPerSubject)
} else if len(subjf) > 0 {
filter := subjf[0].subject
return o.mset.store.NumPending(o.sseq, filter, isLastPerSubject)
}
return npc, npf
return o.mset.store.NumPending(o.sseq, _EMPTY_, isLastPerSubject)
}

func convertToHeadersOnly(pmsg *jsPubMsg) {
@@ -4478,9 +4465,24 @@ func (o *consumer) trackPending(sseq, dseq uint64) {
if o.pending == nil {
o.pending = make(map[uint64]*Pending)
}
if o.ptmr == nil {
o.ptmr = time.AfterFunc(o.ackWait(0), o.checkPending)

// We could have a backoff that set a timer higher than what we need for this message.
// In that case, reset to lowest backoff required for a message redelivery.
minDelay := o.ackWait(0)
if l := len(o.cfg.BackOff); l > 0 {
bi := int(o.rdc[sseq])
if bi < 0 {
bi = 0
} else if bi >= l {
bi = l - 1
}
minDelay = o.ackWait(o.cfg.BackOff[bi])
}
minDeadline := time.Now().Add(minDelay)
if o.ptmr == nil || o.ptmrEnd.After(minDeadline) {
o.resetPtmr(minDelay)
}

if p, ok := o.pending[sseq]; ok {
// Update timestamp but keep original consumer delivery sequence.
// So do not update p.Sequence.
@@ -4603,24 +4605,21 @@ func (o *consumer) removeFromRedeliverQueue(seq uint64) bool {

// Checks the pending messages.
func (o *consumer) checkPending() {
o.mu.RLock()
o.mu.Lock()
defer o.mu.Unlock()

mset := o.mset
// On stop, mset and timer will be nil.
if o.closed || mset == nil || o.ptmr == nil {
stopAndClearTimer(&o.ptmr)
o.mu.RUnlock()
o.stopAndClearPtmr()
return
}
o.mu.RUnlock()

var shouldUpdateState bool
var state StreamState
mset.store.FastState(&state)
fseq := state.FirstSeq

o.mu.Lock()
defer o.mu.Unlock()

now := time.Now().UnixNano()
ttl := int64(o.cfg.AckWait)
next := int64(o.ackWait(0))
@@ -4636,11 +4635,7 @@ func (o *consumer) checkPending() {
check := len(o.pending) > 1024
for seq, p := range o.pending {
if check && atomic.LoadInt64(&o.awl) > 0 {
if o.ptmr == nil {
o.ptmr = time.AfterFunc(100*time.Millisecond, o.checkPending)
} else {
o.ptmr.Reset(100 * time.Millisecond)
}
o.resetPtmr(100 * time.Millisecond)
return
}
// Check if these are no longer valid.
@@ -4707,15 +4702,10 @@ func (o *consumer) checkPending() {
}

if len(o.pending) > 0 {
delay := time.Duration(next)
if o.ptmr == nil {
o.ptmr = time.AfterFunc(delay, o.checkPending)
} else {
o.ptmr.Reset(o.ackWait(delay))
}
o.resetPtmr(time.Duration(next))
} else {
// Make sure to stop timer and clear out any re delivery queues
stopAndClearTimer(&o.ptmr)
o.stopAndClearPtmr()
o.rdq = nil
o.rdqi.Empty()
o.pending = nil
@@ -4903,7 +4893,7 @@ func (o *consumer) selectStartingSeqNo() {
for _, filter := range o.subjf {
// Use first sequence since this is more optimized atm.
ss := o.mset.store.FilteredState(state.FirstSeq, filter.subject)
if ss.First > o.sseq && ss.First < nseq {
if ss.First >= o.sseq && ss.First < nseq {
nseq = ss.First
}
}
@@ -5201,7 +5191,7 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error {
o.client = nil
sysc := o.sysc
o.sysc = nil
stopAndClearTimer(&o.ptmr)
o.stopAndClearPtmr()
stopAndClearTimer(&o.dtmr)
stopAndClearTimer(&o.gwdtmr)
delivery := o.cfg.DeliverSubject
@@ -5623,3 +5613,17 @@ func (o *consumer) checkStateForInterestStream(ss *StreamState) error {
}
return nil
}

func (o *consumer) resetPtmr(delay time.Duration) {
if o.ptmr == nil {
o.ptmr = time.AfterFunc(delay, o.checkPending)
} else {
o.ptmr.Reset(delay)
}
o.ptmrEnd = time.Now().Add(delay)
}

func (o *consumer) stopAndClearPtmr() {
stopAndClearTimer(&o.ptmr)
o.ptmrEnd = time.Time{}
}
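
For readers skimming the consumer.go changes: the new `resetPtmr`/`stopAndClearPtmr` helpers and the `ptmrEnd` field record the deadline the pending timer is armed for, so `trackPending` only pulls the timer earlier and never pushes back an already-earlier deadline. A standalone, simplified sketch of that pattern (hypothetical names, not code from this commit):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type pendingTimer struct {
	mu   sync.Mutex
	tmr  *time.Timer
	end  time.Time // deadline the timer is currently armed for
	fire func()
}

// reset arms (or re-arms) the timer and records its deadline,
// mirroring resetPtmr in the diff above.
func (p *pendingTimer) reset(delay time.Duration) {
	if p.tmr == nil {
		p.tmr = time.AfterFunc(delay, p.fire)
	} else {
		p.tmr.Reset(delay)
	}
	p.end = time.Now().Add(delay)
}

// track shortens the deadline only when the new requirement is sooner than
// whatever is already scheduled, as trackPending now does with ptmrEnd.
func (p *pendingTimer) track(minDelay time.Duration) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if deadline := time.Now().Add(minDelay); p.tmr == nil || p.end.After(deadline) {
		p.reset(minDelay)
	}
}

func main() {
	pt := &pendingTimer{fire: func() { fmt.Println("checkPending would run here") }}
	pt.track(50 * time.Millisecond)  // arms the timer for ~50ms
	pt.track(200 * time.Millisecond) // later deadline: existing 50ms timer is left alone
	pt.track(10 * time.Millisecond)  // earlier deadline: timer is shortened
	time.Sleep(100 * time.Millisecond)
}
```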