Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-picks for 2.10.22-RC.1 #5979

Merged
merged 10 commits into from
Oct 9, 2024
10 changes: 5 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ module github.com/nats-io/nats-server/v2
go 1.21.0

require (
github.com/klauspost/compress v1.17.9
github.com/klauspost/compress v1.17.10
github.com/minio/highwayhash v1.0.3
github.com/nats-io/jwt/v2 v2.5.8
github.com/nats-io/nats.go v1.36.0
github.com/nats-io/nkeys v0.4.7
github.com/nats-io/nuid v1.0.1
go.uber.org/automaxprocs v1.5.3
golang.org/x/crypto v0.27.0
golang.org/x/sys v0.25.0
golang.org/x/time v0.6.0
go.uber.org/automaxprocs v1.6.0
golang.org/x/crypto v0.28.0
golang.org/x/sys v0.26.0
golang.org/x/time v0.7.0
)
20 changes: 10 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA=
github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/klauspost/compress v1.17.10 h1:oXAz+Vh0PMUvJczoi+flxpnBEPxoER1IaAnU/NMPtT0=
github.com/klauspost/compress v1.17.10/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q=
github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ=
github.com/nats-io/jwt/v2 v2.5.8 h1:uvdSzwWiEGWGXf+0Q+70qv6AQdvcvxrv9hPM0RiPamE=
Expand All @@ -18,14 +18,14 @@ github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4
github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U=
github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
go.uber.org/automaxprocs v1.5.3 h1:kWazyxZUrS3Gs4qUpbwo5kEIMGe/DAvi5Z4tl2NW4j8=
go.uber.org/automaxprocs v1.5.3/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0=
golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A=
golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70=
go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs=
go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8=
golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw=
golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34=
golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/time v0.6.0 h1:eTDhh4ZXt5Qf0augr54TN6suAUudPcawVZeIAPU7D4U=
golang.org/x/time v0.6.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo=
golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/time v0.7.0 h1:ntUhktv3OPE6TgYxXWv9vKvUSJyIFJlyohwbkEwPrKQ=
golang.org/x/time v0.7.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ Server Options:
-c, --config <file> Configuration file
-t Test configuration and exit
-sl,--signal <signal>[=<pid>] Send signal to nats-server process (ldm, stop, quit, term, reopen, reload)
pid> can be either a PID (e.g. 1) or the path to a PID file (e.g. /var/run/nats-server.pid)
<pid> can be either a PID (e.g. 1) or the path to a PID file (e.g. /var/run/nats-server.pid)
--client_advertise <string> Client URL to advertise to other servers
--ports_file_dir <dir> Creates a ports file in the specified directory (<executable_name>_<pid>.ports).

Expand Down
71 changes: 39 additions & 32 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ type consumer struct {
dseq uint64 // delivered consumer sequence
adflr uint64 // ack delivery floor
asflr uint64 // ack store floor
chkflr uint64 // our check floor, interest streams only.
npc int64 // Num Pending Count
npf uint64 // Num Pending Floor Sequence
dsubj string
Expand Down Expand Up @@ -2918,28 +2919,6 @@ func (o *consumer) isFiltered() bool {
return false
}

// Check if we would have matched and needed an ack for this store seq.
// This is called for interest based retention streams to remove messages.
func (o *consumer) matchAck(sseq uint64) bool {
o.mu.RLock()
defer o.mu.RUnlock()

// Check if we are filtered, and if so check if this is even applicable to us.
if o.isFiltered() {
if o.mset == nil {
return false
}
var svp StoreMsg
if _, err := o.mset.store.LoadMsg(sseq, &svp); err != nil {
return false
}
if !o.isFilteredMatch(svp.subj) {
return false
}
}
return true
}

// Check if we need an ack for this store seq.
// This is called for interest based retention streams to remove messages.
func (o *consumer) needAck(sseq uint64, subj string) bool {
Expand Down Expand Up @@ -5512,16 +5491,24 @@ func (o *consumer) isMonitorRunning() bool {
var errAckFloorHigherThanLastSeq = errors.New("consumer ack floor is higher than streams last sequence")

// If we are a consumer of an interest or workqueue policy stream, process that state and make sure consistent.
func (o *consumer) checkStateForInterestStream() error {
func (o *consumer) checkStateForInterestStream(ss *StreamState) error {
o.mu.RLock()
// See if we need to process this update if our parent stream is not a limits policy stream.
mset := o.mset
shouldProcessState := mset != nil && o.retention != LimitsPolicy
if o.closed || !shouldProcessState || o.store == nil {
if o.closed || !shouldProcessState || o.store == nil || ss == nil {
o.mu.RUnlock()
return nil
}
store := mset.store
state, err := o.store.State()

filters, subjf, filter := o.filters, o.subjf, _EMPTY_
var wc bool
if filters == nil && subjf != nil {
filter, wc = subjf[0].subject, subjf[0].hasWildcard
}
chkfloor := o.chkflr
o.mu.RUnlock()

if err != nil {
Expand All @@ -5534,26 +5521,46 @@ func (o *consumer) checkStateForInterestStream() error {
return nil
}

// We should make sure to update the acks.
var ss StreamState
mset.store.FastState(&ss)

// Check if the underlying stream's last sequence is less than our floor.
// This can happen if the stream has been reset and has not caught up yet.
if asflr > ss.LastSeq {
return errAckFloorHigherThanLastSeq
}

for seq := ss.FirstSeq; asflr > 0 && seq <= asflr; seq++ {
if o.matchAck(seq) {
var smv StoreMsg
var seq, nseq uint64
// Start at first stream seq or a previous check floor, whichever is higher.
// Note this will really help for interest retention, with WQ the loadNextMsg
// gets us a long way already since it will skip deleted msgs not for our filter.
fseq := ss.FirstSeq
if chkfloor > fseq {
fseq = chkfloor
}

for seq = fseq; asflr > 0 && seq <= asflr; seq++ {
if filters != nil {
_, nseq, err = store.LoadNextMsgMulti(filters, seq, &smv)
} else {
_, nseq, err = store.LoadNextMsg(filter, wc, seq, &smv)
}
// if we advanced sequence update our seq. This can be on no error and EOF.
if nseq > seq {
seq = nseq
}
// Only ack though if no error and seq <= ack floor.
if err == nil && seq <= asflr {
mset.ackMsg(o, seq)
}
}

o.mu.RLock()
o.mu.Lock()
// Update our check floor.
if seq > o.chkflr {
o.chkflr = seq
}
// See if we need to process this update if our parent stream is not a limits policy stream.
state, _ = o.store.State()
o.mu.RUnlock()
o.mu.Unlock()

// If we have pending, we will need to walk through to delivered in case we missed any of those acks as well.
if state != nil && len(state.Pending) > 0 && state.AckFloor.Stream > 0 {
Expand Down
Loading